Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FAI-14970] Skip runnning discover catalog with dst only #108

Merged
merged 3 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-local-cli-nodejs/src/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ export async function runDiscoverCatalog(tmpDir: string, image: string | undefin

if (rawCatalog?.type === AirbyteMessageType.CATALOG && res[0].StatusCode === 0) {
logger.info('Catalog discovered successfully.');
return rawCatalog.catalog ?? {};
return rawCatalog.catalog ?? {streams: []};
}
throw new Error('Catalog not found or container ends with non-zero status code');
} catch (error: any) {
Expand Down
4 changes: 2 additions & 2 deletions airbyte-local-cli-nodejs/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export interface CliOptions {
export interface AirbyteConfig {
image: string;
config?: object;
catalog?: object;
catalog?: AirbyteConfiguredCatalog;
dockerOptions?: AirbyteConfigDockerOptions;
}
export interface AirbyteConfigDockerOptions {
Expand Down Expand Up @@ -116,7 +116,7 @@ export enum DestinationSyncMode {
}
export interface AirbyteStream {
name: string;
json_schema: Dictionary<any>;
json_schema?: Dictionary<any>;
supported_sync_modes?: SyncMode[];
source_defined_cursor?: boolean;
default_cursor_field?: string[];
Expand Down
30 changes: 19 additions & 11 deletions airbyte-local-cli-nodejs/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,21 +291,29 @@ export async function writeCatalog(tmpDir: string, config: FarosConfig): Promise
logger.debug(`Writing Airbyte catalog to files...`);
const srcCatalogFilePath = `${tmpDir}${sep}${FILENAME_PREFIX}_src_catalog.json`;
const dstCatalogFilePath = `${tmpDir}${sep}${FILENAME_PREFIX}_dst_catalog.json`;
let srcCatalog: AirbyteConfiguredCatalog;
let dstCatalog: AirbyteConfiguredCatalog;

// run discover catalog to get default catalog
const defaultCatalog = await runDiscoverCatalog(tmpDir, config.src?.image);
if (config.srcInputFile) {
// run dst only
srcCatalog = {streams: []};
dstCatalog = config.dst?.catalog ?? {streams: []};
} else {
// run discover catalog to get default catalog
const defaultCatalog = await runDiscoverCatalog(tmpDir, config.src?.image);

// src catalog: override the default with user provided catalog
const srcCatalog = overrideCatalog(config.src?.catalog ?? {}, defaultCatalog, config.fullRefresh);
// src catalog: override the default with user provided catalog
srcCatalog = overrideCatalog(config.src?.catalog ?? {}, defaultCatalog, config.fullRefresh);

// dst catalog: use src catalog or override default with user provided dst catalog
// append dst stream prefix to the stream name
let dstCatalog;
if (Object.keys((config.dst?.catalog as AirbyteCatalog)?.streams ?? []).length === 0) {
dstCatalog = structuredClone(srcCatalog);
} else {
dstCatalog = overrideCatalog(config.dst?.catalog ?? {}, defaultCatalog, config.fullRefresh);
// dst catalog: use src catalog or override default with user provided dst catalog
// append dst stream prefix to the stream name
if (Object.keys(config.dst?.catalog?.streams ?? []).length === 0) {
dstCatalog = structuredClone(srcCatalog);
} else {
dstCatalog = overrideCatalog(config.dst?.catalog ?? {}, defaultCatalog, config.fullRefresh);
}
}

dstCatalog.streams.forEach((stream) => {
stream.stream.name = `${config.dstStreamPrefix ?? ''}${stream.stream.name}`;
});
Expand Down
23 changes: 22 additions & 1 deletion airbyte-local-cli-nodejs/test/exec/spec/index_spec.sh
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,26 @@ Describe 'Run source sync'
End
End

Describe 'Run destination sync'
It 'should succeed with dstOnly'
airbyte_local_test() {

jq --arg api_key "$FAROS_API_KEY" '
.dst.config.edition_configs.api_key = $api_key
' ./resources/test_config_file_dst_only.json.template > ./resources/test_config_file_dst_only.json

./airbyte-local \
--config-file './resources/test_config_file_dst_only.json' \
--dst-only './resources/dockerIt_runDstSync/faros_airbyte_cli_src_output'
}
When call airbyte_local_test
The output should include '[DST] - {"log":{"level":"INFO","message":"Errored 0 records"},"type":"LOG"}'
The output should include "Destination connector ran successfully."
The output should include "Airbyte CLI completed successfully."
The status should equal 0
End
End

Describe 'Run source and destination sync'
It 'should succeed with src and dst'
airbyte_local_test() {
Expand All @@ -147,11 +167,11 @@ Describe 'Run source and destination sync'
--config-file './resources/test_config_file_graph_copy.json'
}
When call airbyte_local_test
The status should equal 0
The output should include "Source connector ran successfully."
The output should include '[DST] - {"log":{"level":"INFO","message":"Errored 0 records"},"type":"LOG"}'
The output should include "Destination connector ran successfully."
The output should include "Airbyte CLI completed successfully."
The status should equal 0
End
End

Expand All @@ -160,6 +180,7 @@ cleanup() {
find . -name 'faros_airbyte_cli_config.json' -delete
find . -name '*_cid' -delete
find . -name '*state.json' -delete
find ./resources/ -name 'test_config_file_dst_only.json' -delete
find ./resources/ -name 'test_config_file_graph_copy.json' -delete
}
AfterAll 'cleanup'
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"dst": {
"image": "farosai/airbyte-faros-destination",
"config": {
"edition_configs": {
"edition": "cloud",
"graph": "jennie-test",
"graphql_api": "v2",
"api_url": "https://dev.api.faros.ai",
"api_key": ""
}
},
"catalog": {
"streams":[
{
"stream":{
"name":"myfarosgraphqlsrc__faros_graphql__faros_graph"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i expect the user to provide the stream name with prefix

},
"sync_mode":"incremental",
"destination_sync_mode":"append"
}
]
}
}
}
34 changes: 31 additions & 3 deletions airbyte-local-cli-nodejs/test/utils.it.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {chmodSync, existsSync, mkdtempSync, readFileSync, rmSync, writeFileSync}
import {tmpdir} from 'node:os';

import {runDiscoverCatalog} from '../src/docker';
import {FarosConfig} from '../src/types';
import {FarosConfig, SyncMode} from '../src/types';
import {
checkDockerInstalled,
cleanUp,
Expand Down Expand Up @@ -31,8 +31,10 @@ const testConfig: FarosConfig = {
url: 'test',
},
catalog: {
tests: {disabled: true},
projects: {disabled: true},
streams: [
{stream: {name: 'tests'}, sync_mode: SyncMode.INCREMENTAL, disabled: true},
{stream: {name: 'projects'}, sync_mode: SyncMode.FULL_REFRESH, disabled: true},
],
},
},
dst: {
Expand Down Expand Up @@ -307,6 +309,32 @@ describe('write files to temporary dir', () => {
expect(srcCatalog).toMatchSnapshot();
expect(dstCatalog).toMatchSnapshot();
});

it('should succeed with dst only', async () => {
const dstOnlyCatalog = {
streams: [
{
stream: {name: 'builds'},
sync_mode: 'incremental',
},
],
};
const catalogTestConfig = {
...structuredClone(testConfig),
src: undefined as unknown,
dst: {...testConfig.dst, catalog: dstOnlyCatalog},
dstStreamPrefix: 'testPrefix__',
srcInputFile: 'testSrcInputFile',
} as FarosConfig;
await writeCatalog(tmpDirPath, catalogTestConfig);

expect(existsSync(srcCatalogPath)).toBe(true);
expect(existsSync(dstCatalogPath)).toBe(true);
const srcCatalog = JSON.parse(readFileSync(srcCatalogPath, 'utf8'));
const dstCatalog = JSON.parse(readFileSync(dstCatalogPath, 'utf8'));
expect(srcCatalog).toEqual({streams: []});
expect(dstCatalog).toEqual(dstOnlyCatalog);
});
});
});

Expand Down
Loading