Skip to content

feat(ingestion): per-connector CLI version matrix + resolution stamp …#17436

Open
puneetagarwal-datahub wants to merge 21 commits into
masterfrom
per-connector-cli-version-matrix
Open

feat(ingestion): per-connector CLI version matrix + resolution stamp …#17436
puneetagarwal-datahub wants to merge 21 commits into
masterfrom
per-connector-cli-version-matrix

Conversation

@puneetagarwal-datahub
Copy link
Copy Markdown
Contributor

@puneetagarwal-datahub puneetagarwal-datahub commented May 14, 2026

feat(ingestion): per-connector CLI version matrix + audit stamp + test-connection path

Adds per-connector CLI version resolution so a Snowflake fix can ship without
forcing a bump on every other connector. A JSON matrix maps server release ×
connector → version, with optional per-deployment cohort allowlists for canary
rollouts.

Matrix key conventions: Server-version keys are git describe --tags truncated at the first - (so v1.1.0-cloud → v1.1.0); CLI versions (_default, cohort version) are forwarded verbatim to pip install acryl-datahub==… and must use PyPI format (no v prefix).

Matrix schema:

{
  "v1.1.0": {                              // server key: GitHub tag with `-cloud` stripped
    "snowflake": {
      "_default": "1.6.0",                 // CLI: PyPI version, no `v`
      "cohorts": [
        { "version": "1.5.0.13.post3",     // CLI: PyPI version, no `v`
          "deployments": ["acme-corp"] }   // matches DataHubIngestionSourceConfig.deploymentId
      ]
    },
    "bigquery": { "_default": "1.6.0" }
  },
  "v1.0.2": {
    "snowflake": { "_default": "1.5.0.19" }
  }
}

deployments entries are matched against ingestion.deploymentId (bound to
the DATAHUB_EXECUTOR_CUSTOMER_ID env var injected by the Acryl Cloud Helm
chart from the K8s namespace). The leading underscore on _default marks it
as a sentinel key so it can't collide with a connector name.

Resolution priority (top wins):

  1. Per-source explicit config.version (SOURCE_CONFIG_OVERRIDE)
  2. matrix[serverVersion][connectorType].cohorts (MATRIX_COHORT)
    first cohort whose deployments list contains this
    deployment's id
  3. matrix[serverVersion][connectorType]._default (MATRIX_CONNECTOR_DEFAULT)
  4. defaultCliVersion from application.yaml (APPLICATION_DEFAULT)

Storage is pluggable. The consumer (IngestionCliVersionMatrixService) is
decoupled from where the matrix lives via the IngestionCliVersionMatrixSource
interface, selected by ingestion.cliVersionMatrix.source:

  • source: "http" (default) + http.url set
    → HttpUrlIngestionCliVersionMatrixSource (periodic HTTP GET)
  • source: "http" + http.url empty (OSS default)
    → NoOpIngestionCliVersionMatrixSource (empty matrix, never any null checks)
  • source: "none"
    → NoOpIngestionCliVersionMatrixSource (explicit kill-switch, wins over URL)

Future backends (GMS aspect on a globalSettings entity, AppConfig / Consul /
etcd, …) add their own discriminator value here — the resolver and the audit
stamp don't change.

Three execution entry points are covered:

  • CreateIngestionExecutionRequestResolver (manual triggers)
  • IngestionScheduler.ExecutionRequestRunnable (scheduled triggers)
  • CreateTestConnectionRequestResolver (test-connection from UI)

Each one stamps a structured CliVersionAudit record on the resulting
ExecutionRequestInput aspect, alongside the resolved version string in
args.version. Post-hoc forensics queries both with one SQL pass rather than
iterating the generic args map:

args.version : "1.3.1.5"
cliVersionAudit : { source: MATRIX_COHORT, serverVersion: "v1.6.0rc1" }

Each call site also emits a structured DEBUG log (and WARN when the resolved
version is empty — i.e. every tier including defaultCliVersion fell through)
under the caller's class so operators can grep all three trigger paths with
grep "Resolved ingestion CLI version".

Test connections also now honor defaultCliVersion when no explicit version is
supplied — previously they silently omitted version, causing the executor to
use whatever bundled CLI it shipped with rather than the configured default.

Feature is off by default — when no http.url is set the factory wires a
no-op source and every resolveVersion() returns empty, preserving existing
behavior exactly. In single-tenant deployments without a deployment id set,
cohort matching never fires and only the per-connector _default applies.

Also: treat empty / whitespace-only version strings as unset (bootstrap YAML
can render version: " " when the source has no version pin; forwarding
that blank to the executor would silently fall back to the bundled CLI).

Config (nested under ingestion.cliVersionMatrix):

INGESTION_VERSION_MATRIX_SOURCE (default: "http"; "none" disables)
INGESTION_VERSION_MATRIX_URL (default: empty → no-op)
INGESTION_VERSION_MATRIX_REFRESH_SECONDS (default: 600)
INGESTION_VERSION_MATRIX_AUTH_TOKEN (optional; sent as Authorization
header verbatim — supports a
private gist / S3 / CDN endpoint)
DATAHUB_EXECUTOR_CUSTOMER_ID (deployment id for cohort matching)

@github-actions github-actions Bot added product PR or Issue related to the DataHub UI/UX devops PR or Issue related to DataHub backend & deployment community-contribution PR or Issue raised by member(s) of DataHub Community labels May 14, 2026
@github-actions
Copy link
Copy Markdown
Contributor

Linear: CAT-2046

Thanks for your contribution! We have created an internal ticket to track this PR. A member of the core DataHub team will be assigned to review it within the next few business days - you will get a follow-up comment once a reviewer is assigned.

