Skip to content

Commit 86f4df2

Browse files
authored
fix: duplicate and old attestations aggregates (#1303)
1 parent 6156307 commit 86f4df2

File tree

2 files changed

+38
-24
lines changed

2 files changed

+38
-24
lines changed

lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
6969
get_operation(:voluntary_exit, count)
7070
end
7171

72-
@spec get_attestations(non_neg_integer()) :: list(Types.Attestation.t())
73-
def get_attestations(count) do
74-
get_operation(:attestation, count)
72+
@spec get_attestations(non_neg_integer(), Types.slot()) :: list(Types.Attestation.t())
73+
def get_attestations(count, slot) do
74+
slot = slot || fetch_slot!()
75+
get_operation(:attestation, count, &ignore?(&1, slot))
7576
end
7677

7778
@spec get_sync_committee_contributions() :: list(Types.SignedContributionAndProof.t())
@@ -141,16 +142,22 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
141142
defp get_operation(operation, count) when operation in @operations do
142143
# NOTE: we don't remove these from the db, since after a block is built
143144
# :new_block will be called, and already added messages will be removed
145+
operation
146+
|> fetch_operation!()
147+
|> cap_operations(count)
148+
end
144149

145-
slot = fetch_slot!()
146-
147-
operations = fetch_operation!(operation)
148-
149-
if count == :all,
150-
do: operations |> Enum.reject(&ignore?(&1, slot)),
151-
else: operations |> Stream.reject(&ignore?(&1, slot)) |> Enum.take(count)
150+
defp get_operation(operation, count, filter) when operation in @operations do
151+
operation
152+
|> fetch_operation!()
153+
|> Stream.reject(filter)
154+
|> cap_operations(count)
152155
end
153156

157+
defp cap_operations(%Stream{} = operations, :all), do: Enum.to_list(operations)
158+
defp cap_operations(operations, :all), do: operations
159+
defp cap_operations(operations, count), do: Enum.take(operations, count)
160+
154161
@impl true
155162
def handle_gossip_message(store, topic, msg_id, message) do
156163
handle_gossip_message(topic, msg_id, message)
@@ -166,14 +173,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
166173
{:ok,
167174
%Types.SignedAggregateAndProof{message: %Types.AggregateAndProof{aggregate: aggregate}}} <-
168175
Ssz.from_ssz(uncompressed, Types.SignedAggregateAndProof) do
169-
votes = BitField.count(aggregate.aggregation_bits)
170-
slot = aggregate.data.slot
171-
root = aggregate.data.beacon_block_root |> Base.encode16()
172-
173176
Logger.debug(
174-
"[Gossip] Aggregate decoded. Total attestations: #{votes}",
175-
slot: slot,
176-
root: root
177+
"[Gossip] Aggregate decoded. Total attestations: #{BitField.count(aggregate.aggregation_bits)}",
178+
slot: aggregate.data.slot,
179+
root: aggregate.data.beacon_block_root
177180
)
178181

179182
# We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment.
@@ -271,11 +274,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
271274
defp ignore?(%Types.Attestation{}, nil), do: false
272275

273276
defp ignore?(%Types.Attestation{data: data}, slot) do
277+
# Right now this preset is 1, so we add every attestation ASAP, but it could be changed in the future
274278
data.slot + ChainSpec.get("MIN_ATTESTATION_INCLUSION_DELAY") > slot
275279
end
276280

277-
defp ignore?(_, _), do: false
278-
279281
defp update_operation(operation, f) when is_function(f) do
280282
fetch_operation!(operation)
281283
|> f.()

lib/lambda_ethereum_consensus/validator/block_builder.ex

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do
4242
{:ok, eth1_vote} <- fetch_eth1_data(request.slot, mid_state),
4343
{:ok, block_request} <-
4444
request
45-
|> Map.merge(fetch_operations_for_block())
45+
|> Map.merge(fetch_operations_for_block(request.slot))
4646
|> Map.put_new_lazy(:deposits, fn -> fetch_deposits(mid_state, eth1_vote) end)
4747
|> Map.put(:blob_kzg_commitments, blobs_bundle.commitments)
4848
|> BuildBlockRequest.validate(pre_state),
@@ -84,7 +84,7 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do
8484
graffiti: block_request.graffiti_message,
8585
proposer_slashings: block_request.proposer_slashings,
8686
attester_slashings: block_request.attester_slashings,
87-
attestations: block_request.attestations,
87+
attestations: select_best_aggregates(block_request.attestations),
8888
deposits: block_request.deposits,
8989
voluntary_exits: block_request.voluntary_exits,
9090
bls_to_execution_changes: block_request.bls_to_execution_changes,
@@ -152,21 +152,22 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do
152152
end
153153
end
154154

155-
@spec fetch_operations_for_block() :: %{
155+
@spec fetch_operations_for_block(Types.slot()) :: %{
156156
proposer_slashings: [Types.ProposerSlashing.t()],
157157
attester_slashings: [Types.AttesterSlashing.t()],
158158
attestations: [Types.Attestation.t()],
159159
sync_committee_contributions: [Types.SyncCommitteeContribution.t()],
160160
voluntary_exits: [Types.VoluntaryExit.t()],
161161
bls_to_execution_changes: [Types.SignedBLSToExecutionChange.t()]
162162
}
163-
defp fetch_operations_for_block() do
163+
defp fetch_operations_for_block(slot) do
164164
%{
165165
proposer_slashings:
166166
ChainSpec.get("MAX_PROPOSER_SLASHINGS") |> OperationsCollector.get_proposer_slashings(),
167167
attester_slashings:
168168
ChainSpec.get("MAX_ATTESTER_SLASHINGS") |> OperationsCollector.get_attester_slashings(),
169-
attestations: ChainSpec.get("MAX_ATTESTATIONS") |> OperationsCollector.get_attestations(),
169+
attestations:
170+
ChainSpec.get("MAX_ATTESTATIONS") |> OperationsCollector.get_attestations(slot),
170171
sync_committee_contributions: OperationsCollector.get_sync_committee_contributions(),
171172
voluntary_exits:
172173
ChainSpec.get("MAX_VOLUNTARY_EXITS") |> OperationsCollector.get_voluntary_exits(),
@@ -209,6 +210,17 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do
209210
signature
210211
end
211212

213+
defp select_best_aggregates(attestations) do
214+
attestations
215+
|> Enum.group_by(& &1.data.index)
216+
|> Enum.map(fn {_, attestations} ->
217+
Enum.max_by(
218+
attestations,
219+
&(&1.aggregation_bits |> BitVector.count())
220+
)
221+
end)
222+
end
223+
212224
defp get_sync_aggregate(contributions, slot, parent_root) do
213225
# We group by the contributions by subcommittee index, get only the ones related to the previous slot
214226
# and pick the one with the most amount of set bits in the aggregation bits.

0 commit comments

Comments
 (0)