
Commit 89daa12

Add products exporter
Resolves: AlmaLinux/build-system#354
1 parent: c3aa5c2

File tree

11 files changed: +609 -415 lines changed

alws/crud/products.py

Lines changed: 2 additions & 1 deletion

@@ -105,7 +105,7 @@ async def create_product(
     )
     task_results = await asyncio.gather(*repo_tasks)

-    for repo_name, repo_url, arch, pulp_href, is_debug in task_results:
+    for repo_name, repo_url, arch, pulp_href, export_path, is_debug in task_results:
         repo = models.Repository(
             name=repo_name,
             url=repo_url,
@@ -114,6 +114,7 @@ async def create_product(
             type=arch,
             debug=is_debug,
             production=True,
+            export_path=export_path,
         )
         product.repositories.append(repo)
         items_to_insert.append(repo)
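
For context, each element of task_results is now a six-tuple rather than a five-tuple, with the new export_path in fifth position. A small illustrative sketch of the shape being unpacked; all values are made up, not taken from a real Pulp instance:

# Illustrative only: the tuple shape create_product_repo now returns
# (see alws/utils/copr.py below).
task_result = (
    "myproduct-AlmaLinux-9-x86_64-dr",           # repo_name
    "https://pulp.example.com/content/copr/",    # repo_url
    "x86_64",                                    # arch
    "/pulp/api/v3/repositories/rpm/rpm/123/",    # pulp_href
    "myproduct/AlmaLinux-9/x86_64/",             # export_path (new)
    False,                                       # is_debug
)
repo_name, repo_url, arch, pulp_href, export_path, is_debug = task_result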

alws/dependencies.py

Lines changed: 12 additions & 1 deletion

@@ -1,11 +1,16 @@
+from contextlib import asynccontextmanager
+from typing import AsyncGenerator
+
+from fastapi_sqla import open_async_session
 from redis import asyncio as aioredis
+from sqlalchemy.ext.asyncio import AsyncSession

 from alws.config import settings

 __all__ = ['get_redis', 'get_async_db_key']


-async def get_redis() -> aioredis.Redis:
+async def get_redis() -> AsyncGenerator[aioredis.Redis, None]:
     client = aioredis.from_url(settings.redis_url)
     try:
         yield client
@@ -15,3 +20,9 @@ async def get_redis() -> aioredis.Redis:

 def get_async_db_key() -> str:
     return "async"
+
+
+@asynccontextmanager
+async def get_async_db_session() -> AsyncGenerator[AsyncSession, None]:
+    async with open_async_session(key=get_async_db_key()) as session:
+        yield session
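
The new get_async_db_session helper lets non-request code (standalone scripts, the exporters below) obtain a fastapi_sqla async session through the same "async" key the app uses. A minimal usage sketch, assuming fastapi_sqla has already been set up for the process; the query itself is illustrative:

import asyncio

from sqlalchemy import select

from alws.dependencies import get_async_db_session
from alws.models import Repository


async def print_repo_names():
    # The session is opened, committed, and closed by the context manager.
    async with get_async_db_session() as session:
        result = await session.execute(select(Repository.name))
        for name in result.scalars().all():
            print(name)

asyncio.run(print_repo_names())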

alws/utils/copr.py

Lines changed: 3 additions & 2 deletions

@@ -92,7 +92,7 @@ async def create_product_repo(
     platform_name: str,
     arch: str,
     is_debug: bool,
-) -> typing.Tuple[str, str, str, str, bool]:
+) -> typing.Tuple[str, str, str, str, str, bool]:

     debug_suffix = '-debug' if is_debug else ''
     repo_name = (
@@ -104,7 +104,8 @@ async def create_product_repo(
         create_publication=True,
         base_path_start='copr',
     )
-    return repo_name, repo_url, arch, repo_href, is_debug
+    export_path = f"{product_name}/{platform_name}/{'debug/' if is_debug else ''}{arch}/"
+    return repo_name, repo_url, arch, repo_href, export_path, is_debug


 async def create_product_sign_key_repo(
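
The export_path computed in create_product_repo above becomes the repository's on-disk layout under the exporter root: product, then platform, then an optional debug/ segment, then arch. A quick check with hypothetical names:

def build_export_path(product_name, platform_name, arch, is_debug):
    # Same expression as in create_product_repo, extracted for illustration.
    return f"{product_name}/{platform_name}/{'debug/' if is_debug else ''}{arch}/"

assert build_export_path('myproduct', 'AlmaLinux-9', 'x86_64', False) == 'myproduct/AlmaLinux-9/x86_64/'
assert build_export_path('myproduct', 'AlmaLinux-9', 'x86_64', True) == 'myproduct/AlmaLinux-9/debug/x86_64/'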

alws/utils/exporter.py

Lines changed: 9 additions & 13 deletions

@@ -1,7 +1,7 @@
 import os
-import typing
-import urllib
+import urllib.parse
 from pathlib import Path
+from typing import List, Union

 import aiofiles
 import aiohttp
@@ -12,14 +12,11 @@


 async def fs_export_repository(
-    repository_ids: typing.List[int], db: AsyncSession
+    repository_ids: List[int],
+    session: AsyncSession,
 ):
-    export_task = await repo_exporter.create_pulp_exporters_to_fs(
-        db, repository_ids
-    )
-    export_data = await repo_exporter.execute_pulp_exporters_to_fs(
-        db, export_task
-    )
+    export_task = await repo_exporter.create_pulp_exporters_to_fs(session, repository_ids)
+    export_data = await repo_exporter.execute_pulp_exporters_to_fs(session, export_task)
     export_paths = list(export_data.keys())
     for repo_elem, repo_data in export_data.items():
         repo_url = urllib.parse.urljoin(repo_data, 'repodata/')
@@ -32,17 +29,16 @@ async def fs_export_repository(


 async def get_repodata_file_links(base_url: str):
-    async with aiohttp.ClientSession() as session:
+    async with aiohttp.ClientSession(raise_for_status=True) as session:
         async with session.get(base_url) as response:
-            response.raise_for_status()
             content = await response.text()
     doc = document_fromstring(content)
     children_urls = [base_url + a.get('href') for a in doc.xpath('//a')]
     return children_urls


-async def download_file(url: str, dest: str):
-    async with aiohttp.ClientSession() as session:
+async def download_file(url: str, dest: Union[str, Path]):
+    async with aiohttp.ClientSession(raise_for_status=True) as session:
         async with session.get(url) as response:
             content = await response.content.read()
     async with aiofiles.open(dest, 'wb') as f:
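
Setting raise_for_status=True on the ClientSession makes aiohttp raise ClientResponseError for any 4xx/5xx response, replacing the manual response.raise_for_status() call and also covering download_file, which previously had no status check. A usage sketch; the URL and destination are made up:

import asyncio
from pathlib import Path

from aiohttp.client_exceptions import ClientResponseError

from alws.utils.exporter import download_file


async def main():
    try:
        # Hypothetical repodata file; a 404 now surfaces as an exception
        # instead of silently writing an HTML error page to disk.
        await download_file(
            'https://repo.example.com/repodata/repomd.xml',
            Path('/tmp/repomd.xml'),
        )
    except ClientResponseError as exc:
        print(f'download failed with HTTP {exc.status}')

asyncio.run(main())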

alws/utils/pulp_client.py

Lines changed: 13 additions & 3 deletions

@@ -11,13 +11,15 @@
     Any,
     Dict,
     List,
+    Optional,
 )

 import aiohttp
 from aiohttp.client_exceptions import ClientResponseError
 from aiohttp_retry import ExponentialRetry, RetryClient
 from fastapi import status

+from alws.config import settings
 from alws.constants import UPLOAD_FILE_CHUNK_SIZE
 from alws.utils.file_utils import hash_content, hash_file
 from alws.utils.ids import get_random_unique_version
@@ -31,7 +33,7 @@ def __init__(
         host: str,
         username: str,
         password: str,
-        semaphore: asyncio.Semaphore = None,
+        semaphore: Optional[asyncio.Semaphore] = None,
     ):
         self.semaphore = semaphore
         self._host = host
@@ -875,8 +877,7 @@ async def list_filesystem_exporters(self):
         result = await self.request("GET", endpoint)
         if result["count"] > 0:
             return result["results"]
-        else:
-            return []
+        return []

     async def get_filesystem_exporter(self, fse_pulp_href: str):
         return await self.request("GET", fse_pulp_href)
@@ -1050,3 +1051,12 @@ async def request(
                 exc.message += f": {str(response_json)}"
             raise exc
         return response_json
+
+
+def get_pulp_client(semaphore: Optional[asyncio.Semaphore] = None) -> PulpClient:
+    return PulpClient(
+        host=settings.pulp_host,
+        username=settings.pulp_user,
+        password=settings.pulp_password,
+        semaphore=semaphore,
+    )
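
The new get_pulp_client factory builds a PulpClient from settings, so scripts no longer read pulp_host, pulp_user, and pulp_password themselves; the optional semaphore bounds concurrent API calls. A sketch with placeholder repository hrefs:

import asyncio

from alws.utils.pulp_client import get_pulp_client


async def main():
    # Allow at most five concurrent requests to the Pulp API.
    pulp = get_pulp_client(semaphore=asyncio.Semaphore(5))
    hrefs = [
        '/pulp/api/v3/repositories/rpm/rpm/0001/',  # placeholder hrefs
        '/pulp/api/v3/repositories/rpm/rpm/0002/',
    ]
    versions = await asyncio.gather(
        *(pulp.get_repo_latest_version(href) for href in hrefs)
    )
    print(versions)

asyncio.run(main())
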
Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@

import os
import re
import sys

sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from fastapi_sqla import open_session
from sqlalchemy import select
from sqlalchemy.orm import joinedload

from alws.models import Product
from alws.utils.fastapi_sqla_setup import sync_setup


def main():
    sync_setup()
    with open_session() as session:
        for product in (
            session.execute(
                select(Product)
                .where(Product.is_community.is_(True))
                .options(joinedload(Product.repositories), joinedload(Product.platforms))
            )
            .scalars()
            .unique()
            .all()
        ):
            if not product.repositories:
                continue
            platform_names = '|'.join((platform.name for platform in product.platforms))
            platform_pattern = re.compile(
                rf'-({platform_names})-(\w+)(-debug|)-dr$',
                flags=re.IGNORECASE,
            )
            for repo in product.repositories:
                if repo.arch == 'sign_key':
                    continue
                regex_result = platform_pattern.search(repo.name)
                if not regex_result:
                    continue
                platform, *_ = regex_result.groups()
                repo.export_path = (
                    f"{product.name}/{platform}/{'debug/' if repo.debug else ''}{repo.arch}/"
                )


if __name__ == '__main__':
    main()
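
The backfill pattern in this script derives the platform and arch from existing community repo names of the form <product>-<platform>-<arch>[-debug]-dr. A small demonstration with hypothetical platform and repo names:

import re

platform_names = '|'.join(['AlmaLinux-8', 'AlmaLinux-9'])  # hypothetical platforms
pattern = re.compile(rf'-({platform_names})-(\w+)(-debug|)-dr$', flags=re.IGNORECASE)

match = pattern.search('myproduct-AlmaLinux-9-x86_64-debug-dr')
assert match is not None
assert match.groups() == ('AlmaLinux-9', 'x86_64', '-debug')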

scripts/exporters/__init__.py

Whitespace-only changes.

scripts/exporters/base_exporter.py

Lines changed: 182 additions & 0 deletions

@@ -0,0 +1,182 @@

import asyncio
import logging
import re
import shutil
import sys
import urllib.parse
from pathlib import Path
from typing import List, Literal, Optional, Tuple

from plumbum import local
from sqlalchemy import select

from alws.config import settings
from alws.dependencies import get_async_db_session
from alws.models import Repository
from alws.utils.exporter import download_file, get_repodata_file_links
from alws.utils.pulp_client import get_pulp_client


class BasePulpExporter:
    def __init__(
        self,
        repodata_cache_dir: str,
        logger_name: str = '',
        log_file_path: Path = Path('/tmp/exporter.log'),
        verbose: bool = False,
        export_method: Literal['write', 'hardlink', 'symlink'] = 'hardlink',
        export_path: str = settings.pulp_export_path,
    ):
        self.pulp_client = get_pulp_client()
        self.export_method = export_method
        self.export_path = export_path
        self.createrepo_c = local["createrepo_c"]

        self.repodata_cache_dir = Path(repodata_cache_dir).expanduser().absolute()
        self.checksums_cache_dir = self.repodata_cache_dir.joinpath('checksums')
        for dir_path in (self.repodata_cache_dir, self.checksums_cache_dir):
            if dir_path.exists():
                continue
            dir_path.mkdir()

        self.logger = logging.getLogger(logger_name)
        Path(log_file_path).parent.mkdir(exist_ok=True)
        logging.basicConfig(
            format="%(asctime)s %(levelname)-8s %(message)s",
            level=logging.DEBUG if verbose else logging.INFO,
            datefmt="%Y-%m-%d %H:%M:%S",
            handlers=[
                logging.FileHandler(filename=log_file_path, mode="a"),
                logging.StreamHandler(stream=sys.stdout),
            ],
        )

    def regenerate_repo_metadata(self, repo_path: str):
        partial_path = re.sub(str(settings.pulp_export_path), "", str(repo_path)).strip("/")
        repodata_path = Path(repo_path, "repodata")
        repo_repodata_cache = self.repodata_cache_dir.joinpath(partial_path)
        cache_repodata_dir = repo_repodata_cache.joinpath("repodata")
        self.logger.info('Repodata cache dir: %s', cache_repodata_dir)
        args = [
            "--update",
            "--keep-all-metadata",
            "--cachedir",
            self.checksums_cache_dir,
        ]
        if repo_repodata_cache.exists():
            args.extend(["--update-md-path", cache_repodata_dir])
        args.append(repo_path)
        self.logger.info('Starting createrepo_c')
        _, stdout, _ = self.createrepo_c.run(args=args)
        self.logger.info(stdout)
        self.logger.info('createrepo_c is finished')
        # Cache newly generated repodata into folder for future re-use
        if not repo_repodata_cache.exists():
            repo_repodata_cache.mkdir(parents=True)
        else:
            # Remove previous repodata before copying new ones
            if cache_repodata_dir.exists():
                shutil.rmtree(cache_repodata_dir)

        shutil.copytree(repodata_path, cache_repodata_dir)

    async def create_filesystem_exporters(
        self,
        repository_ids: List[int],
        get_publications: bool = False,
    ):
        async def get_exporter_data(repository: Repository) -> Tuple[str, dict]:
            export_path = str(Path(self.export_path, repository.export_path, "Packages"))
            exporter_name = (
                f"{repository.name}-{repository.arch}-debug"
                if repository.debug
                else f"{repository.name}-{repository.arch}"
            )
            fs_exporter_href = await self.pulp_client.create_filesystem_exporter(
                exporter_name,
                export_path,
                export_method=self.export_method,
            )

            repo_latest_version = await self.pulp_client.get_repo_latest_version(
                repository.pulp_href
            )
            if not repo_latest_version:
                raise ValueError('cannot find latest repo version')
            repo_exporter_dict = {
                "repo_id": repository.id,
                "repo_url": repository.url,
                "repo_latest_version": repo_latest_version,
                "exporter_name": exporter_name,
                "export_path": export_path,
                "exporter_href": fs_exporter_href,
            }
            if get_publications:
                publications = await self.pulp_client.get_rpm_publications(
                    repository_version_href=repo_latest_version,
                    include_fields=["pulp_href"],
                )
                if publications:
                    publication_href = publications[0].get("pulp_href")
                    repo_exporter_dict["publication_href"] = publication_href
            return fs_exporter_href, repo_exporter_dict

        async with get_async_db_session() as session:
            query = select(Repository).where(Repository.id.in_(repository_ids))
            result = await session.execute(query)
            repositories = list(result.scalars().all())

        results = await asyncio.gather(*(get_exporter_data(repo) for repo in repositories))

        return list(dict(results).values())

    async def download_repodata(self, repodata_path, repodata_url):
        file_links = await get_repodata_file_links(repodata_url)
        for link in file_links:
            file_name = Path(link).name
            if file_name.endswith('..'):
                continue
            self.logger.info("Downloading repodata from %s", link)
            await download_file(link, Path(repodata_path, file_name))

    async def _export_repository(self, exporter: dict) -> Optional[str]:
        self.logger.info(
            "Exporting repository using following data: %s",
            str(exporter),
        )
        export_path = exporter["export_path"]
        href = exporter["exporter_href"]
        repository_version = exporter["repo_latest_version"]
        try:
            await self.pulp_client.export_to_filesystem(href, repository_version)
        except Exception:
            self.logger.exception(
                "Cannot export repository via %s",
                str(exporter),
            )
            return
        parent_dir = Path(export_path).parent
        if not parent_dir.exists():
            self.logger.info(
                "Repository %s directory is absent",
                exporter["exporter_name"],
            )
            return

        repodata_path = parent_dir.joinpath("repodata").absolute()
        repodata_url = urllib.parse.urljoin(exporter["repo_url"], "repodata/")
        if repodata_path.exists():
            shutil.rmtree(repodata_path)
        repodata_path.mkdir()
        self.logger.info('Downloading repodata from %s', repodata_url)
        try:
            await self.download_repodata(repodata_path, repodata_url)
        except Exception as e:
            self.logger.exception("Cannot download repodata file: %s", str(e))

        return export_path

    async def export_repositories(self, repo_ids: List[int]) -> List[str]:
        exporters = await self.create_filesystem_exporters(repo_ids)
        results = await asyncio.gather(*(self._export_repository(e) for e in exporters))
        return [path for path in results if path]
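
BasePulpExporter is the shared machinery for concrete exporter scripts under scripts/exporters/: create filesystem exporters in Pulp, export each repository, then fetch repodata next to the exported Packages/ tree. A hedged usage sketch, assuming fastapi_sqla is already initialized for the process; the cache directory and repository IDs are invented:

import asyncio
from pathlib import Path

from scripts.exporters.base_exporter import BasePulpExporter


async def main():
    exporter = BasePulpExporter(
        repodata_cache_dir='~/.cache/products-exporter',  # hypothetical cache dir
        logger_name='products-exporter',
        verbose=True,
    )
    # IDs of alws Repository rows to export (illustrative).
    exported_paths = await exporter.export_repositories([101, 102])
    for path in exported_paths:
        # export_path ends in .../Packages; repodata lives in its parent dir.
        exporter.regenerate_repo_metadata(str(Path(path).parent))

asyncio.run(main())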