@codecov
Copy link
Copy Markdown

codecov Bot commented May 14, 2026

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
22669 1 22668 184
View the top 1 failed test(s) by shortest run time
tests.integration.test_plugin::test_airflow_plugin[v2_sqlite_operator]
Stack Traces | 36.6s run time
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_airflow_plugin_v2_sqlite_0')
golden_filename = 'v2_sqlite_operator'
test_case = DagTestCase(dag_id='sqlite_operator', success=True, multiple_connections=False, platform_instance='myairflow', enable_datajob_lineage=True, cluster=None, test_variant=None)

    @pytest.mark.parametrize(
        ["golden_filename", "test_case"],
        [
            pytest.param(
                f"v2_{test_case.dag_test_id}",
                test_case,
                id=f"v2_{test_case.dag_test_id}",
            )
            for test_case in test_cases
        ],
    )
    def test_airflow_plugin(
        tmp_path: pathlib.Path,
        golden_filename: str,
        test_case: DagTestCase,
    ) -> None:
        # This test:
        # - Configures the plugin.
        # - Starts a local airflow instance in a subprocess.
        # - Runs a DAG that uses an operator supported by the extractor.
        # - Waits for the DAG to complete.
        # - Validates the metadata generated against a golden file.
    
        golden_path = GOLDENS_FOLDER / f"{golden_filename}.json"
    
        dag_id = test_case.dag_id
    
        with _run_airflow(
            tmp_path,
            dags_folder=DAGS_FOLDER,
            multiple_connections=test_case.multiple_connections,
            platform_instance=test_case.platform_instance,
            enable_datajob_lineage=test_case.enable_datajob_lineage,
            cluster=test_case.cluster,
        ) as airflow_instance:
            print(f"Running DAG {dag_id}...")
            _wait_for_dag_to_load(airflow_instance, dag_id)
    
            trigger_cmd = [
                str(airflow_instance.airflow_executable),
                "dags",
                "trigger",
                "--logical-date",
                "2023-09-27T21:34:38+00:00",
                "-r",
                "manual_run_test",
                dag_id,
            ]
    
            subprocess.check_call(
                trigger_cmd,
                env=airflow_instance.env_vars,
            )
    
            print("Waiting for DAG to finish...")
>           _wait_for_dag_finish(
                airflow_instance, dag_id, require_success=test_case.success
            )

tests/integration/test_plugin.py:670: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
venv/lib/python3.11.............../site-packages/tenacity/__init__.py:338: in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
venv/lib/python3.11.............../site-packages/tenacity/__init__.py:477: in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
venv/lib/python3.11.............../site-packages/tenacity/__init__.py:378: in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
venv/lib/python3.11.............../site-packages/tenacity/__init__.py:400: in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15....../x64/lib/python3.11....../concurrent/futures/_base.py:449: in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.11.15....../x64/lib/python3.11....../concurrent/futures/_base.py:401: in __get_result
    raise self._exception
venv/lib/python3.11.............../site-packages/tenacity/__init__.py:480: in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

airflow_instance = AirflowInstance(airflow_home=PosixPath('.../pytest-of-runner/pytest-0/test_airflow_plugin_v2_sqlite_0/airflow_home'), airflow_port=10071, pid=7360, env_vars={'PATH': '.../airflow-plugin/venv/bin:.../airflow-plugin/venv/bin:.../hostedtoolcache/Python/3.11.15/x64/bin:.../hostedtoolcache/Python/3.11.15/x64:.../hostedtoolcache/Java_Zulu_jdk/21.0.11-10/x64/bin:/snap/bin:.../home/runner/.local/bin:/opt/pipx_bin:.../home/runner/.cargo/bin:.../home/runner/.config.../composer/vendor/bin:.../usr/local/.ghcup/bin:.../home/runner/.dotnet/tools:.../usr/local/sbin:.../usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:.../usr/local/games:/snap/bin', 'AIRFLOW_HOME': '.../pytest-of-runner/pytest-0/test_airflow_plugin_v2_sqlite_0/airflow_home', 'no_proxy': '*', 'PYTHONFAULTHANDLER': '1', 'PYTHONPATH': '/tmp', 'AIRFLOW__API__PORT': '10071', 'AIRFLOW__API__BASE_URL': 'http://airflow.example.com', 'AIRFLOW__CORE__LOAD_EXAMPLES': 'False', 'AIRFLOW__CORE__DAGS_FOLDER': '.../tests/integration/dags', 'AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION': 'False', 'AIRFLOW__CORE__LAZY_LOAD_PLUGINS': 'True', 'AIRFLOW__DATAHUB__CONN_ID': 'datahub_file_default', 'AIRFLOW__DATAHUB__DAG_FILTER_STR': '{ "deny": ["dag_to_skip"] }', 'AIRFLOW_CONN_DATAHUB_FILE_DEFAULT': 'datahub-file://%2Ftmp%2Fpytest-of-runner%2Fpytest-0%2Ftest_airflow_plugin_v2_sqlite_0%2Fdatahub_metadata.json', 'AIRFLOW_CONN_MY_SNOWFLAKE': 'snowflake://fake_username:fake_password@/DATAHUB_TEST_SCHEMA?account=fake_account&database=DATAHUB_TEST_DATABASE&warehouse=fake_warehouse&role=fake_role&insecure_mode=true', 'AIRFLOW_CONN_MY_AWS': 'aws:///?region_name=us-east-1&aws_access_key_id=AKIAIOSFODNN7EXAMPLE&aws_secret_access_key=wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY', 'AIRFLOW_CONN_MY_BIGQUERY': 'google-cloud-platform:///?project=test_project&key_path=%2Fdev%2Fnull', 'AIRFLOW_CONN_MY_SQLITE': 'sqlite://%2Ftmp%2Fpytest-of-runner%2Fpytest-0%2Ftest_airflow_plugin_v2_sqlite_0%2Fmy_sqlite.db', 'AIRFLOW_CONN_MY_TERADATA': 'teradata://fake_username:fake_password@fake_teradata_host/?tmode=ANSI', 'DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT': '30', 'AIRFLOW__DATAHUB__LOG_LEVEL': 'DEBUG', 'AIRFLOW__DATAHUB__DEBUG_EMITTER': 'True', 'SQLALCHEMY_SILENCE_UBER_WARNING': '1', 'AIRFLOW__DATAHUB__ENABLE_DATAJOB_LINEAGE': 'true', 'AIRFLOW__API__AUTH_BACKENDS': 'airflow.api.auth.backend.anonymous', 'AIRFLOW__OPENLINEAGE__TRANSPORT': '{"type": "console"}', 'AIRFLOW__EXECUTION_API__JWT_EXPIRATION_TIME': '300', 'AIRFLOW__API_AUTH__JWT_SECRET': 'test-secret-key-for-jwt-signing-in-tests', 'AIRFLOW__DATAHUB__PLATFORM_INSTANCE': 'myairflow'}, airflow_executable=PosixPath('.../airflow-plugin/venv/bin/airflow'), username='admin', password='X5993NhDHfBc5ykr', metadata_file=PosixPath('.../pytest-of-runner/pytest-0/test_airflow_plugin_v2_sqlite_0/datahub_metadata.json'), metadata_file2=PosixPath('.../pytest-of-runner/pytest-0/test_airflow_plugin_v2_sqlite_0/datahub_metadata_2.json'))
dag_id = 'sqlite_operator', require_success = True

    @tenacity.retry(
        reraise=True,
        wait=tenacity.wait_fixed(1),
        stop=tenacity.stop_after_delay(90),
        retry=tenacity.retry_if_exception_type(NotReadyError),
    )
    def _wait_for_dag_finish(
        airflow_instance: AirflowInstance, dag_id: str, require_success: bool
    ) -> None:
        print("Checking if DAG is finished")
        res = _make_api_request(
            airflow_instance.session,
            f"{airflow_instance.airflow_url}.../api/v2/dags/{dag_id}/dagRuns",
        )
    
        dag_runs = res.json()["dag_runs"]
        if not dag_runs:
            raise NotReadyError("No DAG runs found")
    
        dag_run = dag_runs[0]
        if dag_run["state"] == "failed":
            if require_success:
>               raise ValueError("DAG failed")
E               ValueError: DAG failed

tests/integration/test_plugin.py:143: ValueError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@maggiehays maggiehays added the needs-review Label for PRs that need review from a maintainer. label May 14, 2026
@puneetagarwal-datahub puneetagarwal-datahub force-pushed the per-connector-cli-version-matrix branch from bebb0c1 to 824b5be Compare May 14, 2026 08:58
@puneetagarwal-datahub puneetagarwal-datahub force-pushed the per-connector-cli-version-matrix branch from 824b5be to 54f3112 Compare May 14, 2026 10:51
@puneetagarwal-datahub puneetagarwal-datahub force-pushed the per-connector-cli-version-matrix branch from 54f3112 to cb8837f Compare May 14, 2026 11:07
@puneetagarwal-datahub puneetagarwal-datahub changed the title Draft: feat(ingestion): per-connector CLI version matrix + resolution stamp … feat(ingestion): per-connector CLI version matrix + resolution stamp … May 14, 2026
@puneetagarwal-datahub puneetagarwal-datahub marked this pull request as draft May 14, 2026 14:45
@puneetagarwal-datahub puneetagarwal-datahub force-pushed the per-connector-cli-version-matrix branch from cb8837f to d536d3d Compare May 14, 2026 14:48
@puneetagarwal-datahub puneetagarwal-datahub force-pushed the per-connector-cli-version-matrix branch from d536d3d to 2543500 Compare May 14, 2026 15:03
@puneetagarwal-datahub puneetagarwal-datahub force-pushed the per-connector-cli-version-matrix branch from 2543500 to 2e9dece Compare May 14, 2026 15:08
@puneetagarwal-datahub puneetagarwal-datahub force-pushed the per-connector-cli-version-matrix branch from 2e9dece to 16b4f05 Compare May 14, 2026 15:33
@puneetagarwal-datahub puneetagarwal-datahub force-pushed the per-connector-cli-version-matrix branch from 16b4f05 to c8b2ca0 Compare May 14, 2026 15:42
Copy link
Copy Markdown
Collaborator

@david-leifker david-leifker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this looks reasonable. A few corner cases and gaps that need addressing.

* <p>The property name ends with "Token" so it is auto-redacted by the system-info properties
* collector (see {@code PropertiesCollectorConfigurationTest}).
*/
private String authToken;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leads me to ask how this will work in airgapped environments. I had assumed that the configuration was from GMS. We need to support systems with no outside access.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point @david-leifker !

The matrix storage is pluggable and configurable. HttpMatrixSource is just the first implementation. AFIK the plan is to serve it from S3 in a follow-up iteration, by implementing a new matrix source.

Neither HTTP nor S3 may work for air-gapped environments, but that's just a matter of providing a matrix source that fits their requirements, whether fetching from GMS itself or somewhere else (e.g. a ConfigMap in k8s?).

Is air-gapped support a blocking requirement for this first iteration?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @david-leifker we have already mentioned this gap in RFC for the same . https://www.notion.so/acryldata/RFC-Independent-version-deployment-of-connectors-by-customer-and-connector-type-74afc6a6427783508d768150ae08be46?source=copy_link

