Skip to content

Commit ae6eea7

Browse files
committed
[Storage] Garbage collector CLI
Added a new command to the CCN operator CLI to run a garbage collector on local storage. The new `gc run` command lists all the files that are not linked to any message or permanent pin and deletes them. Using the --verbose option, the command will print more details on the files it will preserve and delete. The --dry-run option allows to run the GC without actually deleting any file.
1 parent 50798e3 commit ae6eea7

File tree

4 files changed

+178
-3
lines changed

4 files changed

+178
-3
lines changed

setup.cfg

+1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ testing =
9797
pytest-aiohttp
9898
pytest-asyncio
9999
pytest-mock
100+
types-pytz
100101
types-pyyaml
101102
types-requests
102103
types-setuptools
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
"""
2+
This migration checks all the files stored in local storage (=GridFS) and compares them to the list
3+
of messages already on the node. The files that are not linked to any message are scheduled for
4+
deletion.
5+
"""
6+
import asyncio
7+
import datetime as dt
8+
from typing import Any, Dict, FrozenSet, List, Optional
9+
from typing import cast
10+
11+
import pytz
12+
import typer
13+
from aleph_message.models import MessageType
14+
from configmanager import Config
15+
16+
import aleph.model
17+
from aleph.ccn_cli.cli_config import CliConfig
18+
from aleph.config import get_defaults
19+
from aleph.model import init_db_globals
20+
from aleph.model.filepin import PermanentPin
21+
from aleph.model.hashes import delete_value as delete_gridfs_file
22+
from aleph.model.messages import Message
23+
24+
gc_ns = typer.Typer()
25+
26+
27+
async def get_hashes(
28+
item_type_field: str, item_hash_field: str, msg_type: Optional[MessageType] = None
29+
) -> FrozenSet[str]:
30+
def rgetitem(dictionary: Any, fields: List[str]) -> Any:
31+
value = dictionary[fields[0]]
32+
if len(fields) > 1:
33+
return rgetitem(value, fields[1:])
34+
return value
35+
36+
filters = {
37+
# Check if the hash field exists in case the message was forgotten
38+
item_hash_field: {"$exists": 1},
39+
item_type_field: {"$in": ["ipfs", "storage"]},
40+
}
41+
if msg_type:
42+
filters["type"] = msg_type
43+
44+
hashes = (
45+
rgetitem(msg, item_hash_field.split("."))
46+
async for msg in Message.collection.find(
47+
filters,
48+
{item_hash_field: 1},
49+
batch_size=1000,
50+
)
51+
)
52+
53+
# Temporary fix for api2. A message has a list of dicts as item hash.
54+
hashes = [h for h in hashes if isinstance(h, str)]
55+
56+
return frozenset(hashes)
57+
58+
59+
def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None:
60+
typer.echo("The following files will be preserved:")
61+
for file_type, files in files_to_preserve.items():
62+
typer.echo(f"* {len(files)} {file_type}")
63+
64+
65+
async def list_files_to_preserve(
66+
gridfs_files_dict: Dict[str, Dict],
67+
temporary_files_ttl: int,
68+
) -> Dict[str, FrozenSet[str]]:
69+
files_to_preserve_dict = {}
70+
71+
# Preserve any file that was uploaded less than an hour ago
72+
current_datetime = pytz.utc.localize(dt.datetime.utcnow())
73+
files_to_preserve_dict["temporary files"] = frozenset(
74+
[
75+
file["filename"]
76+
for file in gridfs_files_dict.values()
77+
if file["uploadDate"]
78+
> current_datetime - dt.timedelta(seconds=temporary_files_ttl)
79+
]
80+
)
81+
82+
# Get all the messages that potentially store data in local storage:
83+
# * any message with item_type in ["storage", "ipfs"]
84+
# * STOREs with content.item_type in ["storage", "ipfs"]
85+
files_to_preserve_dict["non-inline messages"] = await get_hashes(
86+
item_type_field="item_type",
87+
item_hash_field="item_hash",
88+
)
89+
files_to_preserve_dict["stores"] = await get_hashes(
90+
item_type_field="content.item_type",
91+
item_hash_field="content.item_hash",
92+
msg_type=MessageType.store,
93+
)
94+
95+
# We also keep permanent pins, even if they are also stored on IPFS
96+
files_to_preserve_dict["file pins"] = frozenset(
97+
[
98+
pin["multihash"]
99+
async for pin in PermanentPin.collection.find({}, {"multihash": 1})
100+
]
101+
)
102+
103+
return files_to_preserve_dict
104+
105+
106+
async def run(ctx: typer.Context, dry_run: bool):
107+
config = Config(schema=get_defaults())
108+
cli_config = cast(CliConfig, ctx.obj)
109+
config.yaml.load(str(cli_config.config_file_path))
110+
111+
init_db_globals(config=config)
112+
if aleph.model.db is None: # for mypy
113+
raise ValueError("DB not initialized as expected.")
114+
115+
# Get a set of all the files currently in GridFS
116+
gridfs_files_dict = {
117+
file["filename"]: file
118+
async for file in aleph.model.db["fs.files"].find(
119+
projection={"_id": 0, "filename": 1, "length": 1, "uploadDate": 1},
120+
batch_size=1000,
121+
)
122+
}
123+
gridfs_files = frozenset(gridfs_files_dict.keys())
124+
125+
typer.echo(f"Found {len(gridfs_files_dict)} files in local storage.")
126+
127+
files_to_preserve_dict = await list_files_to_preserve(
128+
gridfs_files_dict=gridfs_files_dict,
129+
temporary_files_ttl=config.storage.temporary_files_ttl.value,
130+
)
131+
files_to_preserve = frozenset().union(*files_to_preserve_dict.values())
132+
files_to_delete = gridfs_files - files_to_preserve
133+
134+
if cli_config.verbose:
135+
print_files_to_preserve(files_to_preserve_dict)
136+
137+
restored_memory = sum(
138+
gridfs_files_dict[filename]["length"] for filename in files_to_delete
139+
)
140+
typer.echo(
141+
f"{len(files_to_delete)} will be deleted, totaling {restored_memory} bytes."
142+
)
143+
144+
if dry_run:
145+
if cli_config.verbose:
146+
if files_to_delete:
147+
typer.echo("The following files will be deleted:")
148+
for file_to_delete in files_to_delete:
149+
typer.echo(f"* {file_to_delete}")
150+
151+
else:
152+
for file_to_delete in files_to_delete:
153+
typer.echo(f"Deleting {file_to_delete}...")
154+
await delete_gridfs_file(file_to_delete)
155+
156+
typer.echo("Done.")
157+
158+
159+
@gc_ns.command(name="run")
160+
def run_gc(
161+
ctx: typer.Context,
162+
dry_run: bool = typer.Option(
163+
False, help="If set, display files to delete without deleting them."
164+
),
165+
):
166+
asyncio.run(run(ctx, dry_run))

