|
| 1 | +import time |
| 2 | +from io import BytesIO |
| 3 | +from tarfile import TarFile, TarInfo |
| 4 | +from typing import Optional |
| 5 | + |
| 6 | +import bcrypt |
| 7 | +from testcontainers.core.container import DockerContainer |
| 8 | + |
| 9 | +class DockerRegistryContainer(DockerContainer): |
| 10 | + # https://docs.docker.com/registry/ |
| 11 | + credentials_path: str = "/htpasswd/credentials.txt" |
| 12 | + |
| 13 | + def __init__( |
| 14 | + self, |
| 15 | + image: str = "registry:latest", |
| 16 | + port: int = 5000, |
| 17 | + username: str = None, |
| 18 | + password: str = None, |
| 19 | + **kwargs, |
| 20 | + ) -> None: |
| 21 | + super().__init__(image=image, **kwargs) |
| 22 | + self.port: int = port |
| 23 | + self.username: Optional[str] = username |
| 24 | + self.password: Optional[str] = password |
| 25 | + self.with_exposed_ports(self.port) |
| 26 | + |
| 27 | + def _copy_credentials(self): |
| 28 | + # Create credentials and write them to the container |
| 29 | + hashed_password: str = bcrypt.hashpw( |
| 30 | + self.password.encode("utf-8"), |
| 31 | + bcrypt.gensalt(rounds=12, prefix=b"2a"), |
| 32 | + ).decode("utf-8") |
| 33 | + content = f"{self.username}:{hashed_password}".encode("utf-8") |
| 34 | + |
| 35 | + with BytesIO() as tar_archive_object, TarFile( |
| 36 | + fileobj=tar_archive_object, mode="w" |
| 37 | + ) as tmp_tarfile: |
| 38 | + tarinfo: TarInfo = TarInfo(name=self.credentials_path) |
| 39 | + tarinfo.size = len(content) |
| 40 | + tarinfo.mtime = time.time() |
| 41 | + |
| 42 | + tmp_tarfile.addfile(tarinfo, BytesIO(content)) |
| 43 | + tar_archive_object.seek(0) |
| 44 | + self.get_wrapped_container().put_archive("/", tar_archive_object) |
| 45 | + |
| 46 | + def start(self): |
| 47 | + if self.username and self.password: |
| 48 | + self.with_env("REGISTRY_AUTH_HTPASSWD_REALM", "local-registry") |
| 49 | + self.with_env("REGISTRY_AUTH_HTPASSWD_PATH", self.credentials_path) |
| 50 | + super().start() |
| 51 | + self._copy_credentials() |
| 52 | + return self |
| 53 | + else: |
| 54 | + super().start() |
| 55 | + return self |
| 56 | + |
| 57 | + def get_registry(self) -> str: |
| 58 | + host: str = self.get_container_host_ip() |
| 59 | + port: str = self.get_exposed_port(self.port) |
| 60 | + return f"{host}:{port}" |
0 commit comments