On a high level thought , We can plugin the matrix inside the codebase which is shipped , start a local server and server json via that or we can think of something else .

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as there is no regression in the current design, per source configuration with bundled and a common default configured via GMS's config endpoint, not a blocker.

cliVersionMatrix:
# Source backend: "http" (default) or "none". Setting "none" is an explicit kill-switch even
# when an http.url is configured. With "http" but no url, the factory wires a no-op source.
source: "${INGESTION_VERSION_MATRIX_SOURCE:http}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally there is a way to source this matrix from GMS itself, while a centralized external source is useful for Saas, there are on-prem scenarios where an individual operator may want to have control of this matrix without relying on an external system.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're certainly the most interested in this feature, managing a fleet of instances. It's true, it can be useful even for smaller onprem setups (dev, pre, prod).

Either way, this ties into my comment here: #17436 (comment) — the key point being that air-gapped support doesn't need to be solved now, as long as we know it can be addressed later with different matrix source implementations.

Copy link
Copy Markdown
Collaborator

@david-leifker david-leifker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving, I would have liked to see a comprehensive implementation with GMS' config endpoint being able to serve the matrix configuration, passed through via environment variable containing a json configuration option. It would have been a simple and straightforward addition to serving it from a remote location.

@puneetagarwal-datahub
Copy link
Copy Markdown
Contributor Author

Approving, I would have liked to see a comprehensive implementation with GMS' config endpoint being able to serve the matrix configuration, passed through via environment variable containing a json configuration option. It would have been a simple and straightforward addition to serving it from a remote location.

Good call — the source layer is pluggable by design .We kept the HTTP source as the default because the primary goal is updating per-connector versions fleet-wide without redeploying GMS ( edit the matrix once, all instances refresh in ~10min). Inline can't give that, but it's a great fit for self-contained/air-gapped deployments and is cheap to add alongside.

I will work on it once this is shipped and working .

puneetagarwal-datahub and others added 21 commits June 4, 2026 18:33
…+ test-connection path

Adds per-connector CLI version resolution so a Snowflake fix can ship
without forcing a bump on every other connector. A JSON matrix hosted at
INGESTION_VERSION_MATRIX_URL maps server release x connector -> version,
with optional per-deployment cohort allowlists for canary rollouts.

Matrix schema:

  {
    "1.3.1.4": {
      "snowflake": {
        "_default": "1.3.1.4",
        "cohorts": [
          { "version": "1.3.1.5",
            "deployments": ["deployment-slug-1", "deployment-slug-2"] }
        ]
      }
    }
  }

`deployments` entries are matched against `ingestion.deploymentId`,
which is sourced from the existing DATAHUB_EXECUTOR_CUSTOMER_ID env var
that the Acryl Cloud Helm chart already injects from the K8s namespace.
The leading underscore on `_default` marks it as a sentinel key so it
can't collide with a connector name.

Resolution priority (top wins):
  1. Per-source explicit `config.version` (unchanged)
  2. matrix[serverVersion][connectorType].cohorts - first cohort whose
     `deployments` list contains this deployment's id
  3. matrix[serverVersion][connectorType]._default
  4. defaultCliVersion from application.yaml (unchanged fallback)

Storage is pluggable. Matrix consumption (IngestionVersionMatrixService)
is decoupled from where the matrix lives via a MatrixSource interface:

  - HttpUrlMatrixSource - periodic HTTP GET (the URL-backed default)
  - NoOpMatrixSource    - empty matrix, wired when no URL is configured
                          so the consumer never needs null checks