src/aleph/ccn_cli/main.py

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import typer
55

66
from .cli_config import CliConfig
7+
from .commands.garbage_collector import gc_ns
78
from .commands.keys import keys_ns
89
from .commands.migrations import migrations_ns
910

@@ -17,6 +18,7 @@ def validate_config_file_path(config: Optional[Path]) -> Optional[Path]:
1718

1819
return config
1920

21+
2022
def validate_key_dir(key_dir: Optional[Path]) -> Optional[Path]:
2123
if key_dir is not None:
2224
if key_dir.exists and not key_dir.is_dir():
@@ -63,6 +65,7 @@ def main(
6365
ctx.obj = cli_config
6466

6567

68+
app.add_typer(gc_ns, name="gc", help="Invoke the garbage collector.")
6669
app.add_typer(keys_ns, name="keys", help="Operations on private keys.")
6770
app.add_typer(migrations_ns, name="migrations", help="Run DB migrations.")
6871

src/aleph/config.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ def get_defaults():
77
return {
88
"logging": {
99
"level": logging.WARNING,
10-
"max_log_file_size": 1_000_000_000 # 1GB,
10+
"max_log_file_size": 1_000_000_000, # 1GB,
1111
},
1212
"aleph": {
1313
"queue_topic": "ALEPH-QUEUE",
@@ -39,7 +39,12 @@ def get_defaults():
3939
"/ip4/62.210.93.220/tcp/4025/p2p/QmXdci5feFmA2pxTg8p3FCyWmSKnWYAAmr7Uys1YCTFD8U",
4040
],
4141
},
42-
"storage": {"folder": "./data/", "store_files": False, "engine": "mongodb"},
42+
"storage": {
43+
"folder": "./data/",
44+
"store_files": False,
45+
"engine": "mongodb",
46+
"temporary_files_ttl": 3600,
47+
},
4348
"nuls": {
4449
"chain_id": 8964,
4550
"enabled": False,
@@ -89,7 +94,7 @@ def get_defaults():
8994
"peers": [
9095
"/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx",
9196
"/ip4/51.159.57.71/tcp/4001/p2p/12D3KooWBH3JVSBwHLNzxv7EzniBP3tDmjJaoa3EJBF9wyhZtHt2",
92-
"/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF"
97+
"/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF",
9398
],
9499
},
95100
"sentry": {

0 commit comments

Comments
 (0)