Skip to content

Commit 37d9c6d

Browse files
authored
[FAI-13688] Run source airbyte connector docker container (#101)
1 parent 685eb94 commit 37d9c6d

14 files changed

+533
-168
lines changed

airbyte-local-cli-nodejs/package-lock.json

Lines changed: 276 additions & 136 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

airbyte-local-cli-nodejs/package.json

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
"lint-fix": "npm run lint -- --fix",
1212
"clean": "rm -rf lib node_modules package-lock.json",
1313
"bundle": "npm run build && esbuild src/index.ts --bundle --platform=node --outfile=dist/index.js --external:*.node",
14-
"pkg": "npm run bundle && pkg --output ./out/pkg/airbyte-local dist/index.js",
15-
"pkg-linuxstatic": "npm run bundle && pkg --output ./out/pkg/airbyte-local -t linuxstatic dist/index.js",
14+
"pkg": "npm run bundle && pkg --debug --output ./out/pkg/airbyte-local dist/index.js",
15+
"pkg-linuxstatic": "npm run bundle && pkg --debug --output ./out/pkg/airbyte-local -t linuxstatic dist/index.js",
1616
"test": "jest",
1717
"shellspec": "npm run bundle && pkg --output ./test/exec/airbyte-local dist/index.js && cp -rf ./test/resources ./test/exec/resources && shellspec --chdir ./test/exec"
1818
},
@@ -35,16 +35,16 @@
3535
"@tsconfig/strictest": "^2.0.5",
3636
"@types/dockerode": "^3.3.32",
3737
"@types/jest": "^29.5.14",
38-
"@types/lodash": "^4.17.13",
38+
"@types/lodash": "^4.17.14",
3939
"@types/node": "^20.17.6",
40-
"@typescript-eslint/eslint-plugin": "^8.13.0",
40+
"@typescript-eslint/eslint-plugin": "^8.19.1",
4141
"@typescript-eslint/parser": "^8.13.0",
42-
"@yao-pkg/pkg": "^6.1.1",
42+
"@yao-pkg/pkg": "^6.2.0",
4343
"eslint": "^8.57.1",
4444
"eslint-config-faros": "^0.1.0",
4545
"eslint-config-prettier": "^9.1.0",
4646
"eslint-plugin-simple-import-sort": "^12.1.1",
47-
"prettier": "^3.4.1",
47+
"prettier": "^3.4.2",
4848
"ts-jest": "^29.2.5",
4949
"tsx": "^4.19.2",
5050
"typescript": "~5.6.3"
@@ -53,8 +53,8 @@
5353
"commander": "^12.1.0",
5454
"dockerode": "^4.0.2",
5555
"lodash": "^4.17.21",
56-
"pino": "^9.5.0",
57-
"pino-pretty": "^11.3.0"
56+
"pino": "^9.6.0",
57+
"pino-pretty": "^13.0.0"
5858
},
5959
"jest": {
6060
"silent": false,

airbyte-local-cli-nodejs/src/docker.ts

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
import {writeFileSync} from 'node:fs';
12
import {Writable} from 'node:stream';
23

34
import Docker from 'dockerode';
45

5-
import {AirbyteConnectionStatus, AirbyteConnectionStatusMessage, AirbyteMessageType} from './types';
6-
import {logger, SRC_CONFIG_FILENAME} from './utils';
6+
import {AirbyteConnectionStatus, AirbyteConnectionStatusMessage, AirbyteMessageType, FarosConfig} from './types';
7+
import {DEFAULT_STATE_FILE, logger, SRC_CATALOG_FILENAME, SRC_CONFIG_FILENAME} from './utils';
8+
9+
// Constants
10+
const DEFAULT_MAX_LOG_SIZE = '10m';
711

812
// Create a new Docker instance
913
let _docker = new Docker();
@@ -97,3 +101,114 @@ export async function checkSrcConnection(tmpDir: string, image: string, srcConfi
97101
throw new Error(`Failed to validate source connection: ${error.message ?? JSON.stringify(error)}.`);
98102
}
99103
}
104+
105+
/**
106+
* Spinning up a docker container to run source airbyte connector.
107+
* Platform is set to 'linux/amd64' as we only publish airbyte connectors images for linux/amd64.
108+
*
109+
* Docker cli command:
110+
* docker run --name $src_container_name --init \
111+
* -v "$tempdir:/configs" \
112+
* $max_memory $max_cpus --log-opt max-size="$max_log_size" \
113+
* --env LOG_LEVEL="$log_level" \
114+
* $src_docker_options
115+
* --cidfile="$tempPrefix-src_cid" \
116+
* -a stdout -a stderr \
117+
* "$src_docker_image" \
118+
* read \
119+
* --config "/configs/$src_config_filename" \
120+
* --catalog "/configs/$src_catalog_filename" \
121+
* --state "/configs/$src_state_filename"
122+
*
123+
* @argument command - for testing purposes only
124+
*/
125+
export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise<string> {
126+
logger.info('Running source connector...');
127+
128+
if (!config.src?.image) {
129+
throw new Error('Source image is missing.');
130+
}
131+
132+
try {
133+
const timestamp = Date.now();
134+
const srcContainerName = `airbyte-local-src-${timestamp}`;
135+
const cmd = [
136+
'read',
137+
'--config',
138+
`/configs/${SRC_CONFIG_FILENAME}`,
139+
'--catalog',
140+
`/configs/${SRC_CATALOG_FILENAME}`,
141+
'--state',
142+
`/configs/${DEFAULT_STATE_FILE}`,
143+
];
144+
const maxNanoCpus = config.src?.dockerOptions?.maxCpus;
145+
const maxMemory = config.src?.dockerOptions?.maxMemory;
146+
const createOptions: Docker.ContainerCreateOptions = {
147+
// Default config: can be overridden by the docker options provided by users
148+
name: srcContainerName,
149+
Image: config.src.image,
150+
...config.src?.dockerOptions?.additionalOptions,
151+
152+
// Default options: cannot be overridden by users
153+
Cmd: cmd,
154+
AttachStdout: true,
155+
AttachStderr: true,
156+
platform: 'linux/amd64',
157+
Env: [`LOG_LEVEL=${config.logLevel}`, ...(config.src?.dockerOptions?.additionalOptions?.Env || [])],
158+
HostConfig: {
159+
// Defautl host config: can be overridden by users
160+
NanoCpus: maxNanoCpus, // 1e9 nano cpus = 1 cpu
161+
Memory: maxMemory, // 1024 * 1024 bytes = 1MB
162+
LogConfig: {
163+
Type: 'json-file',
164+
Config: {
165+
'max-size': config.src?.dockerOptions?.maxLogSize ?? DEFAULT_MAX_LOG_SIZE,
166+
},
167+
},
168+
...config.src?.dockerOptions?.additionalOptions?.HostConfig,
169+
// Default options: cannot be overridden by users
170+
Binds: [`${tmpDir}:/configs`],
171+
AutoRemove: true,
172+
Init: true,
173+
},
174+
};
175+
176+
// Create the Docker container
177+
const container = await _docker.createContainer(createOptions);
178+
179+
// Write the container ID to the cidfile
180+
const cidfilePath = `tmp-${timestamp}-src_cid`;
181+
writeFileSync(cidfilePath, container.id);
182+
183+
// create a writable stream to capture the stdout
184+
let data = '';
185+
const stdoutStream = new Writable({
186+
write(chunk, _encoding, callback) {
187+
data += chunk.toString();
188+
callback();
189+
},
190+
});
191+
// Attach the stderr to termincal stderr, and stdout to the output stream
192+
const stream = await container.attach({stream: true, stdout: true, stderr: true});
193+
container.modem.demuxStream(stream, stdoutStream, process.stderr);
194+
195+
// Start the container
196+
await container.start();
197+
198+
// Wait for the container to finish
199+
const res = await container.wait();
200+
logger.debug(res);
201+
logger.debug(data);
202+
203+
if (res.StatusCode === 0) {
204+
logger.info('Source connector ran successfully.');
205+
} else {
206+
throw new Error('Failed to run source connector.');
207+
}
208+
209+
// return the stdout data
210+
return data;
211+
} catch (error: any) {
212+
throw new Error(`Failed to run source connector: ${error.message ?? JSON.stringify(error)}`);
213+
}
214+
}

airbyte-local-cli-nodejs/src/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {parseAndValidateInputs} from './command';
2-
import {checkDockerInstalled, checkSrcConnection, pullDockerImage} from './docker';
2+
import {checkDockerInstalled, checkSrcConnection, pullDockerImage, runSrcSync} from './docker';
33
import {AirbyteCliContext} from './types';
44
import {cleanUp, createTmpDir, loadStateFile, logger, writeConfig} from './utils';
55

@@ -23,6 +23,10 @@ async function main(): Promise<void> {
2323
if (cfg.srcCheckConnection && cfg.src?.image) {
2424
await checkSrcConnection(context.tmpDir, cfg.src.image);
2525
}
26+
// Run airbyte source connector
27+
if (!cfg.srcInputFile) {
28+
await runSrcSync(context.tmpDir, cfg);
29+
}
2630
} catch (error: any) {
2731
logger.error(error.message, 'Error');
2832
cleanUp(context);

airbyte-local-cli-nodejs/src/types.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,13 @@ export interface AirbyteConfig {
4040
image: string;
4141
config?: object;
4242
catalog?: object;
43-
dockerOptions?: string;
43+
dockerOptions?: AirbyteConfigDockerOptions;
44+
}
45+
export interface AirbyteConfigDockerOptions {
46+
maxMemory?: number; // unit: MB
47+
maxCpus?: number; // unit: CPU
48+
maxLogSize?: string; // default: 10m (10MB)
49+
additionalOptions?: any;
4450
}
4551
export enum AirbyteConfigInputType {
4652
FILE = 'file',

airbyte-local-cli-nodejs/src/utils.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export const SRC_CONFIG_FILENAME = `${FILENAME_PREFIX}_src_config.json`;
1414
export const DST_CONFIG_FILENAME = `${FILENAME_PREFIX}_dst_config.json`;
1515
export const SRC_CATALOG_FILENAME = `${FILENAME_PREFIX}_src_catalog.json`;
1616
export const DST_CATALOG_FILENAME = `${FILENAME_PREFIX}_dst_catalog.json`;
17+
export const DEFAULT_STATE_FILE = 'state.json';
1718

1819
// Create a pino logger instance
1920
export const logger = pino(pretty({colorize: true}));
@@ -82,16 +83,15 @@ export function createTmpDir(absTmpDir?: string): string {
8283

8384
// Load the existing state file and write to the temporary folder
8485
export function loadStateFile(tempDir: string, filePath?: string, connectionName?: string): void {
85-
const path = filePath ?? (connectionName ? `${connectionName}__state.json` : 'state.json');
86-
const name = 'state.json';
86+
const path = filePath ?? (connectionName ? `${connectionName}__state.json` : DEFAULT_STATE_FILE);
8787

8888
// Read the state file and write to temp folder
8989
// Write an empty state file if the state file hasn't existed yet
9090
try {
9191
accessSync(path, constants.R_OK);
9292
const stateData = readFileSync(path, 'utf8');
93-
logger.debug(`Writing state file to temporary directory: '${tempDir}/${name}'...`);
94-
writeFileSync(`${tempDir}/${name}`, stateData);
93+
logger.debug(`Writing state file to temporary directory: '${tempDir}/${DEFAULT_STATE_FILE}'...`);
94+
writeFileSync(`${tempDir}/${DEFAULT_STATE_FILE}`, stateData);
9595
} catch (error: any) {
9696
if (error.code !== 'ENOENT') {
9797
throw new Error(`Failed to read state file '${path}' : ${error.message}`);
@@ -100,8 +100,8 @@ export function loadStateFile(tempDir: string, filePath?: string, connectionName
100100
`State file '${filePath}' not found. Please make sure the state file exists and have read access.`,
101101
);
102102
}
103-
writeFileSync(`${tempDir}/${name}`, '{}');
104-
logger.debug(`State file '${name}' not found. An empty state file is created.`);
103+
writeFileSync(`${tempDir}/${DEFAULT_STATE_FILE}`, '{}');
104+
logger.debug(`State file '${DEFAULT_STATE_FILE}' not found. An empty state file is created.`);
105105
}
106106
}
107107

airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ exports[`parseConfigFile should pass 1`] = `
1212
"graph": "default",
1313
},
1414
},
15-
"dockerOptions": "--memory 2048m --cpus 2",
15+
"dockerOptions": {
16+
"maxCpus": 2,
17+
"maxMemory": 2048,
18+
},
1619
"image": "farosai/airbyte-servicenow-destination",
1720
},
1821
"src": {
@@ -22,7 +25,10 @@ exports[`parseConfigFile should pass 1`] = `
2225
"url": "https://test-instance.service-now.com",
2326
"username": "test-username",
2427
},
25-
"dockerOptions": "--memory 2048m --cpus 2",
28+
"dockerOptions": {
29+
"maxCpus": 2,
30+
"maxMemory": 2048,
31+
},
2632
"image": "farosai/airbyte-servicenow-source",
2733
},
2834
}

airbyte-local-cli-nodejs/test/docker.it.test.ts

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,29 @@
1-
import {checkSrcConnection, pullDockerImage} from '../src/docker';
1+
import {readdirSync, unlinkSync} from 'node:fs';
2+
import path from 'node:path';
3+
import {Writable} from 'node:stream';
4+
5+
import {checkSrcConnection, pullDockerImage, runSrcSync} from '../src/docker';
6+
import {FarosConfig} from '../src/types';
7+
8+
const defaultConfig: FarosConfig = {
9+
srcCheckConnection: false,
10+
dstUseHostNetwork: false,
11+
srcPull: false,
12+
dstPull: false,
13+
fullRefresh: false,
14+
rawMessages: false,
15+
keepContainers: false,
16+
logLevel: 'info',
17+
debug: false,
18+
stateFile: undefined,
19+
connectionName: undefined,
20+
srcOutputFile: undefined,
21+
srcInputFile: undefined,
22+
};
223

324
beforeAll(async () => {
425
await pullDockerImage('farosai/airbyte-example-source');
26+
await pullDockerImage('farosai/airbyte-faros-graphql-source');
527
});
628

729
describe('checkSrcConnection', () => {
@@ -25,3 +47,55 @@ describe('checkSrcConnection', () => {
2547
).rejects.toThrow('Failed to validate source connection: User is not chris.');
2648
});
2749
});
50+
51+
describe('runSrcSync', () => {
52+
const testCfg: FarosConfig = {
53+
...defaultConfig,
54+
src: {
55+
image: 'farosai/airbyte-example-source',
56+
},
57+
};
58+
59+
// Clean up files created by the test
60+
afterAll(() => {
61+
const pattern = /.*-src_cid$/;
62+
const files = readdirSync(process.cwd());
63+
const matchingFiles = files.filter((file) => pattern.test(file));
64+
65+
matchingFiles.forEach((file) => {
66+
const filePath = path.join(process.cwd(), file);
67+
unlinkSync(filePath);
68+
});
69+
});
70+
71+
it('should success', async () => {
72+
await expect(runSrcSync(`${process.cwd()}/test/resources/dockerIt_runSrcSync`, testCfg)).resolves.not.toThrow();
73+
});
74+
75+
// Check the error message is correctly redirect to process.stderr
76+
it('should fail', async () => {
77+
// Capture process.stderr
78+
let stderrData = '';
79+
const originalStderrWrite = process.stderr.write;
80+
const stderrStream = new Writable({
81+
write(chunk, _encoding, callback) {
82+
stderrData += chunk.toString();
83+
callback();
84+
},
85+
});
86+
process.stderr.write = stderrStream.write.bind(stderrStream) as any;
87+
88+
try {
89+
await expect(
90+
runSrcSync(`${process.cwd()}/test/resources/dockerIt_runSrcSync`, {
91+
...testCfg,
92+
src: {image: 'farosai/airbyte-faros-graphql-source'},
93+
}),
94+
).rejects.toThrow();
95+
} finally {
96+
process.stderr.write = originalStderrWrite;
97+
}
98+
99+
expect(stderrData).toContain(`Faros API key was not provided`);
100+
});
101+
});

airbyte-local-cli-nodejs/test/exec/faros_airbyte_cli_config.json

Lines changed: 0 additions & 9 deletions
This file was deleted.

airbyte-local-cli-nodejs/test/exec/spec/index_spec.sh

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,21 @@ Describe 'Check source connection'
109109
The status should equal 1
110110
End
111111
End
112+
113+
Describe 'Run source sync'
114+
It 'should fail if source sync fails auth'
115+
airbyte_local_test() {
116+
./airbyte-local \
117+
--src 'farosai/airbyte-faros-graphql-source' \
118+
--src-only
119+
}
120+
When call airbyte_local_test
121+
The stderr should include "Faros API key was not provided"
122+
The output should include "Failed to run source connector: Failed to run source connector."
123+
The status should equal 1
124+
End
125+
End
126+
127+
# Clean up temeporary test files
128+
find . -name 'faros_airbyte_cli_config.json' -delete
129+
find . -name '*-src_cid' -delete
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"user": "chris"
3+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{}

0 commit comments

Comments
 (0)