Commit
add pyright and e2e tests for CI/CD (#53)
* add pyright and e2e tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* check for ci/cd

* fix return logic

* fix -it flags

* fix tty

* fix concurrency

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
C-Loftus and pre-commit-ci[bot] authored Dec 9, 2024
1 parent b81938a commit c1abbaf
Showing 21 changed files with 171 additions and 41 deletions.
22 changes: 22 additions & 0 deletions .github/workflows/e2e_test.yml
@@ -0,0 +1,22 @@
name: E2E Dagster build

on:
  push:
  workflow_dispatch: # Allows manual triggering of the workflow

jobs:
  setup-docker-and-run:
    runs-on: ubuntu-latest

    steps:
      # Checkout the repository
      - name: Checkout Code
        uses: actions/checkout@v4

      # Set up Docker
      - name: Set up Docker
        uses: docker/setup-buildx-action@v2

      # Run the Python script that handles the build and execution
      - name: Launch Docker Stack and run tests
        run: python3 main.py local && python3 main.py test
25 changes: 25 additions & 0 deletions .github/workflows/pyright.yml
@@ -0,0 +1,25 @@
name: Pyright Type Checks

on:
  push:
  workflow_dispatch: # Allows manual triggering of the workflow

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          cache: "pip"

      - run: |
          python -m venv .venv
          source .venv/bin/activate
          pip install -r Docker/user_code_requirements.txt
      - run: echo "$PWD/.venv/bin" >> $GITHUB_PATH

      - uses: jakebailey/pyright-action@v2
        with:
          pylance-version: latest-release
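
A note on what this gate buys: later in this same commit, the S3 helper's annotation is corrected from urllib3's HTTPResponse to BaseHTTPResponse, the type minio's get_object is actually annotated to return. A minimal sketch of the kind of mismatch pyright flags (bucket and path are illustrative):

    from minio import Minio
    from urllib3 import BaseHTTPResponse

    def read_object(client: Minio, bucket: str, path: str) -> bytes:
        # Annotating this as the concrete urllib3.HTTPResponse instead of
        # BaseHTTPResponse is exactly the mismatch pyright reports.
        response: BaseHTTPResponse = client.get_object(bucket, path)
        return response.read()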
4 changes: 2 additions & 2 deletions .vscode/launch.json
@@ -11,8 +11,8 @@
       },
       "pathMappings": [
         {
-          "localRoot": "${workspaceFolder}/code",
-          "remoteRoot": "/opt/dagster/app/code"
+          "localRoot": "${workspaceFolder}/userCode",
+          "remoteRoot": "/opt/dagster/app/userCode"
         }
       ],
       "justMyCode": true,
2 changes: 1 addition & 1 deletion Docker/Dockerfile_user_code
@@ -13,7 +13,7 @@ ENV DAGSTER_DEBUG=$DAGSTER_DEBUG
 
 # configs and runtime code
 WORKDIR /opt/dagster/app
-COPY code /opt/dagster/app/code
+COPY userCode /opt/dagster/app/userCode
 COPY templates /opt/dagster/app/templates
 
 # Expose the necessary ports
8 changes: 7 additions & 1 deletion Docker/docker-compose-user-code.yaml
@@ -11,7 +11,13 @@ services:
     environment:
       DAGSTER_CURRENT_IMAGE: "dagster_user_code_image"
     volumes:
-      - ../code:/opt/dagster/app/code
+      - ../userCode:/opt/dagster/app/userCode
+      # When materialized via the UI, dagster runs the
+      # user code inside the webserver container
+      # However, if we are just running pytest we need to have direct
+      # access to the docker sock inside the user code container
+      - /var/run/docker.sock:/var/run/docker.sock
+
     networks:
       - dagster_network
     env_file: "../.env"
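
Mounting the socket means code inside the user code container can drive the host Docker daemon directly, which is what pytest needs when it materializes assets. A minimal sketch of verifying that access, assuming docker-py is installed in the container:

    import docker

    # from_env() picks up the mounted /var/run/docker.sock by default
    client = docker.from_env()
    print([container.name for container in client.containers.list()])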
4 changes: 2 additions & 2 deletions Docker/entrypoint.sh
@@ -11,8 +11,8 @@ fi
 
 if [ "$DAGSTER_DEBUG" = "true" ]; then
   echo "Starting dagster in debug mode and waiting for connection to debugpy"
-  exec python -m debugpy --configure-subProcess true --listen 0.0.0.0:5678 -m dagster dev -h 0.0.0.0 -p 3000 --python-file /opt/dagster/app/code/main.py -d /opt/dagster/app/code
+  exec python -m debugpy --configure-subProcess true --listen 0.0.0.0:5678 -m dagster dev -h 0.0.0.0 -p 3000 -m userCode.main
 else
   echo "Starting dagster code server"
-  exec dagster code-server start -h 0.0.0.0 -p 4000 --python-file /opt/dagster/app/code/main.py -d /opt/dagster/app/code
+  exec dagster code-server start -h 0.0.0.0 -p 4000 -m userCode.main
 fi
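
Both branches now load user code as a module (-m userCode.main) instead of by file path, which is why this commit adds __init__.py files under userCode/. A minimal sketch of the shape such a module target takes (asset name hypothetical, not the repository's real assets):

    from dagster import Definitions, asset

    @asset
    def example_asset() -> str:
        return "hello"

    # `dagster code-server start -m userCode.main` imports this module
    # and discovers the Definitions object at module scope.
    defs = Definitions(assets=[example_asset])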
1 change: 1 addition & 0 deletions Docker/user_code_requirements.txt
@@ -10,3 +10,4 @@ lxml # used for parsing sitemaps
 pyyaml # used for processing gleaner/nabu configs
 beautifulsoup4 # used for parsing sitemaps
 aiohttp
+pytest
2 changes: 1 addition & 1 deletion dagster.yaml
@@ -3,7 +3,7 @@ scheduler:
   class: DagsterDaemonScheduler
 
 run_queue:
-  max_concurrent_runs: 1
+  max_concurrent_runs: 3
 
 run_monitoring:
   enabled: true
2 changes: 1 addition & 1 deletion docs/docs.md
@@ -56,7 +56,7 @@ This repository is a refactor of the [gleanerio/scheduler](https://github.com/gl
 
 ## Gleaner and Nabu Notes
 
-The current pipeline for gleaner/nabu operations in Dagster is as follows. All of the steps are inside Dagster [here](../code/main.py) with each being a separate asset.
+The current pipeline for gleaner/nabu operations in Dagster is as follows. All of the steps are inside Dagster [here](../userCode/main.py) with each being a separate asset.
 
 Since Dagster uses docs as code, the best way to get the most accurate documentation is by opening up the local UI and looking at the asset description; this will source our code and the associated comments.
50 changes: 43 additions & 7 deletions main.py
@@ -13,15 +13,20 @@
 """
 
 
-def run_subprocess(command: str):
+def run_subprocess(command: str, returnStdoutInsteadOfPrint: bool = False):
     """Run a shell command and stream the output in realtime"""
     process = subprocess.Popen(
-        command, shell=True, stdout=sys.stdout, stderr=sys.stderr
+        command,
+        shell=True,
+        stdout=subprocess.PIPE if returnStdoutInsteadOfPrint else sys.stdout,
+        stderr=sys.stderr,
     )
-    process.communicate()
+    stdout, _ = process.communicate()
     if process.returncode != 0:
         sys.exit(process.returncode)
 
+    return stdout.decode("utf-8") if returnStdoutInsteadOfPrint else None
+
 
 def down():
     """Stop the docker swarm stack"""
@@ -33,12 +38,16 @@ def up(local: bool, debug: bool):
 
     if not os.path.exists(".env"):
         print("Missing .env file. Do you want to copy .env.example to .env ? (y/n)")
-        answer = input().lower()
-        if answer == "y" or answer == "yes":
+        # check if you are running in a terminal or in CI/CD
+        if not sys.stdin.isatty():
             shutil.copy(".env.example", ".env")
         else:
-            print("Missing .env file. Exiting")
-            return
+            answer = input().lower()
+            if answer == "y" or answer == "yes":
+                shutil.copy(".env.example", ".env")
+            else:
+                print("Missing .env file. Exiting")
+                return
 
     # Reset the swarm if it exists
     run_subprocess("docker swarm leave --force || true")
@@ -90,7 +99,29 @@ def refresh():
     )
 
 
+def test():
+    """Run pytest inside the user code container"""
+
+    # get the name of the container
+    containerName = run_subprocess(
+        "docker ps --filter name=geoconnex_crawler_dagster_user_code --format '{{.Names}}'",
+        returnStdoutInsteadOfPrint=True,
+    )
+    if not containerName:
+        raise RuntimeError("Could not find the user code container to run pytest")
+    containerName = containerName.strip()  # Container name sometimes has extra \n
+
+    # If we are in CI/CD we need to skip the interactive / terminal flags
+    if not sys.stdin.isatty():
+        run_subprocess(f"docker exec {containerName} pytest")
+    else:
+        run_subprocess(f"docker exec -it {containerName} pytest")
+
+
 def main():
+    # set DOCKER_CLI_HINTS false to avoid the advertisement message after every docker cmd
+    os.environ["DOCKER_CLI_HINTS"] = "false"
+
     # make sure the user is in the same directory as this file
     file_dir = os.path.dirname(os.path.abspath(__file__))
     if file_dir != os.getcwd():
@@ -120,6 +151,9 @@ def main():
         "prod",
         help="Spin up the docker swarm stack with remote s3 and graphdb",
     )
+
+    subparsers.add_parser("test", help="Run pytest inside the user code container")
+
     args = parser.parse_args()
     if args.command == "down":
         down()
@@ -129,6 +163,8 @@ def main():
         up(local=False, debug=False)
     elif args.command == "refresh":
         refresh()
+    elif args.command == "test":
+        test()
     else:
         parser.print_help()
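
Taken together, the reworked helper and the new test subcommand compose like this; a hedged usage sketch (command strings illustrative, relying on run_subprocess as defined above):

    # Capture stdout when asked for it; otherwise stream it in realtime.
    names = run_subprocess(
        "docker ps --format '{{.Names}}'",
        returnStdoutInsteadOfPrint=True,
    )
    if names:
        print(names.strip().splitlines())

    run_subprocess("echo this streams straight to the terminal")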
File renamed without changes.
Empty file added userCode/lib/__init__.py
Empty file.
6 changes: 3 additions & 3 deletions code/lib/classes.py → userCode/lib/classes.py
@@ -2,7 +2,7 @@
 from typing import Any
 from dagster import get_dagster_logger
 from minio import Minio
-from urllib3 import HTTPResponse
+from urllib3 import BaseHTTPResponse
 from .env import (
     GLEANER_MINIO_SECRET_KEY,
     GLEANER_MINIO_ACCESS_KEY,
@@ -42,8 +42,8 @@ def read(self, remote_path: str):
         logger.info(f"S3 SERVER : {self.endpoint}")
         logger.info(f"S3 PORT : {GLEANER_MINIO_PORT}")
         logger.info(f"S3 BUCKET : {GLEANER_MINIO_BUCKET}")
-        logger.debug(f"S3 object path : {remote_path}")
-        response: HTTPResponse = self.client.get_object(
+        logger.debug(f"S3 object path : {remote_path}")
+        response: BaseHTTPResponse = self.client.get_object(
             GLEANER_MINIO_BUCKET, remote_path
         )
         data = response.read()
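
One caveat the diff leaves implicit: minio's get_object returns a urllib3 response whose connection should be released after reading. A hedged sketch of that pattern (endpoint, credentials, and object names illustrative):

    from minio import Minio

    client = Minio(
        "localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        secure=False,
    )
    response = client.get_object("gleaner", "configs/gleanerconfig.yaml")
    try:
        data = response.read()
    finally:
        # hand the underlying urllib3 connection back to the pool
        response.close()
        response.release_conn()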
File renamed without changes.
File renamed without changes.
4 changes: 4 additions & 0 deletions code/lib/utils.py → userCode/lib/utils.py
@@ -63,6 +63,10 @@ def create_service(
             container_context.networks if len(container_context.networks) else None
         ),
         restart_policy=RestartPolicy(condition="none"),
+        # Replicated jobs terminate after run
+        # TODO: There is still a potential error here. If a container fails,
+        # the job finishes but still appears in the swarm stack
+        # This might cause an issue
         mode=ServiceMode("replicated-job", concurrency=1, replicas=1),
         configs=[gleaner, nabu],
     )
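
replicated-job is Swarm's one-shot service mode: the task runs to completion once instead of being kept alive like a long-lived service. A standalone sketch of the same pattern with docker-py (image, command, and service name illustrative):

    import docker
    from docker.types import RestartPolicy, ServiceMode

    client = docker.from_env()
    # condition="none" keeps Swarm from restarting the finished task,
    # so the job runs exactly once to completion.
    service = client.services.create(
        image="alpine:3.20",
        command=["echo", "one-shot job"],
        name="example_one_shot_job",
        restart_policy=RestartPolicy(condition="none"),
        mode=ServiceMode("replicated-job", concurrency=1, replicas=1),
    )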
61 changes: 38 additions & 23 deletions code/main.py → userCode/main.py
@@ -1,7 +1,7 @@
 import asyncio
 from datetime import datetime
 from typing import Tuple
-from aiohttp import ClientSession
+from aiohttp import ClientSession, ClientTimeout
 from bs4 import BeautifulSoup
 from dagster import (
     AssetCheckResult,
@@ -23,18 +23,19 @@
 )
 import docker
 import dagster_slack
+import docker.errors
 import requests
 import yaml
-from lib.classes import S3
-from lib.utils import (
+from .lib.classes import S3
+from .lib.utils import (
     remove_non_alphanumeric,
     run_scheduler_docker_image,
     slack_error_fn,
     template_config,
 )
 from urllib.parse import urlparse
 
-from lib.env import (
+from .lib.env import (
     GLEANER_GRAPH_URL,
     GLEANER_HEADLESS_ENDPOINT,
     REMOTE_GLEANER_SITEMAP,
@@ -133,7 +134,9 @@ def gleaner_links_are_valid():
     dead_links: list[dict[str, Tuple[int, str]]] = []
 
     async def validate_url(url: str):
-        async with ClientSession() as session:
+        # Geoconnex links generally take at absolute max 8 seconds, even for a very large sitemap
+        # If it takes above 12 seconds, that is a good signal that something is wrong
+        async with ClientSession(timeout=ClientTimeout(total=12)) as session:
             resp = await session.get(url)
 
             if resp.status != 200:
@@ -164,30 +167,44 @@ def docker_client_environment():
     """Set up dagster by pulling both the gleaner and nabu images and moving the config files into docker configs"""
     get_dagster_logger().info("Getting docker client and pulling images: ")
     client = docker.DockerClient(version="1.43")
+    # check if the docker socket is available
     client.images.pull(GLEANERIO_GLEANER_IMAGE)
     client.images.pull(GLEANERIO_NABU_IMAGE)
+    # we create configs as docker config objects so
+    # we can more easily reuse them and not need to worry about
+    # navigating / mounting file systems for local config access
     api_client = docker.APIClient()
 
-    try:
-        gleanerconfig = client.configs.list(filters={"name": ["gleaner"]})
-        nabuconfig = client.configs.list(filters={"name": ["nabu"]})
-        if gleanerconfig:
-            api_client.remove_config(gleanerconfig[0].id)
-        if nabuconfig:
-            api_client.remove_config(nabuconfig[0].id)
-    except IndexError as e:
-        get_dagster_logger().info(
-            f"No configs found to remove during docker client environment creation: {e}"
-        )
-
-    s3_client = S3()
-    client.configs.create(name="nabu", data=s3_client.read("configs/nabuconfig.yaml"))
-    client.configs.create(
-        name="gleaner", data=s3_client.read("configs/gleanerconfig.yaml")
-    )
+    # At the start of the pipeline, remove any existing configs
+    # and try to regenerate a new one
+    # since we don't want old/stale configs to be used
+    # However, if another container is using the config it will fail and throw an error
+    # Instead of using a mutex and trying to synchronize access,
+    # we just assume that a config that is in use is not stale.
+    configs = {
+        "gleaner": "configs/gleanerconfig.yaml",
+        "nabu": "configs/nabuconfig.yaml",
+    }
+
+    for config_name, config_file in configs.items():
+        try:
+            config = client.configs.list(filters={"name": [config_name]})
+            if config:
+                api_client.remove_config(config[0].id)
+        except docker.errors.APIError as e:
+            get_dagster_logger().info(
+                f"Skipped removing {config_name} config during docker client environment creation since it is likely in use. Underlying skipped exception was {e}"
+            )
+        except IndexError as e:
+            get_dagster_logger().info(f"No config found for {config_name}: {e}")
+
+        try:
+            client.configs.create(name=config_name, data=S3().read(config_file))
+        except docker.errors.APIError as e:
+            get_dagster_logger().info(
+                f"Skipped creating {config_name} config during docker client environment creation since it is likely in use. Underlying skipped exception was {e}"
+            )


@asset_check(asset=docker_client_environment)
@@ -422,8 +439,6 @@ def crawl_entire_graph_schedule():
             text_fn=slack_error_fn,
             default_status=DefaultSensorStatus.RUNNING,
             monitor_all_code_locations=True,
-            monitor_all_repositories=True,
-            monitored_jobs=[harvest_job],
         )
     ],
     # Commented out but can uncomment if we want to send other slack msgs
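
The ClientTimeout change above bounds each sitemap check at 12 seconds end to end, so a hung link fails fast with a TimeoutError instead of stalling the asset check. A minimal sketch (URL illustrative):

    import asyncio
    from aiohttp import ClientSession, ClientTimeout

    async def check(url: str) -> int:
        # total= covers connect, request, and body read combined
        async with ClientSession(timeout=ClientTimeout(total=12)) as session:
            async with session.get(url) as resp:
                return resp.status

    # asyncio.run(check("https://example.org/sitemap.xml"))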
File renamed without changes.
Empty file added userCode/test/__init__.py
Empty file.
File renamed without changes.
21 changes: 21 additions & 0 deletions userCode/test/test_e2e.py
@@ -0,0 +1,21 @@
from dagster import load_assets_from_modules, materialize
import userCode.main as main


from dagster import AssetsDefinition, AssetSpec, SourceAsset


def test_materialize_configs():
    assets = load_assets_from_modules([main])
    # It is possible to load certain asset types that cannot be passed into
    # Materialize so we filter them to avoid a pyright type error
    filtered_assets = [
        asset
        for asset in assets
        if isinstance(asset, (AssetsDefinition, AssetSpec, SourceAsset))
    ]
    result = materialize(
        assets=filtered_assets,
        selection=["nabu_config", "gleaner_config", "docker_client_environment"],
    )
    assert result.success
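
In CI this test is reached through python3 main.py test, which shells out to the user code container; a hedged sketch of the equivalent direct invocation (container name illustrative):

    import subprocess

    # Non-interactive form, as used on a CI runner without a TTY.
    subprocess.run(
        ["docker", "exec", "dagster_user_code", "pytest"],
        check=True,
    )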