HttpUrlMatrixSource optionally sends an `Authorization` request header
when INGESTION_VERSION_MATRIX_AUTH_TOKEN is set, so the matrix can live
behind authentication (e.g. a private GitHub repo's raw URL). Format is
verbatim - "token ghp_xxx" for a GitHub PAT, "Bearer ey..." for an OIDC
token. Property name ends with "Token" so PropertiesCollector's
keyword-based redaction catches it in the system-info endpoint. Unset
by default; public URLs work unchanged.

Future backends (GMS aspect on a globalSettings entity, AppConfig/Consul/
etcd, signed S3) just implement MatrixSource - the resolver and the
resolution stamp don't change.

Three execution entry points are covered:
  - CreateIngestionExecutionRequestResolver (manual triggers)
  - IngestionScheduler.ExecutionRequestRunnable (scheduled triggers)
  - CreateTestConnectionRequestResolver (test-connection from UI)

Each one stamps a structured CliVersionResolution record on the
resulting ExecutionRequestInput aspect. The stamp captures provenance
only - which tier of the chain produced the version, and the GMS server
version at write time. The CLI version string itself remains in
`args.version` (the wire-format field consumed by the executor) -
unchanged contract, no duplication:

  args: { "version": "1.3.1.5", ... }
  cliVersionResolution: {
    source: MATRIX_COHORT,
    serverVersion: "1.3.1.4"
  }

`serverVersion` is populated on every tier (PER_SOURCE, MATRIX_*,
WORKSPACE_DEFAULT) so cross-version analytics on execution requests
work without a separate per-aspect GMS version stamp.

Test connections also now honor defaultCliVersion when no explicit
version is supplied - previously they silently omitted version,
which caused the executor to use whatever bundled CLI it shipped
with rather than the configured default.

Feature is off by default - when INGESTION_VERSION_MATRIX_URL is unset
the factory wires a NoOpMatrixSource and every resolveVersion() returns
empty, so existing behavior is preserved exactly. In single-tenant
deployments without a deploymentId set, cohort matching never fires
and only the per-connector `_default` from the matrix applies.

Also: treat empty version strings as unset (bootstrap YAML can render
"" for the version field, and an empty value forwarded to the executor
silently falls back to the bundled CLI rather than the configured
default).

Config:
  INGESTION_VERSION_MATRIX_URL              (default: empty / disabled)
  INGESTION_VERSION_MATRIX_REFRESH_SECONDS  (default: 600)
  INGESTION_VERSION_MATRIX_AUTH_TOKEN       (default: empty / no auth)
  DATAHUB_EXECUTOR_CUSTOMER_ID              (existing; Acryl Cloud Helm
                                             chart injects this from the
                                             K8s namespace — sourced
                                             into `ingestion.deploymentId`)
The aspect stamps what GMS resolved, not what the executor actually
installed. Calls out the three cases where these diverge — extra_pip
transitive dep, no-acryl opt-out, bundled image short-circuit — so
forensics queries don't conflate intent with install outcome.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The rebase onto master (Jackson 2.21.1 → 2.21.3) inadvertently dropped
caffeine, jackson-dataformat-smile, jsr305, and errorprone 2.3.3 from
the metadata-service:configuration lockfile, plus narrowed the errorprone
2.47.0 scope. CI surfaced these as 'Resolved … which is not part of the
dependency lock state' failures.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Addresses naming concerns raised in PR review:
- `Matrix` was too generic — clearly an ingestion CLI version matrix
- `CliVersionProvenance` used a suffix uncommon in this codebase
- `CliVersionSource` enum values `PER_SOURCE` / `WORKSPACE_DEFAULT` read awkwardly

Class renames (metadata-service/configuration/.../ingestion/):
  Matrix                        -> IngestionCliVersionMatrix
  MatrixSource                  -> IngestionCliVersionMatrixSource
  NoOpMatrixSource              -> NoOpIngestionCliVersionMatrixSource
  HttpUrlMatrixSource           -> HttpUrlIngestionCliVersionMatrixSource
  IngestionVersionMatrixService -> IngestionCliVersionMatrixService

Factory + bean renames:
  IngestionVersionMatrixServiceFactory -> IngestionCliVersionMatrixServiceFactory
  bean "matrixSource"                  -> "ingestionCliVersionMatrixSource"
  bean "ingestionVersionMatrixService" -> "ingestionCliVersionMatrixService"

PDL changes (metadata-models/.../execution/):
  CliVersionProvenance         -> CliVersionAudit (record + .pdl file)
  ExecutionRequestInput field:
    cliVersionProvenance       -> cliVersionAudit
  CliVersionSource enum values:
    PER_SOURCE                 -> SOURCE_CONFIG_OVERRIDE
    WORKSPACE_DEFAULT          -> APPLICATION_DEFAULT
    (MATRIX_COHORT, MATRIX_CONNECTOR_DEFAULT unchanged)

All three call sites updated to use renamed types + setter
(setCliVersionAudit): CreateIngestionExecutionRequestResolver,
CreateTestConnectionRequestResolver, IngestionScheduler. Tests + factories
updated accordingly. End-to-end validated against a running GMS: all 4
resolution tiers + test-connection + scheduled trigger emit the new
`cliVersionAudit` field with new enum values.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Addresses PR review feedback that `Map<String, Map<String, ConnectorEntry>>`
read as opaque at lookup sites — the meaning of either string key was only
inferable from variable names and Javadoc. Wrapping the inner map in a
named POJO makes call sites self-documenting:

  Before:
    Map<String, ConnectorEntry> serverEntry = matrix.getEntriesForServer(v);
    ConnectorEntry entry = serverEntry.get(connectorType);

  After:
    ServerEntry serverEntry = matrix.getEntriesForServer(v);
    ConnectorEntry entry = serverEntry.getConnectorEntry(connectorType);

`ServerEntry` also gives a single place to add server-level behavior
(e.g. `getConnectorTypes()` for diagnostics) without changing every signature
that used to traffic in `Map<String, ConnectorEntry>`.

Scope:
  - New file: ServerEntry.java
  - IngestionCliVersionMatrix: inner Map value type → ServerEntry
  - IngestionCliVersionMatrixService: lookup uses getConnectorEntry(...)
  - HttpUrlIngestionCliVersionMatrixSource.parseMatrix: builds ServerEntry
    instances (its constructor takes over the unmodifiableMap wrapping)
  - HttpUrlIngestionCliVersionMatrixSourceValidationTest: call-site updates;
    one containsKey assertion replaced with assertNull(getConnectorEntry)

Declarative Jackson deserialization (the reviewer's secondary suggestion)
was considered and deferred: the current manual tree-API parser keeps
fail-closed file-level validation alongside fail-open entry-level skipping
with structured path-aware WARN logs, behavior that's awkward to preserve
with @JsonDeserialize.

All 90 configuration unit tests still pass; end-to-end re-verified against
a running GMS (3 demo scripts: 4-tier ladder, full happy path, phase-2
edge cases including scheduled trigger and executor observation).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
… source discriminator

Addresses PR review feedback that the HTTP-specific matrix config was
hardcoded directly under `ingestion:` and the backend choice was implicit
("HTTP if URL set, NoOp otherwise"). Adding a second backend would have
required piling more flat properties on `IngestionConfiguration` plus
arguing about precedence between them.

New structure in application.yaml:

  ingestion:
    cliVersionMatrix:
      source: "${INGESTION_VERSION_MATRIX_SOURCE:}"   # "http" | "none" | unset
      http:
        url: "${INGESTION_VERSION_MATRIX_URL:}"
        refreshSeconds: ${INGESTION_VERSION_MATRIX_REFRESH_SECONDS:600}
        authToken: "${INGESTION_VERSION_MATRIX_AUTH_TOKEN:}"

Adding a future backend (GMS aspect, AppConfig, …) becomes purely additive:
a new nested block under `cliVersionMatrix`, a new case in the factory's
source switch — no changes to HTTP's config or precedence logic.

Backward-compat: when `source` is unset, the factory infers from
`http.url` presence. Pre-discriminator deployments that only set the
INGESTION_VERSION_MATRIX_URL env var keep working unchanged. Explicit
`source: none` is a kill-switch — useful for ops to disable the feature
without unsetting URL env vars. The discriminator is case-insensitive.

Changes:
  - New POJO: HttpMatrixSourceConfiguration (url, refreshSeconds, authToken)
  - New POJO: CliVersionMatrixConfiguration (source + http)
  - IngestionConfiguration: 3 flat versionMatrix* fields -> cliVersionMatrix
  - application.yaml: restructured (env var names preserved)
  - IngestionCliVersionMatrixServiceFactory: reads nested config + applies
    discriminator with URL-presence inference fallback
  - IngestionCliVersionMatrixServiceFactoryTest: rewrote to cover all
    paths (unset/http/none, URL-presence inference, case-insensitivity);
    7 tests up from 4
  - PropertiesCollectorConfigurationTest: property paths updated for the
    redaction allowlist + visible-properties list. authToken still
    auto-redacted because the leaf name ends with "Token".

End-to-end verified against a running GMS using existing env vars
(no operator change required): matrix fetched + cached, all 4 resolution
tiers + test-connection + scheduled trigger green; system-info reports
the new nested keys with authToken redacted.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…rdown

Addresses PR review feedback that HttpUrlIngestionCliVersionMatrixSource
created a ScheduledExecutorService in its constructor without storing the
handle, leaving no way to stop the background refresh thread on context
teardown. The default thread factory creates non-daemon threads, so a
leaked executor keeps the JVM from exiting cleanly.

In steady-state production this is invisible (one executor for the JVM
lifetime), but it bites in:
  - Spring DevTools dev-loop: every restart leaks one refresh thread
  - Integration tests with multiple Spring contexts: thread accumulation,
    confusing log noise from dead-context refreshes
  - K8s pod shutdown: non-daemon thread blocks clean JVM exit, forcing
    SIGKILL after terminationGracePeriodSeconds rather than clean drain

Fix:
  - Promote `executor` to a field so the bean can manage its lifecycle
  - @PreDestroy public void shutdown() — graceful drain pattern matching
    the project convention (see KafkaTraceReaderFactory.shutdown()):
      executor.shutdown()
      -> awaitTermination(5s)
      -> executor.shutdownNow() on timeout or interruption
      -> propagate Thread.currentThread().interrupt() on InterruptedException
  - Uses jakarta.annotation.PreDestroy (matches existing usage in
    SystemInfoService, KafkaTraceReaderFactory, etc.)

Test added: testShutdown_stopsBackgroundRefresh — spins up an in-process
HTTP server, configures a 1-second refresh, waits for the first fetch,
calls shutdown(), sleeps 2.5s, asserts the request counter did not move.
Proves the scheduled executor stopped firing.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Addresses PR review feedback that the matrix refresh thread was carrying
the default Executors.defaultThreadFactory() name `pool-N-thread-1`, which
tells an operator triaging a hung pod nothing about what the thread does.

Plumbing matches the project convention in UpdateIndicesUpgradeStrategy:
construct the executor with a ThreadFactory that names the thread and
flips the daemon bit:

  Executors.newSingleThreadScheduledExecutor(r -> {
    Thread t = new Thread(r, "ingestion-cli-version-matrix-refresh");
    t.setDaemon(true);
    return t;
  });

Daemon flag is belt-and-suspenders with the @PreDestroy shutdown added
in the previous commit — if the hook somehow never fires (forced kill,
container-runtime quirks), the JVM can still exit cleanly because the
refresh thread is no longer counted toward the "block JVM exit" set.
@PreDestroy remains the primary shutdown path.

Test added: testRefreshThreadIsNamedAndDaemon — after first fetch, scans
Thread.getAllStackTraces() for the named thread, asserts it exists and
isDaemon() is true. Cleans up via shutdown() so the thread doesn't
linger into the next test.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…l sites

Addresses PR review feedback that CliVersionResolutionHelper should stay
logging-free, with structured debug/warn logs added at the resolver layer
where the resolution is actually consumed.

Three OSS call sites now emit identically-shaped log lines:

  Resolved ingestion CLI version (manual trigger):     tier=... version=...
  Resolved ingestion CLI version (test-connection):    tier=... version=...
  Resolved ingestion CLI version (scheduled trigger):  tier=... version=...

Each line carries tier (cliVersionAudit.source), version, serverVersion,
connector type, and an identifier (ingestion source URN or execution
request URN). DEBUG is the normal level; a WARN fires only when the
resolved version is empty -- meaning every tier including defaultCliVersion
fell through and the executor will silently use its bundled CLI.

Implementation: rather than duplicate the helper across three callers,
introduced CliVersionResolutionLogger in metadata-service/configuration
(same module as CliVersionResolutionHelper). Callers pass their own
@Slf4j-generated logger so log entries appear under the calling class,
preserving operators' existing class-name-based log filters. Constants
expose the trigger labels (TRIGGER_MANUAL, TRIGGER_TEST_CONNECTION,
TRIGGER_SCHEDULED) and identifier keys (IDENTIFIER_INGESTION_SOURCE,
IDENTIFIER_EXECUTION_REQUEST) so the call sites read declaratively.

Changes:
  - New: CliVersionResolutionLogger
  - CreateIngestionExecutionRequestResolver: @slf4j + one log call
  - CreateTestConnectionRequestResolver: extracted connectorType local +
    one log call (avoids calling extractSourceType twice)
  - IngestionScheduler: one log call

End-to-end verified against a running GMS: all three trigger labels
appear in the GMS log under their respective resolver classes with the
correct tier/version/connector/identifier values for every scenario from
the demo scripts (4-tier ladder, test-connection, scheduled cron).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Addresses two review suggestions on PR #17436:

(#9) Linear / PR ID references in code go stale and force readers off-site
to recover the context. Replaced six `#17471` references across the matrix
helper, two GraphQL resolvers, the scheduler, and two test files with
self-contained explanations of WHY each whitespace/empty normalization
exists -- namely that bootstrap YAML rendering produces
`version: "   "` (3 spaces) when an ingestion source has no version pin,
and forwarding that blank to the executor would silently use its bundled
CLI instead of defaultCliVersion.

(#10) Expanded the version-regex doc comment in
HttpUrlIngestionCliVersionMatrixSource to state explicitly that the
pattern is NOT a PEP 440 validator and to list five concrete accepted
shapes (standard release, rc, post, dev with epoch, local version
identifier) covering everything DataHub publishes to PyPI today.
Operators who fat-finger the matrix file get a clean WARN at fetch time
rather than a cryptic pip error minutes later; pip remains the source of
truth for whether a version actually exists.

All doc-comment changes, no logic touched. Tests confirm no behavior
change: metadata-service:configuration:test, the two resolver test
suites, and ingestion-scheduler:test all green.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…esent-tense Javadoc

Addresses three more items from the latest review pass on PR #17436:

(a) Rename the two remaining types we forgot in the first naming sweep:
      CliVersionResolutionHelper -> IngestionCliVersionResolutionHelper
      CliVersionResolutionLogger -> IngestionCliVersionResolutionLogger
    Everything in the matrix surface area now carries the `Ingestion` prefix
    consistently.

(c) Strip development-narrative phrasing from Javadoc. The reviewer called out
    ServerEntry's "vs the previous" framing; the same shape appeared in
    several other files I added. Rewrote each as a present-tense description
    of the current design, dropping "Lives separately ... on purpose", "Held
    as a field so ...", "replaces an earlier set of flat keys", "without an
    env change", and similar phrases that read as a changelog rather than
    docs.

(d) Simplify the cliVersionMatrix.source discriminator. The original "URL-
    presence auto-inference for backward compat" branch was complexity for
    zero benefit -- there are no pre-discriminator deployments.
    application.yaml now defaults source to "http"; the factory does:

      if source == "none"     -> NoOp                 (explicit kill-switch)
      else if http.url empty  -> NoOp                 (OSS default)
      else                    -> HttpUrlIngestionCliVersionMatrixSource

    This is the same observable behavior with one fewer branch and no
    "compatibility" framing. The dropped resolveSource(...) helper, the
    URL-presence inference, and the related Javadoc all came out.

Tests (all 77 across the 8 affected suites still green):
  - IngestionCliVersionMatrixServiceFactoryTest rewritten around the new
    contract: "URL controls HTTP vs NoOp when source != none" + "source=none
    is a kill-switch". noneIsCaseInsensitive added in place of the previous
    httpIsCaseInsensitive (only `none` short-circuits now, anything else
    falls through to URL-presence).
  - Inner-class refs (IngestionCliVersionResolutionHelper.Result) and
    callers (CreateIngestionExecutionRequestResolver,
    CreateTestConnectionRequestResolver, IngestionScheduler) updated by
    rename.

End-to-end verified against a running GMS: all 4 resolution tiers + test-
connection + scheduled trigger + executor consumption green; system-info
now shows ingestion.cliVersionMatrix.source = "http" (the new default).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…x fetcher)

Partial response to PR review #6 (two JSON libraries in the matrix surface
area).

extractSourceType was the only new code my PR added that used org.json. The
matrix fetcher (HttpUrlIngestionCliVersionMatrixSource) uses Jackson — having
two libraries doing the same parse-a-small-JSON job in adjacent files added
maintenance cognitive load without any runtime benefit. Switch
extractSourceType to Jackson so all new code emitted by this PR uses a
single library.

The body shrinks slightly because Jackson's chained .path() returns a
missing-node when a segment is absent, so the explicit has() checks for
`source` and `type` collapse into a single isTextual() check at the leaf.

Out of scope: CreateIngestionExecutionRequestResolver.injectRunId still
uses org.json. That method is from 2022 (commit 57b7ade, John Joyce)
and predates this PR. Its conversion is unrelated tech debt — not bundled
to keep this PR's review diff focused on what this PR actually adds.

Behaviour unchanged: malformed JSON, missing source, missing source.type,
non-textual source.type, and empty-string source.type all still produce
the same null + DEBUG-log outcome. 12 tests in
CreateTestConnectionRequestResolverTest cover these paths and stay green.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…/HTTPS only)

Addresses PR review #7 (SSRF / URLConnection accepts any protocol). The old
implementation used java.net.URLConnection, which transparently handles
file://, jar://, ftp://, and other schemes. If a future code path lets a
non-admin influence the matrix URL (e.g. a GmsAspect-backed source), the URL
would be reachable as an SSRF primitive against the GMS pod's filesystem.
Not exploitable today (the URL is operator-controlled via env var), but
fixing it now means the future backend is born safe.

Switched HttpUrlIngestionCliVersionMatrixSource.refresh() to
java.net.http.HttpClient (Java 11+). HttpClient supports HTTP and HTTPS only
-- any other scheme throws IllegalArgumentException at send-time, which the
existing catch (Exception) branch turns into the same "Failed to refresh ...
Retaining last known matrix" WARN that other fetch failures produce. The
connect and per-request timeouts (10s, unchanged) carry over. Added an
explicit non-2xx response check because URLConnection used to throw
IOException on non-2xx from getInputStream() whereas HttpClient does not.
Also added an InterruptedException handler that restores Thread.interrupt()
per project convention.

Test infrastructure migrated from temp-file + file:// URIs to embedded
com.sun.net.httpserver.HttpServer fixtures, since HttpClient rejects
file://:

  - IngestionCliVersionMatrixServiceTest: new startMatrixServer(json) helper
    + @AfterMethod tearDown. Six sites that previously called
    Files.createTempFile / tmp.toUri() now go through the helper. Dropped
    the Files/Path imports.
  - CreateIngestionExecutionRequestResolverTest.matrixServiceForConnector:
    same migration with a JVM shutdown hook to stop the per-call server.

Behaviour preserved end-to-end: 75 unit tests across 7 suites pass; the
three demo scripts (4-tier ladder, full happy path, phase 2 edge cases) run
green against a live GMS fetching from an HTTPS gist. Internal matrix-content
parsing, auth-header forwarding, retain-last-known-good on failure, and
the @PreDestroy shutdown all unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The ingestionCliVersionMatrixService is always wired as a Spring bean — a
NoOpIngestionCliVersionMatrixSource-backed instance when no matrix backend is
configured — so it is never null in production. Stop modelling it as optional:

- Remove the telescoping 2-arg constructors from CreateTestConnectionRequest-
  Resolver and CreateIngestionExecutionRequestResolver; only the 3-arg form was
  ever used in GmsGraphQLEngine. Both now Objects.requireNonNull the service.
- Drop the dead `service != null ? getServerVersion() : null` checks in the
  resolvers and IngestionScheduler.
- IngestionCliVersionResolutionHelper.resolve now takes a @nonnull service and
  reads serverVersion from it directly, instead of callers passing both the
  service and a value pulled from it.
- GmsGraphQLEngine requireNonNulls the field to match its @Autowired wiring.

Tests updated to construct resolvers with a NoOp-backed service for the
matrix-disabled case. No behavior change: OSS still falls through to
defaultCliVersion via the empty NoOp matrix.

Addresses platform review feedback on PR #17436.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…shared ObjectMapper

CreateTestConnectionRequestResolver parsed the recipe's source.type with a
private static `new ObjectMapper()` and a resolver-local helper method. Move
that logic to IngestionUtils.extractSourceType(ObjectMapper, String) and pass
the OperationContext's shared mapper instead of allocating a new one.

- IngestionUtils.extractSourceType: best-effort source.type extraction,
  ObjectMapper supplied by the caller.
- Resolver calls IngestionUtils.extractSourceType(
    context.getOperationContext().getObjectMapper(), recipe); drops the static
  MAPPER, the local method, and the Jackson imports.
- Test moved to IngestionUtilsTest; IngestTestUtils.getMockAllowContext now
  stubs getObjectMapper() so resolver tests exercise the op-context mapper.

Addresses review feedback on PR #17436.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…CLI version

DataHubIngestionSourceConfig.version is an optional PDL field, so the generated
getVersion() already returns null when unset. The `hasVersion() ? getVersion()
: null` ternary in the manual-execution resolver and the scheduler was redundant
— pass getConfig().getVersion() straight to the resolution helper, which already
normalizes null / empty / whitespace-only to "unset".

No behavior change; matches how the test-connection resolver passes its raw
input version.

Addresses review feedback on PR #17436.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Singleton is the default Spring bean scope, so the explicit @scope("singleton")
on both ingestionCliVersionMatrixSource and ingestionCliVersionMatrixService
beans (and the Scope import) added nothing. Remove for consistency with the
rest of the codebase.

Addresses review feedback on PR #17436.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ix validation

The permissive version pattern (^[\w.+!-]+$) already accepts the special
non-PyPI sentinels the ingestion executor recognizes (e.g. "bundled",
"no-acryl-datahub") — they are alphanumeric/hyphenated so they pass. Make that
intent explicit in the Javadoc (noting the list is non-exhaustive by design)
and add a regression test (permissiveVersionPatternAcceptsSpecialSentinels)
locking in that these sentinels load rather than being rejected as invalid.

Addresses review feedback on PR #17436.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
resolveIngestionCliVersion only did blank->default fallback with no matrix
lookup. Now that all three call sites (manual, scheduled, test-connection)
route through IngestionCliVersionResolutionHelper.resolve — which folds that
normalization into the matrix-aware resolution — the old util has no remaining
callers. Remove it (and its tests) so there is a single CLI-version resolution
path and no method that silently bypasses the matrix.

Addresses review feedback on PR #17436.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…trix fetch

Lets the matrix URL point at the GitHub contents API
(api.github.com/repos/<org>/<repo>/contents/<path>?ref=<branch>), the
only authenticated way to read a file from a private/internal GitHub
repo — raw.githubusercontent.com does not honor the Authorization header
for those. With this Accept the contents API returns the raw file body
instead of the base64-wrapped JSON envelope (which would otherwise parse
to an empty matrix). Plain file hosts (raw URLs, gists, S3, CDNs) ignore
the unknown Accept and still return the file, so it's safe for public
URLs too — backward compatible with the existing gist setup.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution PR or Issue raised by member(s) of DataHub Community devops PR or Issue related to DataHub backend & deployment merge-pending-ci A PR that has passed review and should be merged once CI is green. product PR or Issue related to the DataHub UI/UX

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants