Skip to content

Commit caa9687

Browse files
committed
Internal: improve chain data service (#488)
Problem: a few todos are still open on the chain data service. Solution: improve and rename the `get_chaindata` method. The new `prepare_sync_event_payload` method now always returns an off-chain sync event (on-chain sync events never occur anyway). It also returns the result as a Pydantic model for more flexibility. Serialization is left upon the caller. We now use a Pydantic model to translate `MessageDb` objects in the correct format.
1 parent 9249b69 commit caa9687

12 files changed

+168
-79
lines changed

src/aleph/chains/bsc.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from aleph_message.models import Chain
22
from configmanager import Config
33

4-
from aleph.chains.chaindata import ChainDataService
4+
from aleph.chains.chain_data_service import ChainDataService
55
from aleph.chains.abc import ChainReader
66
from aleph.chains.indexer_reader import AlephIndexerReader
77
from aleph.types.chain_sync import ChainEventType

src/aleph/chains/chaindata.py renamed to src/aleph/chains/chain_data_service.py

+37-56
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import asyncio
2-
import json
2+
from io import StringIO
33
from typing import Dict, Optional, List, Any, Mapping, Set, cast, Type, Union
44

55
from aleph_message.models import StoreContent, ItemType, Chain, MessageType
@@ -17,6 +17,12 @@
1717
ContentCurrentlyUnavailable,
1818
)
1919
from aleph.schemas.chains.indexer_response import MessageEvent, GenericMessageEvent
20+
from aleph.schemas.chains.sync_events import (
21+
OffChainSyncEventPayload,
22+
OnChainSyncEventPayload,
23+
OnChainContent,
24+
OnChainMessage,
25+
)
2026
from aleph.schemas.chains.tezos_indexer_response import (
2127
MessageEventPayload as TezosMessageEventPayload,
2228
)
@@ -27,18 +33,6 @@
2733
from aleph.types.files import FileType
2834
from aleph.utils import get_sha256
2935

30-
INCOMING_MESSAGE_AUTHORIZED_FIELDS = [
31-
"item_hash",
32-
"item_content",
33-
"item_type",
34-
"chain",
35-
"channel",
36-
"sender",
37-
"type",
38-
"time",
39-
"signature",
40-
]
41-
4236

