Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7b9096d
feat(ingestion): per-connector CLI version matrix + resolution stamp …
puneetagarwal-datahub May 13, 2026
3f26512
docs(ingestion): note CliVersionProvenance is GMS-side intent
puneetagarwal-datahub May 19, 2026
26da88e
fix(deps): restore transitive lockfile entries dropped during rebase
puneetagarwal-datahub May 20, 2026
bc0762b
refactor(ingestion): rename CLI version matrix types per review feedback
puneetagarwal-datahub May 28, 2026
5d9bd0a
refactor(ingestion): wrap inner matrix map in a named ServerEntry POJO
puneetagarwal-datahub May 28, 2026
a70e4d8
refactor(ingestion): nest matrix config under cliVersionMatrix with a…
puneetagarwal-datahub May 28, 2026
5aa149a
refactor(ingestion): shut down matrix refresh scheduler on Spring tea…
puneetagarwal-datahub May 28, 2026
def1f4d
refactor(ingestion): name the matrix refresh thread and make it a daemon
puneetagarwal-datahub May 28, 2026
81f7ce7
refactor(ingestion): structured CLI version resolution logging at cal…
puneetagarwal-datahub May 28, 2026
4fffad5
docs(ingestion): inline PR refs + document version regex shape
puneetagarwal-datahub May 28, 2026
5467528
refactor(ingestion): finish naming, simplify source discriminator, pr…
puneetagarwal-datahub May 29, 2026
8e12e5e
refactor(ingestion): use Jackson for extractSourceType (matches matri…
puneetagarwal-datahub May 29, 2026
da2a8af
refactor(ingestion): fetch matrix with java.net.http.HttpClient (HTTP…
puneetagarwal-datahub May 29, 2026
eb57b99
refactor(ingestion): treat CLI version matrix service as a required bean
puneetagarwal-datahub May 31, 2026
1c029e3
refactor(ingestion): move extractSourceType to IngestionUtils, reuse …
puneetagarwal-datahub May 31, 2026
5b011a8
refactor(ingestion): drop redundant hasVersion() guard on per-source …
puneetagarwal-datahub May 31, 2026
623c938
refactor(ingestion): drop redundant @Scope("singleton") on matrix beans
puneetagarwal-datahub May 31, 2026
1844ef2
docs(ingestion): document non-PyPI version sentinels accepted by matr…
puneetagarwal-datahub May 31, 2026
96f7920
refactor(ingestion): remove superseded resolveIngestionCliVersion util
puneetagarwal-datahub May 31, 2026
456e900
feat(ingestion-matrix): send Accept: application/vnd.github.raw on ma…
puneetagarwal-datahub Jun 4, 2026
711cc3d
Merge branch 'master' into per-connector-cli-version-matrix
puneetagarwal-datahub Jun 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.ingestion.IngestionCliVersionMatrixService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
Expand Down Expand Up @@ -453,6 +454,7 @@ public class GmsGraphQLEngine {
private final FeatureFlags featureFlags;

private final IngestionConfiguration ingestionConfiguration;
private final IngestionCliVersionMatrixService ingestionCliVersionMatrixService;
private final AuthenticationConfiguration authenticationConfiguration;
private final AuthorizationConfiguration authorizationConfiguration;
private final VisualConfiguration visualConfiguration;
Expand Down Expand Up @@ -596,6 +598,8 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
this.ingestionCliVersionMatrixService =
Objects.requireNonNull(args.ingestionCliVersionMatrixService);
this.authenticationConfiguration = Objects.requireNonNull(args.authenticationConfiguration);
this.authorizationConfiguration = Objects.requireNonNull(args.authorizationConfiguration);
this.visualConfiguration = args.visualConfiguration;
Expand Down Expand Up @@ -1368,14 +1372,18 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher(
"createIngestionExecutionRequest",
new CreateIngestionExecutionRequestResolver(
this.entityClient, this.ingestionConfiguration))
this.entityClient,
this.ingestionConfiguration,
this.ingestionCliVersionMatrixService))
.dataFetcher(
"cancelIngestionExecutionRequest",
new CancelIngestionExecutionRequestResolver(this.entityClient))
.dataFetcher(
"createTestConnectionRequest",
new CreateTestConnectionRequestResolver(
this.entityClient, this.ingestionConfiguration))
this.entityClient,
this.ingestionConfiguration,
this.ingestionCliVersionMatrixService))
.dataFetcher(
"upsertCustomAssertion", new UpsertCustomAssertionResolver(assertionService))
.dataFetcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.ingestion.IngestionCliVersionMatrixService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.recommendation.RecommendationsService;
import com.linkedin.metadata.search.SemanticSearchService;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class GmsGraphQLEngineArgs {
SecretService secretService;
NativeUserService nativeUserService;
IngestionConfiguration ingestionConfiguration;
IngestionCliVersionMatrixService ingestionCliVersionMatrixService;
AuthenticationConfiguration authenticationConfiguration;
AuthorizationConfiguration authorizationConfiguration;
GitVersion gitVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import com.linkedin.execution.ExecutionRequestSource;
import com.linkedin.ingestion.DataHubIngestionSourceInfo;
import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.ingestion.IngestionCliVersionMatrixService;
import com.linkedin.metadata.ingestion.IngestionCliVersionResolutionHelper;
import com.linkedin.metadata.ingestion.IngestionCliVersionResolutionLogger;
import com.linkedin.metadata.key.ExecutionRequestKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.IngestionUtils;
Expand All @@ -31,12 +34,15 @@
import graphql.schema.DataFetchingEnvironment;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONException;
import org.json.JSONObject;

/** Creates an on-demand ingestion execution request. */
@Slf4j
public class CreateIngestionExecutionRequestResolver
implements DataFetcher<CompletableFuture<String>> {

Expand All @@ -48,11 +54,16 @@ public class CreateIngestionExecutionRequestResolver

private final EntityClient _entityClient;
private final IngestionConfiguration _ingestionConfiguration;
private final IngestionCliVersionMatrixService _versionMatrixService;

public CreateIngestionExecutionRequestResolver(
final EntityClient entityClient, final IngestionConfiguration ingestionConfiguration) {
final EntityClient entityClient,
final IngestionConfiguration ingestionConfiguration,
final IngestionCliVersionMatrixService versionMatrixService) {
_entityClient = entityClient;
_ingestionConfiguration = ingestionConfiguration;
// Always a wired Spring bean (NoOp-backed when no matrix backend is configured), never null.
_versionMatrixService = Objects.requireNonNull(versionMatrixService);
}

@Override
Expand Down Expand Up @@ -122,11 +133,25 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
recipe = injectRunId(recipe, executionRequestUrn.toString());
recipe = IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn.toString());
arguments.put(RECIPE_ARG_NAME, recipe);
arguments.put(
VERSION_ARG_NAME,
IngestionUtils.resolveIngestionCliVersion(
// getVersion() returns null for an unset optional field, so no hasVersion() guard is
// needed. The helper treats null, empty, and whitespace-only versions as "unset" (bootstrap
// YAML can render `version: "{{ config.version }}"` as 3 spaces), so resolution falls
// through to the matrix / application default instead of pinning the bundled CLI.
final IngestionCliVersionResolutionHelper.Result resolution =
IngestionCliVersionResolutionHelper.resolve(
ingestionSourceInfo.getConfig().getVersion(),
_ingestionConfiguration.getDefaultCliVersion()));
ingestionSourceInfo.getType(),
_versionMatrixService,
_ingestionConfiguration.getDefaultCliVersion());
arguments.put(VERSION_ARG_NAME, resolution.getVersion());
execInput.setCliVersionAudit(resolution.getStamp());
IngestionCliVersionResolutionLogger.log(
log,
IngestionCliVersionResolutionLogger.TRIGGER_MANUAL,
resolution,
ingestionSourceInfo.getType(),
IngestionCliVersionResolutionLogger.IDENTIFIER_INGESTION_SOURCE,
ingestionSourceUrn.toString());
String debugMode = "false";
if (ingestionSourceInfo.getConfig().hasDebugMode()) {
debugMode = ingestionSourceInfo.getConfig().isDebugMode() ? "true" : "false";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import com.linkedin.execution.ExecutionRequestInput;
import com.linkedin.execution.ExecutionRequestSource;
import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.ingestion.IngestionCliVersionMatrixService;
import com.linkedin.metadata.ingestion.IngestionCliVersionResolutionHelper;
import com.linkedin.metadata.ingestion.IngestionCliVersionResolutionLogger;
import com.linkedin.metadata.key.ExecutionRequestKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.IngestionUtils;
Expand All @@ -24,10 +27,31 @@
import graphql.schema.DataFetchingEnvironment;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;

/** Creates an on-demand ingestion execution request. */
/**
* Creates an on-demand "test connection" ingestion execution request.
*
* <p>Version resolution priority (top wins):
*
* <ol>
* <li>{@code input.version} — explicit per-request override (existing behavior)
* <li>{@code matrix[serverVersion][source.type]} — connector-specific version pin from {@link
* IngestionCliVersionMatrixService} when enabled
* <li>{@code matrix[serverVersion][source.type]._default}
* <li>{@link IngestionConfiguration#getDefaultCliVersion()} — application-wide fallback
* </ol>
*
* <p>Historically, the test-connection path silently omitted {@code version} when the input did
* not provide one, so the executor fell back to whatever bundled default it shipped with —
* unlike real (non-test) executions. The {@code defaultCliVersion} fallback below closes that
* gap, and the matrix lookup gives test connections the same per-connector pinning that real
* executions get.
*/
@Slf4j
public class CreateTestConnectionRequestResolver implements DataFetcher<CompletableFuture<String>> {

private static final String TEST_CONNECTION_TASK_NAME = "TEST_CONNECTION";
Expand All @@ -38,11 +62,16 @@ public class CreateTestConnectionRequestResolver implements DataFetcher<Completa

private final EntityClient _entityClient;
private final IngestionConfiguration _ingestionConfiguration;
private final IngestionCliVersionMatrixService _versionMatrixService;

public CreateTestConnectionRequestResolver(
final EntityClient entityClient, final IngestionConfiguration ingestionConfiguration) {
final EntityClient entityClient,
final IngestionConfiguration ingestionConfiguration,
final IngestionCliVersionMatrixService versionMatrixService) {
_entityClient = entityClient;
_ingestionConfiguration = ingestionConfiguration;
// Always a wired Spring bean (NoOp-backed when no matrix backend is configured), never null.
_versionMatrixService = Objects.requireNonNull(versionMatrixService);
}

@Override
Expand Down Expand Up @@ -80,18 +109,32 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
RECIPE_ARG_NAME,
IngestionUtils.injectPipelineName(
input.getRecipe(), executionRequestUrn.toString()));
// Mirror the manual-ingestion path (CreateIngestionExecutionRequestResolver) which
// routes the same call through IngestionUtils.resolveIngestionCliVersion. Without
// this, a test-connection request with no input.version (or a blank one) silently
// omits args.version, causing the executor to fall back to its bundled CLI version
// rather than the configured defaultCliVersion. That divergence makes test
// connections run on a different CLI than the actual ingestion will use — hiding
// compatibility issues that surface in production.
arguments.put(
VERSION_ARG_NAME,
IngestionUtils.resolveIngestionCliVersion(
input.getVersion(), _ingestionConfiguration.getDefaultCliVersion()));
// input.getVersion() may be null, empty, or whitespace-only (UI forms can submit any
// of these — an unfilled "version" field commonly renders as a 3-space string). The
// helper normalizes all three to "unset" so resolution falls through to the matrix /
// application default; without that normalization, the blank value would be forwarded
// verbatim to the executor, which would silently pin the run to its bundled CLI.
final String connectorType =
IngestionUtils.extractSourceType(
context.getOperationContext().getObjectMapper(), input.getRecipe());
final IngestionCliVersionResolutionHelper.Result resolution =
IngestionCliVersionResolutionHelper.resolve(
input.getVersion(),
connectorType,
_versionMatrixService,
_ingestionConfiguration.getDefaultCliVersion());
if (resolution.getVersion() != null && !resolution.getVersion().isEmpty()) {
arguments.put(VERSION_ARG_NAME, resolution.getVersion());
}
execInput.setArgs(new StringMap(arguments));
execInput.setCliVersionAudit(resolution.getStamp());
IngestionCliVersionResolutionLogger.log(
log,
IngestionCliVersionResolutionLogger.TRIGGER_TEST_CONNECTION,
resolution,
connectorType,
IngestionCliVersionResolutionLogger.IDENTIFIER_EXECUTION_REQUEST,
executionRequestUrn.toString());

final MetadataChangeProposal proposal =
buildMetadataChangeProposalWithKey(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.datahub.authorization.AuthorizationResult;
import com.datahub.authorization.EntitySpec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.Secret;
Expand Down Expand Up @@ -35,6 +36,7 @@ public static QueryContext getMockAllowContext() {
when(mockContext.getOperationContext()).thenReturn(mock(OperationContext.class));
when(mockContext.getOperationContext().authorize(any(), nullable(EntitySpec.class), any()))
.thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, ""));
when(mockContext.getOperationContext().getObjectMapper()).thenReturn(new ObjectMapper());
return mockContext;
}

Expand Down
Loading
Loading