Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail backoff and fail metadata #419

Merged
merged 5 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/_generated/spawn/actors/healthcheck.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckReply do
end

defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do
@moduledoc false

use GRPC.Service,
name: "spawn.actors.healthcheck.HealthCheckActor",
protoc_gen_elixir_version: "0.14.0"
Expand Down Expand Up @@ -717,6 +719,8 @@ defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Service do
end

defmodule Spawn.Actors.Healthcheck.HealthCheckActor.Stub do
@moduledoc false

use GRPC.Stub, service: Spawn.Actors.Healthcheck.HealthCheckActor.Service
end

Expand Down
30 changes: 30 additions & 0 deletions lib/_generated/spawn/actors/protocol.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,20 @@ defmodule Spawn.Pipe do
json_name: "actionName",
proto3_optional: nil,
__unknown_fields__: []
},
%Google.Protobuf.FieldDescriptorProto{
name: "register_ref",
extendee: nil,
number: 3,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "registerRef",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
Expand All @@ -1005,6 +1019,7 @@ defmodule Spawn.Pipe do

field(:actor, 1, type: :string)
field(:action_name, 2, type: :string, json_name: "actionName")
field(:register_ref, 3, type: :string, json_name: "registerRef")
end

defmodule Spawn.Forward do
Expand Down Expand Up @@ -1043,6 +1058,20 @@ defmodule Spawn.Forward do
json_name: "actionName",
proto3_optional: nil,
__unknown_fields__: []
},
%Google.Protobuf.FieldDescriptorProto{
name: "register_ref",
extendee: nil,
number: 3,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "registerRef",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
Expand All @@ -1059,6 +1088,7 @@ defmodule Spawn.Forward do

field(:actor, 1, type: :string)
field(:action_name, 2, type: :string, json_name: "actionName")
field(:register_ref, 3, type: :string, json_name: "registerRef")
end

defmodule Spawn.Fact.MetadataEntry do
Expand Down
15 changes: 13 additions & 2 deletions lib/actors/actor/caller_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ defmodule Actors.Actor.CallerConsumer do
ActorSettings,
ActorSystem,
Registry,
ActorOpts,
TimeoutStrategy,
ProjectionSettings,
ActorDeactivationStrategy,
Expand Down Expand Up @@ -565,6 +564,16 @@ defmodule Actors.Actor.CallerConsumer do
timeout =
case metadata["request-timeout"] do
nil -> 60_000
value -> String.to_integer(value)
end

# when a invoke errors or throws an exception
# we can backoff or not
fail_backoff =
case metadata["fail_backoff"] do
nil -> false
"false" -> false
"true" -> true
value -> value
end

Expand Down Expand Up @@ -635,7 +644,9 @@ defmodule Actors.Actor.CallerConsumer do
{:halt, result}

{:error, :actor_invoke, error} ->
{:halt, {:error, error}}
keep_retrying_action = if fail_backoff, do: :cont, else: :halt

{keep_retrying_action, {:error, error}}

{:error, _msg} = result ->
{:cont, result}
Expand Down
46 changes: 31 additions & 15 deletions lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ defmodule Actors.Actor.Entity.Invocation do
name = Map.get(message.metadata, "actor-name")
source_action = Map.get(message.metadata, "actor-action")

action_metadata =
case Map.get(message.metadata, "action-metadata") do
nil -> %{}
metadata -> Jason.decode!(metadata)
end

action =
actor.settings.projection_settings.subjects
|> Enum.find(fn subject -> subject.source_action == source_action end)
Expand All @@ -82,7 +88,7 @@ defmodule Actors.Actor.Entity.Invocation do
async: true,
system: %ActorSystem{name: system_name},
actor: %Actor{id: actor.id},
metadata: message.metadata,
metadata: action_metadata,
action_name: action,
payload: {:value, Google.Protobuf.Any.decode(message.state)},
caller: %ActorId{name: name, system: system_name, parent: parent}
Expand Down Expand Up @@ -404,7 +410,7 @@ defmodule Actors.Actor.Entity.Invocation do
response_checkpoint(response, checkpoint, revision, state)
end

defp is_authorized?(invocation, actions, timers) do
defp is_authorized?(invocation, _actions, _timers) do
acl_manager = get_acl_manager()

acl_manager.get_policies!()
Expand Down Expand Up @@ -580,7 +586,7 @@ defmodule Actors.Actor.Entity.Invocation do
} = _params
)
when is_nil(workflow) or workflow == %{} do
:ok = do_handle_projection(id, request.action_name, settings, state, response)
:ok = do_handle_projection(id, request, settings, state, response)

response
end
Expand All @@ -595,12 +601,14 @@ defmodule Actors.Actor.Entity.Invocation do
opts: opts
} = _params
) do
:ok = do_handle_projection(id, request.action_name, settings, state, response)
:ok = do_handle_projection(id, request, settings, state, response)

do_run_workflow(request, response, state, opts)
end

defp do_handle_projection(id, action, %{sourceable: true} = _settings, _state, response) do
defp do_handle_projection(id, request, %{sourceable: true} = _settings, _state, response) do
action = request.action_name

stream_name = StreamInitiator.stream_name(id)
id_name = String.replace(id.name, ".", "-")

Expand All @@ -615,19 +623,20 @@ defmodule Actors.Actor.Entity.Invocation do
{"Spawn-System", "#{id.system}"},
{"Actor-Parent", "#{id.parent}"},
{"Actor-Name", "#{id.name}"},
{"Actor-Action", "#{action}"}
{"Actor-Action", "#{action}"},
{"Action-Metadata", Jason.encode!(request.current_context.metadata)}
]
)
end

defp do_handle_projection(
id,
action,
request,
_settings,
%EntityState{actor: %Actor{settings: %ActorSettings{kind: :PROJECTION}}} = state,
response
) do
if :persistent_term.get("view-#{id.name}-#{action}", false) do
if :persistent_term.get("view-#{id.name}-#{request.action_name}", false) do
# no need to persist any state since this is a view only action
:ok
else
Expand All @@ -650,7 +659,7 @@ defmodule Actors.Actor.Entity.Invocation do
end
end

defp do_handle_projection(_id, _action, _settings, _state, _response), do: :ok
defp do_handle_projection(_id, _request, _settings, _state, _response), do: :ok

defp do_run_workflow(
_request,
Expand All @@ -671,7 +680,7 @@ defmodule Actors.Actor.Entity.Invocation do
opts
) do
Tracer.with_span "run-workflow" do
do_side_effects(effects, opts)
do_side_effects(request, effects, opts)
do_broadcast(request, broadcast, opts)
do_handle_routing(request, response, opts)
end
Expand All @@ -689,7 +698,8 @@ defmodule Actors.Actor.Entity.Invocation do

defp do_handle_routing(
%ActorInvocation{
actor: %ActorId{name: caller_actor_name, system: system_name}
actor: %ActorId{name: caller_actor_name, system: system_name},
current_context: %Context{metadata: metadata}
},
%ActorInvocationResponse{
payload: payload,
Expand All @@ -708,6 +718,7 @@ defmodule Actors.Actor.Entity.Invocation do
system: %ActorSystem{name: system_name},
actor: %Actor{id: %ActorId{name: actor_name, system: system_name}},
action_name: cmd,
metadata: metadata,
payload: payload,
caller: %ActorId{name: caller_actor_name, system: system_name}
}
Expand Down Expand Up @@ -737,7 +748,8 @@ defmodule Actors.Actor.Entity.Invocation do
defp do_handle_routing(
%ActorInvocation{
actor: %ActorId{name: caller_actor_name, system: system_name},
payload: payload
payload: payload,
current_context: %Context{metadata: metadata}
} = _request,
%ActorInvocationResponse{
workflow:
Expand All @@ -756,6 +768,7 @@ defmodule Actors.Actor.Entity.Invocation do
system: %ActorSystem{name: system_name},
actor: %Actor{id: %ActorId{name: actor_name, system: system_name}},
action_name: cmd,
metadata: metadata,
payload: payload,
caller: %ActorId{name: caller_actor_name, system: system_name}
}
Expand Down Expand Up @@ -810,13 +823,13 @@ defmodule Actors.Actor.Entity.Invocation do
:noreply
end

def do_side_effects(effects, opts \\ [])
def do_side_effects(request, effects, opts \\ [])

def do_side_effects(effects, _opts) when effects == [] do
def do_side_effects(_request, effects, _opts) when effects == [] do
:ok
end

def do_side_effects(effects, _opts) when is_list(effects) do
def do_side_effects(request, effects, _opts) when is_list(effects) do
Tracer.with_span "handle-side-effects" do
try do
spawn(fn ->
Expand All @@ -830,6 +843,9 @@ defmodule Actors.Actor.Entity.Invocation do
} = invocation
} ->
try do
metadata = Map.merge(request.current_context.metadata, invocation.metadata)
invocation = %InvocationRequest{invocation | metadata: metadata}

Actors.invoke(invocation, span_ctx: Tracer.current_span_ctx())
catch
error ->
Expand Down
9 changes: 0 additions & 9 deletions lib/spawn/cluster/cluster_resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,12 @@ defmodule Spawn.Cluster.ClusterResolver do
service = Keyword.fetch!(config, :service)
resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, :a))

IO.inspect(app_name, label: "Using application name ---------------------")
IO.inspect(service, label: "Using service ---------------------")
IO.inspect(resolver, label: "Using resolver ---------------------")
IO.inspect(Node.get_cookie(), label: "Using node cookie ---------------------")

cond do
app_name != nil and service != nil ->
headless_service = to_charlist(service)

IO.inspect(headless_service, label: "Using headless service ---------------------")

case resolver.(headless_service) do
{:ok, {:hostent, _fqdn, [], :inet, _value, addresses}} ->
IO.inspect(addresses, label: "Using addresses ---------------------")
parse_response(addresses, app_name)

{:error, reason} ->
Expand Down Expand Up @@ -135,6 +127,5 @@ defmodule Spawn.Cluster.ClusterResolver do
|> Enum.map(&:inet_parse.ntoa(&1))
|> Enum.map(&"#{app_name}@#{&1}")
|> Enum.map(&String.to_atom(&1))
|> IO.inspect(label: "Parsed addresses ---------------------")
end
end
6 changes: 6 additions & 0 deletions priv/protos/spawn/actors/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ message Pipe {

// Action.
string action_name = 2;

// Register ref
string register_ref = 3;
}

// Sends the input of a action of an Actor to the input of another action of an
Expand All @@ -291,6 +294,9 @@ message Forward {

// Action.
string action_name = 2;

// Register ref
string register_ref = 3;
}

// Facts are emitted by actions and represent the internal state of the moment
Expand Down
52 changes: 21 additions & 31 deletions spawn_sdk/spawn_sdk/lib/defact.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,27 @@ defmodule SpawnSdk.Defact do
)

def handle_action({unquote(action_name), payload}, context) do
try do
case {:erlang.fun_info(unquote(block_fn), :arity), unquote(action_name)} do
{{:arity, 1}, _name} ->
unquote(block_fn).(context)

{{:arity, 2}, action} when action not in ~w(init Init Setup setup) ->
unquote(block_fn).(context, payload)

{{:arity, arity}, _} ->
raise SpawnSdk.Actor.MalformedActor,
"Invalid callback arity #{arity} needs to be in (1, 2) for action=#{unquote(action_name)}"
end
|> case do
%SpawnSdk.Value{} = value ->
value

{:reply, %SpawnSdk.Value{} = value} ->
value

_ ->
raise SpawnSdk.Actor.MalformedActor,
"Return value for action=#{unquote(action_name)} must be a %Value{} struct"
end
rescue
e ->
reraise SpawnSdk.Actor.MalformedActor,
[
message: "Error in action=#{unquote(action_name)} error=#{inspect(e)}",
exception: e
],
__STACKTRACE__
case {:erlang.fun_info(unquote(block_fn), :arity), unquote(action_name)} do
{{:arity, 1}, _name} ->
unquote(block_fn).(context)

{{:arity, 2}, action} when action not in ~w(init Init Setup setup) ->
unquote(block_fn).(context, payload)

{{:arity, arity}, _} ->
raise SpawnSdk.Actor.MalformedActor,
"Invalid callback arity #{arity} needs to be in (1, 2) for action=#{unquote(action_name)}"
end
|> case do
%SpawnSdk.Value{} = value ->
value

{:reply, %SpawnSdk.Value{} = value} ->
value

_ ->
raise SpawnSdk.Actor.MalformedActor,
"Return value for action=#{unquote(action_name)} must be a %Value{} struct"
end
end
end
Expand Down
Loading
Loading