Skip to content

Commit cddb6c2

Browse files
authored
fix: avoid lost attestations and sync committee messages with early subnet subscribe (#1298)
1 parent 59eec20 commit cddb6c2

File tree

6 files changed

+149
-45
lines changed

6 files changed

+149
-45
lines changed

lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,29 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
6767
Libp2pPort.publish(topic, message)
6868
end
6969

70+
@spec subscribe(non_neg_integer()) :: :ok
71+
def subscribe(subnet_id),
72+
do: Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)
73+
7074
@spec collect(non_neg_integer(), Types.Attestation.t()) :: :ok
7175
def collect(subnet_id, attestation) do
7276
join(subnet_id)
7377
AttSubnetInfo.new_subnet_with_attestation(subnet_id, attestation)
74-
Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)
78+
subscribe(subnet_id)
7579
end
7680

77-
@spec stop_collecting(non_neg_integer()) ::
78-
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
79-
def stop_collecting(subnet_id) do
81+
@spec unsubscribe(non_neg_integer()) :: :ok
82+
def unsubscribe(subnet_id) do
8083
# TODO: (#1289) implement some way to unsubscribe without leaving the topic
8184
topic = topic(subnet_id)
8285
Libp2pPort.leave_topic(topic)
8386
Libp2pPort.join_topic(topic)
87+
end
88+
89+
@spec stop_collecting(non_neg_integer()) ::
90+
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
91+
def stop_collecting(subnet_id) do
92+
unsubscribe(subnet_id)
8493
AttSubnetInfo.stop_collecting(subnet_id)
8594
end
8695

lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,25 +72,34 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.SyncCommittee do
7272
Libp2pPort.publish(topic, message)
7373
end
7474

75+
@spec subscribe(non_neg_integer()) :: :ok
76+
def subscribe(subnet_id),
77+
do: Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)
78+
7579
@spec collect([non_neg_integer()], Types.SyncCommitteeMessage.t()) :: :ok
7680
def collect(subnet_ids, message) do
7781
join(subnet_ids)
7882

7983
for subnet_id <- subnet_ids do
8084
SyncSubnetInfo.new_subnet_with_message(subnet_id, message)
81-
Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)
85+
subscribe(subnet_id)
8286
end
8387

8488
:ok
8589
end
8690

87-
@spec stop_collecting(non_neg_integer()) ::
88-
{:ok, list(Types.SyncCommitteeMessage.t())} | {:error, String.t()}
89-
def stop_collecting(subnet_id) do
91+
@spec unsubscribe(non_neg_integer()) :: :ok
92+
def unsubscribe(subnet_id) do
9093
# TODO: (#1289) implement some way to unsubscribe without leaving the topic
9194
topic = topic(subnet_id)
9295
Libp2pPort.leave_topic(topic)
9396
Libp2pPort.join_topic(topic)
97+
end
98+
99+
@spec stop_collecting(non_neg_integer()) ::
100+
{:ok, list(Types.SyncCommitteeMessage.t())} | {:error, String.t()}
101+
def stop_collecting(subnet_id) do
102+
unsubscribe(subnet_id)
94103
SyncSubnetInfo.stop_collecting(subnet_id)
95104
end
96105

lib/lambda_ethereum_consensus/validator/duties.ex

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,17 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
4141
aggregation: [sync_committee_aggregator_duty()]
4242
}
4343

44+
@typedoc "Set of subnet indices for a particular slot, used in attestations and sync committees."
45+
@type subnets :: MapSet.t(subnet_index :: Types.uint64())
46+
4447
@typedoc "Useful precalculated data not tied to a particular slot/duty."
45-
@type shared_data_for_duties :: %{sync_subcommittee_participants: %{}}
48+
@type shared_data_for_duties :: %{
49+
subnets: %{
50+
attesters: %{Types.slot() => subnets},
51+
sync_committees: %{Types.slot() => subnets}
52+
},
53+
sync_subcommittee_participants: %{}
54+
}
4655

4756
@type attester_duties :: [attester_duty()]
4857
@type proposer_duties :: [proposer_duty()]
@@ -103,11 +112,20 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
103112
|> then(&{&1, compute_sync_subcommittee_participants(beacon, epoch)})
104113
end
105114

115+
new_subnets_for_attestations = compute_subnets_for_attestations(new_attesters)
116+
new_subnets_for_sync_committees = compute_subnets_for_sync_committees(new_sync_committees)
117+
106118
new_duties = %{
107119
proposers: new_proposers,
108120
attesters: new_attesters,
109121
sync_committees: new_sync_committees,
110-
shared: %{sync_subcommittee_participants: sync_subcommittee_participants}
122+
shared: %{
123+
subnets: %{
124+
attesters: new_subnets_for_attestations,
125+
sync_committees: new_subnets_for_sync_committees
126+
},
127+
sync_subcommittee_participants: sync_subcommittee_participants
128+
}
111129
}
112130

113131
log_duties_for_epoch(new_duties, epoch)
@@ -312,6 +330,25 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
312330
Map.put(duty, :subnet_id, subnet_id)
313331
end
314332

333+
defp compute_subnets_for_attestations(attester_duties) do
334+
for {slot, duties} <- attester_duties,
335+
%{should_aggregate?: true, committee_index: subnet_id} <- duties,
336+
reduce: %{} do
337+
acc ->
338+
Map.update(acc, slot, MapSet.new([subnet_id]), &MapSet.put(&1, subnet_id))
339+
end
340+
end
341+
342+
defp compute_subnets_for_sync_committees(sync_committee_duties) do
343+
for {slot, duties} <- sync_committee_duties,
344+
%{aggregation: agg} when agg != [] <- duties,
345+
%{subcommittee_index: subnet_id} <- agg,
346+
reduce: %{} do
347+
acc ->
348+
Map.update(acc, slot, MapSet.new([subnet_id]), &MapSet.put(&1, subnet_id))
349+
end
350+
end
351+
315352
############################
316353
# Accessors
317354

@@ -350,6 +387,17 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
350387
end
351388
end
352389

390+
@spec current_subnets(duties(), Types.epoch(), Types.slot()) ::
391+
%{attesters: subnets, sync_committees: subnets}
392+
def current_subnets(duties, epoch, slot) do
393+
%{
394+
attesters: get_in(duties, [epoch, :shared, :subnets, :attesters, slot]) || MapSet.new(),
395+
sync_committees:
396+
get_in(duties, [epoch, :shared, :subnets, :sync_committees, max(0, slot - 1)]) ||
397+
MapSet.new()
398+
}
399+
end
400+
353401
@spec sync_subcommittee_participants(duties(), Types.epoch()) :: %{
354402
non_neg_integer() => [non_neg_integer()]
355403
}
@@ -399,23 +447,41 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
399447

400448
@spec log_duties_for_epoch(duties(), Types.epoch()) :: :ok
401449
def log_duties_for_epoch(
402-
%{proposers: proposers, attesters: attesters, sync_committees: sync_committees},
450+
%{
451+
proposers: proposers,
452+
attesters: attesters,
453+
sync_committees: sync_committees,
454+
shared: shared
455+
},
403456
epoch
404457
) do
405458
Logger.info(
406459
"[Duties] Proposers for epoch #{epoch} (slot=>validator):\n #{inspect(proposers)}"
407460
)
408461

409-
for %{
410-
subnet_ids: si,
411-
validator_index: vi,
412-
aggregation: agg
413-
} <- sync_committees do
414-
Logger.info(
415-
"[Duties] Sync committee for epoch: #{epoch}, validator_index: #{vi} will broadcast on subnet_ids: #{inspect(si)}.\n Slots: #{inspect(agg |> Map.keys() |> Enum.join(", "))}"
416-
)
462+
Logger.debug(
463+
"[Duties] SyncCommittees Subnets for epoch #{epoch}: #{inspect(shared.subnets.sync_committees)}"
464+
)
465+
466+
for {slot, sync_duties} <- sync_committees,
467+
length(sync_duties) > 0 do
468+
Logger.debug("[Duties] Sync committee for epoch: #{epoch}, slot: #{slot}:")
469+
470+
for %{
471+
validator_index: vi,
472+
subnet_ids: si,
473+
aggregation: agg
474+
} <- sync_duties do
475+
Logger.debug(
476+
"[Duties] Validator: #{vi}, will broadcast in subnets: #{si} and aggregate in #{inspect(agg |> Enum.map(& &1.subcommittee_index))}."
477+
)
478+
end
417479
end
418480

481+
Logger.debug(
482+
"[Duties] Attesters Subnets for epoch #{epoch}: #{inspect(shared.subnets.attesters)}"
483+
)
484+
419485
for {slot, att_duties} <- attesters,
420486
length(att_duties) > 0 do
421487
Logger.debug("[Duties] Attesters for epoch: #{epoch}, slot #{slot}:")

lib/lambda_ethereum_consensus/validator/validator_set.ex

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,16 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
55
simplify the delegation of work.
66
"""
77

8-
defstruct slot: nil, head_root: nil, duties: %{}, validators: %{}
8+
defstruct slot: nil,
9+
head_root: nil,
10+
duties: %{},
11+
subscribed_subnets: %{attesters: MapSet.new(), sync_committees: MapSet.new()},
12+
validators: %{}
913

1014
require Logger
1115

16+
alias LambdaEthereumConsensus.P2P.Gossip.Attestation
17+
alias LambdaEthereumConsensus.P2P.Gossip.SyncCommittee
1218
alias LambdaEthereumConsensus.StateTransition
1319
alias LambdaEthereumConsensus.StateTransition.Misc
1420
alias LambdaEthereumConsensus.Store.CheckpointStates
@@ -21,6 +27,7 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
2127
slot: Types.slot(),
2228
head_root: Types.root() | nil,
2329
duties: %{Types.epoch() => Duties.duties()},
30+
subscribed_subnets: %{attesters: Duties.subnets(), sync_committees: Duties.subnets()},
2431
validators: validators()
2532
}
2633

@@ -137,7 +144,9 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
137144
end
138145

139146
defp process_tick(%{head_root: head_root} = set, epoch, {slot, :first_third}) do
140-
maybe_propose(set, epoch, slot, head_root)
147+
set
148+
|> maybe_resubscribe_to_subnets(epoch, slot)
149+
|> maybe_propose(epoch, slot, head_root)
141150
end
142151

143152
defp process_tick(%{head_root: head_root} = set, epoch, {slot, :second_third}) do
@@ -325,6 +334,30 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
325334
|> then(&%{set | duties: &1})
326335
end
327336

337+
##########################
338+
# Subnets
339+
340+
defp maybe_resubscribe_to_subnets(set, epoch, slot) do
341+
%{subscribed_subnets: %{attesters: old_att_subnets, sync_committees: old_sync_subnets}} = set
342+
343+
%{attesters: new_att_subnets, sync_committees: new_sync_subnets} =
344+
Duties.current_subnets(set.duties, epoch, slot)
345+
346+
unsubscribe_att = MapSet.difference(old_att_subnets, new_att_subnets)
347+
unsubscribe_sync = MapSet.difference(old_sync_subnets, new_sync_subnets)
348+
349+
Enum.each(unsubscribe_att, &Attestation.unsubscribe/1)
350+
Enum.each(unsubscribe_sync, &SyncCommittee.unsubscribe/1)
351+
352+
subscribe_att = MapSet.difference(new_att_subnets, old_att_subnets)
353+
subscribe_sync = MapSet.difference(new_sync_subnets, old_sync_subnets)
354+
355+
Enum.each(subscribe_att, &Attestation.subscribe/1)
356+
Enum.each(subscribe_sync, &SyncCommittee.subscribe/1)
357+
358+
%{set | subscribed_subnets: %{attesters: new_att_subnets, sync_committees: new_sync_subnets}}
359+
end
360+
328361
##########################
329362
# Target State
330363
# TODO: (#1278) This should be taken from the store as noted by arkenan.

lib/types/att_subnet_info.ex

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ defmodule Types.AttSubnetInfo do
4444

4545
@doc """
4646
Adds a new Attestation to the SubnetInfo if the attestation's data matches the base one.
47-
Assumes that the SubnetInfo already exists.
4847
"""
4948
@spec add_attestation!(non_neg_integer(), Types.Attestation.t()) :: :ok
50-
def add_attestation!(subnet_id, attestation) do
51-
subnet_info = fetch_subnet_info!(subnet_id)
52-
53-
if subnet_info.data == attestation.data do
49+
def add_attestation!(subnet_id, %{data: att_data} = attestation) do
50+
# TODO: (#1302) On delayed scenarios (past second third of the slot) we could discard useful
51+
# messages and end up with empty aggregations due to the subnet not being created yet.
52+
with {:ok, subnet_info} <- fetch_subnet_info(subnet_id),
53+
^att_data <- subnet_info.data do
5454
new_subnet_info = %__MODULE__{
5555
subnet_info
5656
| attestations: [attestation | subnet_info.attestations]
@@ -91,12 +91,6 @@ defmodule Types.AttSubnetInfo do
9191
end
9292
end
9393

94-
@spec fetch_subnet_info!(non_neg_integer()) :: t()
95-
defp fetch_subnet_info!(subnet_id) do
96-
{:ok, subnet_info} = fetch_subnet_info(subnet_id)
97-
subnet_info
98-
end
99-
10094
@spec delete_subnet(non_neg_integer()) :: :ok
10195
defp delete_subnet(subnet_id), do: Db.delete(@subnet_prefix <> Integer.to_string(subnet_id))
10296

lib/types/sync_subnet_info.ex

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,15 @@ defmodule Types.SyncSubnetInfo do
5050

5151
@doc """
5252
Adds a new SyncCommitteeMessage to the SubnetInfo if the message's data matches the base one.
53-
Assumes that the SubnetInfo already exists.
5453
"""
5554
@spec add_message!(non_neg_integer(), Types.SyncCommitteeMessage.t()) :: :ok
56-
def add_message!(
57-
subnet_id,
58-
%Types.SyncCommitteeMessage{slot: slot, beacon_block_root: root} = message
59-
) do
60-
subnet_info = fetch_subnet_info!(subnet_id)
55+
def add_message!(subnet_id, %Types.SyncCommitteeMessage{} = message) do
56+
%{slot: slot, beacon_block_root: root} = message
6157

62-
if subnet_info.data == {slot, root} do
58+
# TODO: (#1302) On delayed scenarios (past second third of the slot) we could discard useful
59+
# messages and end up with empty aggregations due to the subnet not being created yet.
60+
with {:ok, subnet_info} <- fetch_subnet_info(subnet_id),
61+
{^slot, ^root} <- subnet_info.data do
6362
new_subnet_info = %__MODULE__{
6463
subnet_info
6564
| messages: [message | subnet_info.messages]
@@ -100,12 +99,6 @@ defmodule Types.SyncSubnetInfo do
10099
end
101100
end
102101

103-
@spec fetch_subnet_info!(non_neg_integer()) :: t()
104-
defp fetch_subnet_info!(subnet_id) do
105-
{:ok, subnet_info} = fetch_subnet_info(subnet_id)
106-
subnet_info
107-
end
108-
109102
@spec delete_subnet(non_neg_integer()) :: :ok
110103
defp delete_subnet(subnet_id), do: Db.delete(@subnet_prefix <> Integer.to_string(subnet_id))
111104

0 commit comments

Comments
 (0)