|
| 1 | +import json |
| 2 | +import os |
| 3 | +import pathlib |
| 4 | +import subprocess |
| 5 | +import time |
| 6 | +from urllib.error import HTTPError |
| 7 | +from urllib.error import URLError |
| 8 | +from urllib.request import urlopen |
| 9 | + |
| 10 | +import pytest |
| 11 | + |
| 12 | + |
| 13 | +MODULES_ALWAYS_LOADED = ["ddtrace.appsec", "ddtrace.appsec._capabilities", "ddtrace.appsec._constants"] |
| 14 | +MODULE_ASM_ONLY = ["ddtrace.appsec._processor", "ddtrace.appsec._ddwaf"] |
| 15 | +MODULE_IAST_ONLY = [ |
| 16 | + "ddtrace.appsec._iast", |
| 17 | + "ddtrace.appsec._iast._taint_tracking._native", |
| 18 | + "ddtrace.appsec._iast._stacktrace", |
| 19 | +] |
| 20 | + |
| 21 | + |
| 22 | +@pytest.mark.parametrize("appsec_enabled", ["true", "false"]) |
| 23 | +@pytest.mark.parametrize("iast_enabled", ["true", None]) |
| 24 | +@pytest.mark.parametrize("aws_lambda", ["any", None]) |
| 25 | +def test_loading(appsec_enabled, iast_enabled, aws_lambda): |
| 26 | + flask_app = pathlib.Path(__file__).parent / "mini.py" |
| 27 | + env = {} |
| 28 | + if appsec_enabled: |
| 29 | + env["DD_APPSEC_ENABLED"] = appsec_enabled |
| 30 | + if iast_enabled: |
| 31 | + env["DD_IAST_ENABLED"] = iast_enabled |
| 32 | + if aws_lambda: |
| 33 | + env["AWS_LAMBDA_FUNCTION_NAME"] = aws_lambda |
| 34 | + |
| 35 | + all_env = os.environ | env |
| 36 | + |
| 37 | + process = subprocess.Popen( |
| 38 | + ["python", str(flask_app)], |
| 39 | + stdout=subprocess.PIPE, |
| 40 | + stderr=subprocess.PIPE, |
| 41 | + env=all_env, |
| 42 | + ) |
| 43 | + for i in range(16): |
| 44 | + time.sleep(1) |
| 45 | + try: |
| 46 | + with urlopen("http://localhost:8475") as response: |
| 47 | + assert response.status == 200 |
| 48 | + payload = response.read().decode() |
| 49 | + data = json.loads(payload) |
| 50 | + assert "appsec" in data |
| 51 | + # appsec is always enabled |
| 52 | + for m in MODULES_ALWAYS_LOADED: |
| 53 | + assert m in data["appsec"], f"{m} not in {data['appsec']}" |
| 54 | + for m in MODULE_ASM_ONLY: |
| 55 | + if appsec_enabled == "true" and not aws_lambda: |
| 56 | + assert m in data["appsec"], f"{m} not in {data['appsec']}" |
| 57 | + else: |
| 58 | + assert m not in data["appsec"], f"{m} in {data['appsec']}" |
| 59 | + for m in MODULE_IAST_ONLY: |
| 60 | + if iast_enabled and not aws_lambda: |
| 61 | + assert m in data["appsec"], f"{m} not in {data['appsec']}" |
| 62 | + else: |
| 63 | + assert m not in data["appsec"], f"{m} in {data['appsec']}" |
| 64 | + process.terminate() |
| 65 | + process.wait() |
| 66 | + break |
| 67 | + except HTTPError as e: |
| 68 | + process.terminate() |
| 69 | + process.wait() |
| 70 | + raise AssertionError(e.read().decode()) |
| 71 | + except URLError: |
| 72 | + continue |
| 73 | + except AssertionError: |
| 74 | + process.terminate() |
| 75 | + process.wait() |
| 76 | + raise |
| 77 | + else: |
| 78 | + process.terminate() |
| 79 | + process.wait() |
| 80 | + raise AssertionError("Server did not start") |
0 commit comments