Skip to content

Commit 47c28f4

Browse files
authored
[FAI-14970] Skip runnning discover catalog with dst only (#108)
1 parent d1a6645 commit 47c28f4

File tree

6 files changed

+100
-18
lines changed

6 files changed

+100
-18
lines changed

airbyte-local-cli-nodejs/src/docker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ export async function runDiscoverCatalog(tmpDir: string, image: string | undefin
190190

191191
if (rawCatalog?.type === AirbyteMessageType.CATALOG && res[0].StatusCode === 0) {
192192
logger.info('Catalog discovered successfully.');
193-
return rawCatalog.catalog ?? {};
193+
return rawCatalog.catalog ?? {streams: []};
194194
}
195195
throw new Error('Catalog not found or container ends with non-zero status code');
196196
} catch (error: any) {

airbyte-local-cli-nodejs/src/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export interface CliOptions {
4141
export interface AirbyteConfig {
4242
image: string;
4343
config?: object;
44-
catalog?: object;
44+
catalog?: AirbyteConfiguredCatalog;
4545
dockerOptions?: AirbyteConfigDockerOptions;
4646
}
4747
export interface AirbyteConfigDockerOptions {
@@ -116,7 +116,7 @@ export enum DestinationSyncMode {
116116
}
117117
export interface AirbyteStream {
118118
name: string;
119-
json_schema: Dictionary<any>;
119+
json_schema?: Dictionary<any>;
120120
supported_sync_modes?: SyncMode[];
121121
source_defined_cursor?: boolean;
122122
default_cursor_field?: string[];

airbyte-local-cli-nodejs/src/utils.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -291,21 +291,29 @@ export async function writeCatalog(tmpDir: string, config: FarosConfig): Promise
291291
logger.debug(`Writing Airbyte catalog to files...`);
292292
const srcCatalogFilePath = `${tmpDir}${sep}${FILENAME_PREFIX}_src_catalog.json`;
293293
const dstCatalogFilePath = `${tmpDir}${sep}${FILENAME_PREFIX}_dst_catalog.json`;
294+
let srcCatalog: AirbyteConfiguredCatalog;
295+
let dstCatalog: AirbyteConfiguredCatalog;
294296

295-
// run discover catalog to get default catalog
296-
const defaultCatalog = await runDiscoverCatalog(tmpDir, config.src?.image);
297+
if (config.srcInputFile) {
298+
// run dst only
299+
srcCatalog = {streams: []};
300+
dstCatalog = config.dst?.catalog ?? {streams: []};
301+
} else {
302+
// run discover catalog to get default catalog
303+
const defaultCatalog = await runDiscoverCatalog(tmpDir, config.src?.image);
297304

298-
// src catalog: override the default with user provided catalog
299-
const srcCatalog = overrideCatalog(config.src?.catalog ?? {}, defaultCatalog, config.fullRefresh);
305+
// src catalog: override the default with user provided catalog
306+
srcCatalog = overrideCatalog(config.src?.catalog ?? {}, defaultCatalog, config.fullRefresh);
300307

301-
// dst catalog: use src catalog or override default with user provided dst catalog
302-
// append dst stream prefix to the stream name
303-
let dstCatalog;
304-
if (Object.keys((config.dst?.catalog as AirbyteCatalog)?.streams ?? []).length === 0) {
305-
dstCatalog = structuredClone(srcCatalog);
306-
} else {
307-
dstCatalog = overrideCatalog(config.dst?.catalog ?? {}, defaultCatalog, config.fullRefresh);
308+
// dst catalog: use src catalog or override default with user provided dst catalog
309+
// append dst stream prefix to the stream name
310+
if (Object.keys(config.dst?.catalog?.streams ?? []).length === 0) {
311+
dstCatalog = structuredClone(srcCatalog);
312+
} else {
313+
dstCatalog = overrideCatalog(config.dst?.catalog ?? {}, defaultCatalog, config.fullRefresh);
314+
}
308315
}
316+
309317
dstCatalog.streams.forEach((stream) => {
310318
stream.stream.name = `${config.dstStreamPrefix ?? ''}${stream.stream.name}`;
311319
});

airbyte-local-cli-nodejs/test/exec/spec/index_spec.sh

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,26 @@ Describe 'Run source sync'
134134
End
135135
End
136136

137+
Describe 'Run destination sync'
138+
It 'should succeed with dstOnly'
139+
airbyte_local_test() {
140+
141+
jq --arg api_key "$FAROS_API_KEY" '
142+
.dst.config.edition_configs.api_key = $api_key
143+
' ./resources/test_config_file_dst_only.json.template > ./resources/test_config_file_dst_only.json
144+
145+
./airbyte-local \
146+
--config-file './resources/test_config_file_dst_only.json' \
147+
--dst-only './resources/dockerIt_runDstSync/faros_airbyte_cli_src_output'
148+
}
149+
When call airbyte_local_test
150+
The output should include '[DST] - {"log":{"level":"INFO","message":"Errored 0 records"},"type":"LOG"}'
151+
The output should include "Destination connector ran successfully."
152+
The output should include "Airbyte CLI completed successfully."
153+
The status should equal 0
154+
End
155+
End
156+
137157
Describe 'Run source and destination sync'
138158
It 'should succeed with src and dst'
139159
airbyte_local_test() {
@@ -147,11 +167,11 @@ Describe 'Run source and destination sync'
147167
--config-file './resources/test_config_file_graph_copy.json'
148168
}
149169
When call airbyte_local_test
150-
The status should equal 0
151170
The output should include "Source connector ran successfully."
152171
The output should include '[DST] - {"log":{"level":"INFO","message":"Errored 0 records"},"type":"LOG"}'
153172
The output should include "Destination connector ran successfully."
154173
The output should include "Airbyte CLI completed successfully."
174+
The status should equal 0
155175
End
156176
End
157177

@@ -160,6 +180,7 @@ cleanup() {
160180
find . -name 'faros_airbyte_cli_config.json' -delete
161181
find . -name '*_cid' -delete
162182
find . -name '*state.json' -delete
183+
find ./resources/ -name 'test_config_file_dst_only.json' -delete
163184
find ./resources/ -name 'test_config_file_graph_copy.json' -delete
164185
}
165186
AfterAll 'cleanup'
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"dst": {
3+
"image": "farosai/airbyte-faros-destination",
4+
"config": {
5+
"edition_configs": {
6+
"edition": "cloud",
7+
"graph": "jennie-test",
8+
"graphql_api": "v2",
9+
"api_url": "https://dev.api.faros.ai",
10+
"api_key": ""
11+
}
12+
},
13+
"catalog": {
14+
"streams":[
15+
{
16+
"stream":{
17+
"name":"myfarosgraphqlsrc__faros_graphql__faros_graph"
18+
},
19+
"sync_mode":"incremental",
20+
"destination_sync_mode":"append"
21+
}
22+
]
23+
}
24+
}
25+
}

airbyte-local-cli-nodejs/test/utils.it.test.ts

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import {chmodSync, existsSync, mkdtempSync, readFileSync, rmSync, writeFileSync}
22
import {tmpdir} from 'node:os';
33

44
import {runDiscoverCatalog} from '../src/docker';
5-
import {FarosConfig} from '../src/types';
5+
import {FarosConfig, SyncMode} from '../src/types';
66
import {
77
checkDockerInstalled,
88
cleanUp,
@@ -31,8 +31,10 @@ const testConfig: FarosConfig = {
3131
url: 'test',
3232
},
3333
catalog: {
34-
tests: {disabled: true},
35-
projects: {disabled: true},
34+
streams: [
35+
{stream: {name: 'tests'}, sync_mode: SyncMode.INCREMENTAL, disabled: true},
36+
{stream: {name: 'projects'}, sync_mode: SyncMode.FULL_REFRESH, disabled: true},
37+
],
3638
},
3739
},
3840
dst: {
@@ -307,6 +309,32 @@ describe('write files to temporary dir', () => {
307309
expect(srcCatalog).toMatchSnapshot();
308310
expect(dstCatalog).toMatchSnapshot();
309311
});
312+
313+
it('should succeed with dst only', async () => {
314+
const dstOnlyCatalog = {
315+
streams: [
316+
{
317+
stream: {name: 'builds'},
318+
sync_mode: 'incremental',
319+
},
320+
],
321+
};
322+
const catalogTestConfig = {
323+
...structuredClone(testConfig),
324+
src: undefined as unknown,
325+
dst: {...testConfig.dst, catalog: dstOnlyCatalog},
326+
dstStreamPrefix: 'testPrefix__',
327+
srcInputFile: 'testSrcInputFile',
328+
} as FarosConfig;
329+
await writeCatalog(tmpDirPath, catalogTestConfig);
330+
331+
expect(existsSync(srcCatalogPath)).toBe(true);
332+
expect(existsSync(dstCatalogPath)).toBe(true);
333+
const srcCatalog = JSON.parse(readFileSync(srcCatalogPath, 'utf8'));
334+
const dstCatalog = JSON.parse(readFileSync(dstCatalogPath, 'utf8'));
335+
expect(srcCatalog).toEqual({streams: []});
336+
expect(dstCatalog).toEqual(dstOnlyCatalog);
337+
});
310338
});
311339
});
312340

0 commit comments

Comments
 (0)