4337
class ChainDataService:
4438
def __init__(
@@ -47,52 +41,39 @@ def __init__(
4741
self.session_factory = session_factory
4842
self.storage_service = storage_service
4943

50-
# TODO: split this function in severa
51-
async def get_chaindata(
52-
self, session: DbSession, messages: List[MessageDb], bulk_threshold: int = 2000
53-
):
54-
"""Returns content ready to be broadcast on-chain (aka chaindata).
44+
async def prepare_sync_event_payload(
45+
self, session: DbSession, messages: List[MessageDb]
46+
) -> OffChainSyncEventPayload:
47+
"""
48+
Returns the payload of a sync event to be published on chain.
49+
50+
We publish message archives on-chain at regular intervals. This function prepares the data
51+
before the node emits a transaction on-chain:
52+
1. Pack all messages as a JSON file
53+
2. Add this file to IPFS and get its CID
54+
3. Return the CID + some metadata.
5555
56-
If message length is over bulk_threshold (default 2000 chars), store list
57-
in IPFS and store the object hash instead of raw list.
56+
Note that the archive file is pinned on IPFS but not inserted in the `file_pins` table
57+
here. This is left upon the caller once the event is successfully emitted on chain to avoid
58+
persisting unused archives.
5859
"""
60+
# In previous versions, it was envisioned to store messages on-chain. This proved to be
61+
# too expensive. The archive uses the same format as these "on-chain" data.
62+
archive = OnChainSyncEventPayload(
63+
protocol=ChainSyncProtocol.ON_CHAIN_SYNC,
64+
version=1,
65+
content=OnChainContent(
66+
messages=[OnChainMessage.from_orm(message) for message in messages]
67+
),
68+
)
69+
archive_content = archive.json()
5970

60-
# TODO: this function is used to guarantee that the chain sync protocol is not broken
61-
# while shifting to Postgres.
62-
# * exclude the useless fields in the DB query directly and get rid of
63-
# INCOMING_MESSAGE_AUTHORIZED_FIELDS
64-
# * use a Pydantic model to enforce the output format
65-
def message_to_dict(_message: MessageDb) -> Mapping[str, Any]:
66-
message_dict = {
67-
k: v
68-
for k, v in _message.to_dict().items()
69-
if k in INCOMING_MESSAGE_AUTHORIZED_FIELDS
70-
}
71-
# Convert the time field to epoch
72-
message_dict["time"] = message_dict["time"].timestamp()
73-
return message_dict
74-
75-
message_dicts = [message_to_dict(message) for message in messages]
76-
77-
chaindata = {
78-
"protocol": ChainSyncProtocol.ON_CHAIN_SYNC,
79-
"version": 1,
80-
"content": {"messages": message_dicts},
81-
}
82-
content = json.dumps(chaindata)
83-
if len(content) > bulk_threshold:
84-
ipfs_id = await self.storage_service.add_json(
85-
session=session, value=chaindata
86-
)
87-
return json.dumps(
88-
{
89-
"protocol": ChainSyncProtocol.OFF_CHAIN_SYNC,
90-
"version": 1,
91-
"content": ipfs_id,
92-
}
93-
)
94-
else:
95-
return content
71+
ipfs_cid = await self.storage_service.add_file(
72+
session=session, fileobject=StringIO(archive_content), engine=ItemType.ipfs
73+
)
74+
return OffChainSyncEventPayload(
75+
protocol=ChainSyncProtocol.OFF_CHAIN_SYNC, version=1, content=ipfs_cid
76+
)
9677

9778
@staticmethod
9879
def _get_sync_messages(tx_content: Mapping[str, Any]):

src/aleph/chains/connector.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from aleph.storage import StorageService
99
from aleph.types.db_session import DbSessionFactory
1010
from .bsc import BscConnector
11-
from .chaindata import ChainDataService
11+
from .chain_data_service import ChainDataService
1212
from .abc import ChainReader, ChainWriter
1313
from .ethereum import EthereumConnector
1414
from .nuls2 import Nuls2Connector

src/aleph/chains/ethereum.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from aleph.toolkit.timestamp import utc_now
2828
from aleph.types.db_session import DbSessionFactory
2929
from aleph.utils import run_in_executor
30-
from .chaindata import ChainDataService
30+
from .chain_data_service import ChainDataService
3131
from .abc import ChainWriter, Verifier, ChainReader
3232
from .indexer_reader import AlephIndexerReader
3333
from ..db.models import ChainTxDb
@@ -346,8 +346,8 @@ async def packer(self, config: Config):
346346
LOGGER.info("Chain sync: %d unconfirmed messages")
347347

348348
# This function prepares a chain data file and makes it downloadable from the node.
349-
content = await self.chain_data_service.get_chaindata(
350-
session=session, messages=messages, bulk_threshold=200
349+
sync_event_payload = await self.chain_data_service.prepare_sync_event_payload(
350+
session=session, messages=messages
351351
)
352352
# Required to apply update to the files table in get_chaindata
353353
session.commit()
@@ -360,7 +360,7 @@ async def packer(self, config: Config):
360360
account,
361361
int(gas_price * 1.1),
362362
nonce,
363-
content,
363+
sync_event_payload.json(),
364364
)
365365
LOGGER.info("Broadcast %r on %s" % (response, CHAIN_NAME))
366366

src/aleph/chains/indexer_reader.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from pydantic import BaseModel
2222

2323
import aleph.toolkit.json as aleph_json
24-
from aleph.chains.chaindata import ChainDataService
24+
from aleph.chains.chain_data_service import ChainDataService
2525
from aleph.db.accessors.chains import (
2626
get_missing_indexer_datetime_multirange,
2727
add_indexer_range,

src/aleph/chains/nuls2.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from aleph.toolkit.timestamp import utc_now
3131
from aleph.types.db_session import DbSessionFactory
3232
from aleph.utils import run_in_executor
33-
from .chaindata import ChainDataService
33+
from .chain_data_service import ChainDataService
3434
from .abc import Verifier, ChainWriter
3535
from aleph.schemas.chains.tx_context import TxContext
3636
from ..db.models import ChainTxDb
@@ -197,12 +197,13 @@ async def packer(self, config: Config):
197197

198198
if len(messages):
199199
# This function prepares a chain data file and makes it downloadable from the node.
200-
content = await self.chain_data_service.get_chaindata(
200+
sync_event_payload = await self.chain_data_service.prepare_sync_event_payload(
201201
session=session, messages=messages
202202
)
203203
# Required to apply update to the files table in get_chaindata
204204
session.commit()
205205

206+
content = sync_event_payload.json()
206207
tx = await prepare_transfer_tx(
207208
address,
208209
[(target_addr, CHEAP_UNIT_FEE)],

src/aleph/chains/tezos.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from nacl.exceptions import BadSignatureError
1212

1313
import aleph.toolkit.json as aleph_json
14-
from aleph.chains.chaindata import ChainDataService
14+
from aleph.chains.chain_data_service import ChainDataService
1515
from aleph.chains.common import get_verification_buffer
1616
from aleph.chains.abc import Verifier, ChainReader
1717
from aleph.db.accessors.chains import get_last_height, upsert_chain_sync_status

src/aleph/jobs/process_pending_txs.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from sqlalchemy import delete
1212

1313
from ..chains.signature_verifier import SignatureVerifier
14-
from aleph.chains.chaindata import ChainDataService
14+
from aleph.chains.chain_data_service import ChainDataService
1515
from aleph.db.accessors.pending_txs import get_pending_txs
1616
from aleph.db.connection import make_engine, make_session_factory
1717
from aleph.db.models.pending_txs import PendingTxDb
+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from typing import Literal, Optional, List, Union, Annotated
2+
3+
from aleph_message.models import ItemHash, ItemType, Chain, MessageType
4+
from pydantic import BaseModel, Field, validator
5+
6+
from aleph.types.chain_sync import ChainSyncProtocol
7+
from aleph.types.channel import Channel
8+
import datetime as dt
9+
10+
11+
class OnChainMessage(BaseModel):
12+
class Config:
13+
orm_mode = True
14+
15+
sender: str
16+
chain: Chain
17+
signature: Optional[str]
18+
type: MessageType
19+
item_content: Optional[str]
20+
item_type: ItemType
21+
item_hash: ItemHash
22+
time: float
23+
channel: Optional[Channel] = None
24+
25+
@validator("time", pre=True)
26+
def check_time(cls, v, values):
27+
if isinstance(v, dt.datetime):
28+
return v.timestamp()
29+
30+
return v
31+
32+
33+
class OnChainContent(BaseModel):
34+
messages: List[OnChainMessage]
35+
36+
37+
class OnChainSyncEventPayload(BaseModel):
38+
protocol: Literal[ChainSyncProtocol.ON_CHAIN_SYNC]
39+
version: int
40+
content: OnChainContent
41+
42+
43+
class OffChainSyncEventPayload(BaseModel):
44+
protocol: Literal[ChainSyncProtocol.OFF_CHAIN_SYNC]
45+
version: int
46+
content: str
47+
48+
49+
SyncEventPayload = Annotated[
50+
Union[OnChainSyncEventPayload, OffChainSyncEventPayload],
51+
Field(discriminator="protocol"),
52+
]

src/aleph/storage.py

+3-6
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import logging
66
from hashlib import sha256
77
from typing import Any, IO, Optional, cast, Final
8-
from aiohttp import web
98

109
from aleph_message.models import ItemType
1110

@@ -20,13 +19,9 @@
2019
from aleph.services.ipfs.common import get_cid_version
2120
from aleph.services.p2p.http import request_hash as p2p_http_request_hash
2221
from aleph.services.storage.engine import StorageEngine
23-
from aleph.toolkit.constants import MiB
2422
from aleph.types.db_session import DbSession
2523
from aleph.types.files import FileType
2624
from aleph.utils import get_sha256
27-
from aleph.schemas.pending_messages import (
28-
parse_message,
29-
)
3025

3126
LOGGER = logging.getLogger(__name__)
3227

@@ -144,7 +139,9 @@ async def _verify_content_hash(
144139
) -> None:
145140
"""
146141
Checks that the hash of a content we fetched from the network matches the expected hash.
147-
:return: True if the hashes match, False otherwise.
142+
Raises an exception if the content does not match the expected hash.
143+
:raises InvalidContent: The computed hash does not match.
144+
:raises ContentCurrentlyUnavailable: The hash cannot be computed at this time.
148145
"""
149146
config = get_config()
150147
ipfs_enabled = config.ipfs.enabled.value

tests/chains/test_chain_data_service.py

+63-5
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,71 @@
1-
import pytest
2-
from aleph_message.models import Chain, StoreContent, MessageType, ItemType, PostContent
1+
from typing import IO
32

4-
from aleph.chains.chaindata import ChainDataService
5-
from aleph.db.models import ChainTxDb
3+
import pytest
4+
from aleph_message.models import (
5+
Chain,
6+
StoreContent,
7+
MessageType,
8+
ItemType,
9+
PostContent,
10+
ItemHash,
11+
)
12+
13+
from aleph.chains.chain_data_service import ChainDataService
14+
from aleph.db.models import ChainTxDb, MessageDb
15+
from aleph.schemas.chains.sync_events import OnChainSyncEventPayload
616
from aleph.schemas.chains.tezos_indexer_response import MessageEventPayload
717
from aleph.schemas.pending_messages import parse_message
818
from aleph.toolkit.timestamp import timestamp_to_datetime
919
from aleph.types.chain_sync import ChainSyncProtocol
10-
from aleph.types.db_session import DbSessionFactory
20+
from aleph.types.db_session import DbSessionFactory, DbSession
21+
import datetime as dt
22+
23+
24+
@pytest.mark.asyncio
25+
async def test_prepare_sync_event_payload(mocker):
26+
archive_cid = "Qmsomething"
27+
28+
messages = [
29+
MessageDb(
30+
item_hash=ItemHash(
31+
"abe22332402a5c45f20491b719b091fd0d7eab65ca1bcf4746840b787dee874b"
32+
),
33+
type=MessageType.store,
34+
chain=Chain.ETH,
35+
sender="0x0dAd142fDD76A817CD52a700EaCA2D9D3491086B",
36+
signature="0x813f0be4ddd852e7f0c723ac95333be762d80690fe0fc0705ec0e1b7df7fa92d5cdbfba8ab0321aee8769c93f7bc5dc9d1268cb66e8cb453a6b8299ba3faac771b",
37+
item_type=ItemType.inline,
38+
item_content='{"address":"0x0dAd142fDD76A817CD52a700EaCA2D9D3491086B","time":1697718147.2695966,"item_type":"storage","item_hash":"ecbfcb9f92291b9385772c9b5cd094788f928ccb696ad1ecbf179a4e308e4350","mime_type":"application/octet-stream"}',
39+
time=dt.datetime(2023, 10, 19, 12, 22, 27, 269707, tzinfo=dt.timezone.utc),
40+
channel="TEST",
41+
)
42+
]
43+
44+
async def mock_add_file(
45+
session: DbSession, fileobject: IO, engine: ItemType = ItemType.ipfs
46+
) -> str:
47+
content = fileobject.read()
48+
archive = OnChainSyncEventPayload.parse_raw(content)
49+
50+
assert archive.version == 1
51+
assert len(archive.content.messages) == len(messages)
52+
# Check that the time field was converted
53+
assert archive.content.messages[0].time == messages[0].time.timestamp()
54+
55+
return archive_cid
56+
57+
storage_service = mocker.AsyncMock()
58+
storage_service.add_file = mock_add_file
59+
chain_data_service = ChainDataService(
60+
session_factory=mocker.MagicMock(), storage_service=storage_service
61+
)
62+
63+
sync_event_payload = await chain_data_service.prepare_sync_event_payload(
64+
session=mocker.MagicMock(), messages=messages
65+
)
66+
assert sync_event_payload.protocol == ChainSyncProtocol.OFF_CHAIN_SYNC
67+
assert sync_event_payload.version == 1
68+
assert sync_event_payload.content == archive_cid
1169

1270

1371
@pytest.mark.asyncio

tests/message_processing/test_process_pending_txs.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from sqlalchemy import select
99

1010
from aleph.chains.signature_verifier import SignatureVerifier
11-
from aleph.chains.chaindata import ChainDataService
11+
from aleph.chains.chain_data_service import ChainDataService
1212
from aleph.db.models import PendingMessageDb, MessageStatusDb
1313
from aleph.db.models.chains import ChainTxDb
1414
from aleph.db.models.pending_txs import PendingTxDb

0 commit comments

Comments
 (0)