Skip to content

Commit d42fbca

Browse files
committed
Internal: store the full TX context in confirmations (#265)
We now store the entire context of the transaction in the confirmations array of messages. This means that two additional fields are now preserved: the transaction block timestamp and the publisher. As we need to re-fetch this data from chain data, a new migration script resets the chain height to re-process all transactions. We reset the confirmation status of all messages to unconfirmed and deleted their confirmations array to let the node automatically migrate to the new format. Renamed some fields of the `TxContext` class in order to use the same format in all DB collections and to avoid a breaking change in the messages confirmation format.
1 parent c601321 commit d42fbca

13 files changed

+152
-100
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""
2+
This migration retrieves additional metadata regarding chain confirmation of messages,
3+
including the block timestamp. We reset the TX height of the node to reprocess
4+
all the chain data messages and insert additional values
5+
"""
6+
7+
8+
import logging
9+
import os
10+
from configmanager import Config
11+
from aleph.model.chains import Chain
12+
from aleph.model.pending import PendingMessage, PendingTX
13+
from aleph.model.messages import Message
14+
15+
logger = logging.getLogger(os.path.basename(__file__))
16+
17+
18+
async def upgrade(config: Config, **kwargs):
19+
logger.info("Resetting chain height to re-fetch all chaindata...")
20+
start_height = config.ethereum.start_height.value
21+
await Chain.set_last_height("ETH", start_height)
22+
23+
logger.info("Dropping all pending transactions...")
24+
await PendingTX.collection.delete_many({})
25+
26+
logger.info(
27+
"Dropping all pending confirmation messages "
28+
"(they will be reinserted automatically)..."
29+
)
30+
await PendingMessage.collection.delete_many({"source.chain_name": {"$ne": None}})
31+
32+
logger.info("Removing confirmation data for all messages...")
33+
# Confirmations will be automatically added again by the pending TX processor.
34+
# By removing the confirmation entirely, we make sure to avoid intermediate states
35+
# if a message was confirmed in an unexpected way.
36+
await Message.collection.update_many(
37+
{"confirmed": True},
38+
{
39+
"$set": {
40+
"confirmed": False,
41+
},
42+
"$unset": {"confirmations": 1},
43+
},
44+
)
45+
46+
47+
async def downgrade(**kwargs):
48+
raise NotImplementedError("Downgrading this migration is not supported.")

src/aleph/chains/common.py

+37-54
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
import asyncio
22
import json
33
import logging
4-
from dataclasses import asdict
54
from enum import IntEnum
65
from typing import Dict, Optional, Tuple, List
76

8-
from aleph_message.models import MessageConfirmation
97
from bson import ObjectId
108
from pydantic import ValidationError
119
from pymongo import UpdateOne
@@ -26,18 +24,17 @@
2624
from aleph.model.pending import PendingMessage, PendingTX
2725
from aleph.network import verify_signature
2826
from aleph.permissions import check_sender_authorization
29-
from aleph.storage import get_json, pin_hash, add_json, get_message_content
30-
from .tx_context import TxContext
31-
from aleph.schemas.pending_messages import (
32-
BasePendingMessage,
33-
)
27+
from aleph.schemas.pending_messages import BasePendingMessage
3428
from aleph.schemas.validated_message import (
3529
validate_pending_message,
3630
ValidatedStoreMessage,
3731
ValidatedForgetMessage,
3832
make_confirmation_update_query,
39-
make_message_upsert_query,
33+
make_message_upsert_query,
4034
)
35+
from ..schemas.message_confirmation import MessageConfirmation
36+
from aleph.storage import get_json, pin_hash, add_json, get_message_content
37+
from .tx_context import TxContext
4138

4239
LOGGER = logging.getLogger("chains.common")
4340

@@ -65,21 +62,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height):
6562

6663
async def delayed_incoming(
6764
message: BasePendingMessage,
68-
chain_name: Optional[str] = None,
69-
tx_hash: Optional[str] = None,
70-
height: Optional[int] = None,
65+
tx_context: Optional[TxContext] = None,
66+
check_message: bool = True,
7167
):
7268
if message is None:
7369
return
70+
7471
await PendingMessage.collection.insert_one(
7572
{
7673
"message": message.dict(exclude={"content"}),
77-
"source": dict(
78-
chain_name=chain_name,
79-
tx_hash=tx_hash,
80-
height=height,
81-
check_message=True, # should we store this?
82-
),
74+
"tx_context": tx_context.dict() if tx_context else None,
75+
"check_message": check_message,
8376
}
8477
)
8578

@@ -92,27 +85,15 @@ class IncomingStatus(IntEnum):
9285

9386
async def mark_message_for_retry(
9487
message: BasePendingMessage,
95-
chain_name: Optional[str],
96-
tx_hash: Optional[str],
97-
height: Optional[int],
88+
tx_context: Optional[TxContext],
9889
check_message: bool,
9990
retrying: bool,
10091
existing_id,
10192
):
10293
message_dict = message.dict(exclude={"content"})
10394

10495
if not retrying:
105-
await PendingMessage.collection.insert_one(
106-
{
107-
"message": message_dict,
108-
"source": dict(
109-
chain_name=chain_name,
110-
tx_hash=tx_hash,
111-
height=height,
112-
check_message=check_message, # should we store this?
113-
),
114-
}
115-
)
96+
await delayed_incoming(message, tx_context, check_message)
11697
else:
11798
LOGGER.debug(f"Incrementing for {existing_id}")
11899
result = await PendingMessage.collection.update_one(
@@ -123,9 +104,7 @@ async def mark_message_for_retry(
123104

124105
async def incoming(
125106
pending_message: BasePendingMessage,
126-
chain_name: Optional[str] = None,
127-
tx_hash: Optional[str] = None,
128-
height: Optional[int] = None,
107+
tx_context: Optional[TxContext] = None,
129108
seen_ids: Optional[Dict[Tuple, int]] = None,
130109
check_message: bool = True,
131110
retrying: bool = False,
@@ -140,16 +119,23 @@ async def incoming(
140119
item_hash = pending_message.item_hash
141120
sender = pending_message.sender
142121
confirmations = []
122+
chain_name = tx_context.chain if tx_context is not None else None
143123
ids_key = (item_hash, sender, chain_name)
144124

145-
if chain_name and tx_hash and height:
125+
if tx_context:
146126
if seen_ids is not None:
147127
if ids_key in seen_ids.keys():
148-
if height > seen_ids[ids_key]:
128+
if tx_context.height > seen_ids[ids_key]:
149129
return IncomingStatus.MESSAGE_HANDLED, []
150130

151131
confirmations.append(
152-
MessageConfirmation(chain=chain_name, hash=tx_hash, height=height)
132+
MessageConfirmation(
133+
chain=tx_context.chain,
134+
hash=tx_context.hash,
135+
height=tx_context.height,
136+
time=tx_context.time,
137+
publisher=tx_context.publisher,
138+
)
153139
)
154140

155141
filters = {
@@ -179,14 +165,14 @@ async def incoming(
179165
updates: Dict[str, Dict] = {}
180166

181167
if existing:
182-
if seen_ids is not None and height is not None:
168+
if seen_ids is not None and tx_context is not None:
183169
if ids_key in seen_ids.keys():
184-
if height > seen_ids[ids_key]:
170+
if tx_context.height > seen_ids[ids_key]:
185171
return IncomingStatus.MESSAGE_HANDLED, []
186172
else:
187-
seen_ids[ids_key] = height
173+
seen_ids[ids_key] = tx_context.height
188174
else:
189-
seen_ids[ids_key] = height
175+
seen_ids[ids_key] = tx_context.height
190176

191177
LOGGER.debug("Updating %s." % item_hash)
192178

@@ -206,9 +192,7 @@ async def incoming(
206192
LOGGER.exception("Can't get content of object %r" % item_hash)
207193
await mark_message_for_retry(
208194
message=pending_message,
209-
chain_name=chain_name,
210-
tx_hash=tx_hash,
211-
height=height,
195+
tx_context=tx_context,
212196
check_message=check_message,
213197
retrying=retrying,
214198
existing_id=existing_id,
@@ -217,13 +201,14 @@ async def incoming(
217201

218202
try:
219203
validated_message = validate_pending_message(
220-
pending_message=pending_message, content=content, confirmations=confirmations
204+
pending_message=pending_message,
205+
content=content,
206+
confirmations=confirmations,
221207
)
222208
except ValidationError as e:
223209
LOGGER.warning("Invalid pending message: %s - %s", item_hash, str(e))
224210
return IncomingStatus.FAILED_PERMANENTLY, []
225211

226-
227212
# warning: those handlers can modify message and content in place
228213
# and return a status. None has to be retried, -1 is discarded, True is
229214
# handled and kept.
@@ -250,9 +235,7 @@ async def incoming(
250235
LOGGER.debug("Message type handler has failed, retrying later.")
251236
await mark_message_for_retry(
252237
message=pending_message,
253-
chain_name=chain_name,
254-
tx_hash=tx_hash,
255-
height=height,
238+
tx_context=tx_context,
256239
check_message=check_message,
257240
retrying=retrying,
258241
existing_id=existing_id,
@@ -270,14 +253,14 @@ async def incoming(
270253
LOGGER.warning("Invalid sender for %s" % item_hash)
271254
return IncomingStatus.MESSAGE_HANDLED, []
272255

273-
if seen_ids is not None and height is not None:
256+
if seen_ids is not None and tx_context is not None:
274257
if ids_key in seen_ids.keys():
275-
if height > seen_ids[ids_key]:
258+
if tx_context.height > seen_ids[ids_key]:
276259
return IncomingStatus.MESSAGE_HANDLED, []
277260
else:
278-
seen_ids[ids_key] = height
261+
seen_ids[ids_key] = tx_context.height
279262
else:
280-
seen_ids[ids_key] = height
263+
seen_ids[ids_key] = tx_context.height
281264

282265
LOGGER.debug("New message to store for %s." % item_hash)
283266

@@ -392,5 +375,5 @@ async def incoming_chaindata(content: Dict, context: TxContext):
392375
For now we only add it to the database, it will be processed later.
393376
"""
394377
await PendingTX.collection.insert_one(
395-
{"content": content, "context": asdict(context)}
378+
{"content": content, "context": context.dict()}
396379
)

src/aleph/chains/ethereum.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ async def request_transactions(
198198
try:
199199
jdata = json.loads(message)
200200
context = TxContext(
201-
chain_name=CHAIN_NAME,
202-
tx_hash=event_data.transactionHash.hex(),
201+
chain=CHAIN_NAME,
202+
hash=event_data.transactionHash.hex(),
203203
time=timestamp,
204204
height=event_data.blockNumber,
205205
publisher=publisher,

src/aleph/chains/nuls2.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,8 @@ async def request_transactions(config, session, start_height) -> AsyncIterator[T
175175
jdata = json.loads(ddata)
176176

177177
context = TxContext(
178-
chain_name=CHAIN_NAME,
179-
tx_hash=tx["hash"],
178+
chain=CHAIN_NAME,
179+
hash=tx["hash"],
180180
height=tx["height"],
181181
time=tx["createTime"],
182182
publisher=tx["coinFroms"][0]["address"],

src/aleph/chains/tx_context.py

+5-9
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
from dataclasses import dataclass
1+
from aleph.schemas.message_confirmation import MessageConfirmation
22

33

4-
@dataclass
5-
class TxContext:
6-
chain_name: str
7-
tx_hash: str
8-
height: int
9-
# Transaction timestamp, in Unix time (number of seconds since epoch).
10-
time: int
11-
publisher: str
4+
# At the moment, confirmation = chain transaction. This might change, but in the meantime
5+
# having TxContext inherit MessageConfirmation avoids code duplication.
6+
class TxContext(MessageConfirmation):
7+
pass

src/aleph/jobs/process_pending_messages.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
from setproctitle import setproctitle
1515

1616
from aleph.chains.common import incoming, IncomingStatus
17+
from aleph.exceptions import InvalidMessageError
1718
from aleph.logging import setup_logging
1819
from aleph.model.db_bulk_operation import DbBulkOperation
1920
from aleph.model.pending import PendingMessage
21+
from aleph.schemas.pending_messages import parse_message
2022
from aleph.services.p2p import singleton
2123
from .job_utils import prepare_loop, process_job_results
22-
from ..exceptions import InvalidMessageError
23-
from ..schemas.pending_messages import parse_message
24+
from ..chains.tx_context import TxContext
2425

2526
LOGGER = getLogger("jobs.pending_messages")
2627

@@ -60,12 +61,13 @@ async def handle_pending_message(
6061
# If an invalid message somehow ended in pending messages, drop it.
6162
return [delete_pending_message_op]
6263

64+
tx_context_dict = pending.get("tx_context")
65+
tx_context = TxContext.parse_obj(tx_context_dict) if tx_context_dict else None
66+
6367
async with sem:
6468
status, operations = await incoming(
6569
pending_message=message,
66-
chain_name=pending["source"].get("chain_name"),
67-
tx_hash=pending["source"].get("tx_hash"),
68-
height=pending["source"].get("height"),
70+
tx_context=tx_context,
6971
seen_ids=seen_ids,
7072
check_message=pending["source"].get("check_message", True),
7173
retrying=True,

src/aleph/jobs/process_pending_txs.py

+5-8
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
import asyncio
66
import logging
7-
from typing import List, Dict, Optional, Set
7+
from typing import List, Dict, Optional
8+
from typing import Set
89

910
import sentry_sdk
1011
from configmanager import Config
@@ -31,7 +32,7 @@ async def handle_pending_tx(
3132

3233
db_operations: List[DbBulkOperation] = []
3334
tx_context = TxContext(**pending_tx["context"])
34-
LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height)
35+
LOGGER.info("%s Handling TX in block %s", tx_context.chain, tx_context.height)
3536

3637
messages = await get_chaindata_messages(
3738
pending_tx["content"], tx_context, seen_ids=seen_ids
@@ -55,12 +56,8 @@ async def handle_pending_tx(
5556
operation=InsertOne(
5657
{
5758
"message": message.dict(exclude={"content"}),
58-
"source": dict(
59-
chain_name=tx_context.chain_name,
60-
tx_hash=tx_context.tx_hash,
61-
height=tx_context.height,
62-
check_message=True, # should we store this?
63-
),
59+
"tx_context": tx_context.dict(),
60+
"check_message": True,
6461
}
6562
),
6663
)
+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from aleph_message.models import Chain
2+
from pydantic import BaseModel, Field
3+
4+
5+
class MessageConfirmation(BaseModel):
6+
chain: Chain = Field(..., description="Chain from which the confirmation was fetched.")
7+
height: int = Field(..., description="Block in which the confirmation was published.")
8+
hash: str = Field(
9+
...,
10+
description="Hash of the transaction/block in which the confirmation was published.",
11+
)
12+
time: float = Field(
13+
...,
14+
description="Transaction timestamp, in Unix time (number of seconds since epoch).",
15+
)
16+
publisher: str = Field(..., description="Publisher of the confirmation on chain.")

0 commit comments

Comments
 (0)