Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FAI-14745] Process source airbyte connector output #102

Merged
merged 9 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
424 changes: 407 additions & 17 deletions airbyte-local-cli-nodejs/package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions airbyte-local-cli-nodejs/src/command.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Command, Option} from 'commander';

import {AirbyteConfig, AirbyteConfigInputType, CliOptions, FarosConfig} from './types';
import {logger, parseConfigFile, updateLogLevel} from './utils';
import {logger, OutputStream, parseConfigFile, updateLogLevel} from './utils';
import {CLI_VERSION} from './version';

// Command line program
Expand Down Expand Up @@ -183,9 +183,9 @@ export function parseAndValidateInputs(argv: string[]): FarosConfig {

// Convert the cli options to FarosConfig
const farosConfig: FarosConfig = {
// The default source output file is stdout if `srcOnly` is true
// The default source output file is stdout(`-`) if `srcOnly` is true
// Take the non-default value if provided with `srcOutputFile` option
srcOutputFile: cliOptions.srcOnly ? '/dev/null' : cliOptions.srcOutputFile,
srcOutputFile: cliOptions.srcOnly ? OutputStream.STDOUT : cliOptions.srcOutputFile,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bug. this is supposed to be stdout by default

// Rename the `dstOnly` file path to `srcInputFile`
srcInputFile: cliOptions.dstOnly,
connectionName: cliOptions.connectionName,
Expand Down
39 changes: 26 additions & 13 deletions airbyte-local-cli-nodejs/src/docker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import {writeFileSync} from 'node:fs';
import {createWriteStream, writeFileSync} from 'node:fs';
import {Writable} from 'node:stream';

import Docker from 'dockerode';

import {AirbyteConnectionStatus, AirbyteConnectionStatusMessage, AirbyteMessageType, FarosConfig} from './types';
import {DEFAULT_STATE_FILE, logger, SRC_CATALOG_FILENAME, SRC_CONFIG_FILENAME} from './utils';
import {
DEFAULT_STATE_FILE,
logger,
OutputStream,
processSrcDataByLine,
SRC_CATALOG_FILENAME,
SRC_CONFIG_FILENAME,
SRC_OUTPUT_DATA_FILE,
} from './utils';

// Constants
const DEFAULT_MAX_LOG_SIZE = '10m';
Expand Down Expand Up @@ -119,10 +127,8 @@ export async function checkSrcConnection(tmpDir: string, image: string, srcConfi
* --config "/configs/$src_config_filename" \
* --catalog "/configs/$src_catalog_filename" \
* --state "/configs/$src_state_filename"
*
* @argument command - for testing purposes only
*/
export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise<string> {
export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise<void> {
logger.info('Running source connector...');

if (!config.src?.image) {
Expand Down Expand Up @@ -180,34 +186,41 @@ export async function runSrcSync(tmpDir: string, config: FarosConfig): Promise<s
const cidfilePath = `tmp-${timestamp}-src_cid`;
writeFileSync(cidfilePath, container.id);

// Create a writable stream for the processed output data
const srcOutputFilePath = config.srcOutputFile ?? `${tmpDir}/${SRC_OUTPUT_DATA_FILE}`;
const srcOutputStream =
config.srcOutputFile === OutputStream.STDOUT ? process.stdout : createWriteStream(srcOutputFilePath);

// create a writable stream to capture the stdout
let data = '';
const stdoutStream = new Writable({
let buffer = '';
const containerOutputStream = new Writable({
write(chunk, _encoding, callback) {
data += chunk.toString();
buffer += chunk.toString();
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';
lines.forEach((line: string) => {
processSrcDataByLine(line, srcOutputStream, config);
});
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processing the data and write to stdout/file instead of returning the data at the end of the function

callback();
},
});

// Attach the stderr to termincal stderr, and stdout to the output stream
const stream = await container.attach({stream: true, stdout: true, stderr: true});
container.modem.demuxStream(stream, stdoutStream, process.stderr);
container.modem.demuxStream(stream, containerOutputStream, process.stderr);

// Start the container
await container.start();

// Wait for the container to finish
const res = await container.wait();
logger.debug(res);
logger.debug(data);

if (res.StatusCode === 0) {
logger.info('Source connector ran successfully.');
} else {
throw new Error('Failed to run source connector.');
}

// return the stdout data
return data;
} catch (error: any) {
throw new Error(`Failed to run source connector: ${error.message ?? JSON.stringify(error)}`);
}
Expand Down
6 changes: 5 additions & 1 deletion airbyte-local-cli-nodejs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {parseAndValidateInputs} from './command';
import {checkDockerInstalled, checkSrcConnection, pullDockerImage, runSrcSync} from './docker';
import {AirbyteCliContext} from './types';
import {cleanUp, createTmpDir, loadStateFile, logger, writeConfig} from './utils';
import {cleanUp, createTmpDir, loadStateFile, logger, processSrcInputFile, writeConfig} from './utils';

async function main(): Promise<void> {
const context: AirbyteCliContext = {};
Expand All @@ -19,13 +19,17 @@ async function main(): Promise<void> {
if (cfg.srcPull && cfg.src?.image) {
await pullDockerImage(cfg.src.image);
}

// Check source connection
if (cfg.srcCheckConnection && cfg.src?.image) {
await checkSrcConnection(context.tmpDir, cfg.src.image);
}

// Run airbyte source connector
if (!cfg.srcInputFile) {
await runSrcSync(context.tmpDir, cfg);
} else {
await processSrcInputFile(context.tmpDir, cfg);
}
} catch (error: any) {
logger.error(error.message, 'Error');
Expand Down
128 changes: 127 additions & 1 deletion airbyte-local-cli-nodejs/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
import {spawnSync} from 'node:child_process';
import {accessSync, constants, mkdtempSync, readFileSync, rmSync, writeFileSync} from 'node:fs';
import {
accessSync,
constants,
createReadStream,
createWriteStream,
mkdtempSync,
readFileSync,
rmSync,
writeFileSync,
} from 'node:fs';
import {tmpdir} from 'node:os';
import {sep} from 'node:path';
import readline from 'node:readline';
import {Writable} from 'node:stream';

import pino from 'pino';
import pretty from 'pino-pretty';

import {AirbyteCliContext, AirbyteConfig, FarosConfig} from './types';

// constants
export enum OutputStream {
STDERR = 'STDERR',
STDOUT = 'STDOUT',
}
export const FILENAME_PREFIX = 'faros_airbyte_cli';
export const SRC_CONFIG_FILENAME = `${FILENAME_PREFIX}_src_config.json`;
export const DST_CONFIG_FILENAME = `${FILENAME_PREFIX}_dst_config.json`;
export const SRC_CATALOG_FILENAME = `${FILENAME_PREFIX}_src_catalog.json`;
export const DST_CATALOG_FILENAME = `${FILENAME_PREFIX}_dst_catalog.json`;
export const DEFAULT_STATE_FILE = 'state.json';
export const SRC_INPUT_DATA_FILE = `${FILENAME_PREFIX}_src_data`;
export const SRC_OUTPUT_DATA_FILE = `${FILENAME_PREFIX}_src_output`;

// Create a pino logger instance
export const logger = pino(pretty({colorize: true}));
Expand Down Expand Up @@ -165,3 +182,112 @@ export function writeConfig(tmpDir: string, config: FarosConfig): void {
writeFileSync(dstCatalogFilePath, JSON.stringify(airbyteConfig.dst.catalog ?? {}, null, 2));
logger.debug(`Airbyte catalog files written to: ${srcCatalogFilePath}, ${dstCatalogFilePath}`);
}

// Read file content
export function readFile(file: string): any {
try {
const data = readFileSync(file, 'utf8');
return data;
} catch (error: any) {
throw new Error(`Failed to read '${file}': ${error.message}`);
}
}

// Write file content
export function writeFile(file: string, data: any): void {
try {
writeFileSync(file, data);
} catch (error: any) {
throw new Error(`Failed to write '${file}': ${error.message}`);
}
}

/**
* Process the source output.
*
* Command line:
* tee >(
* jq -cR $jq_color_opt --unbuffered 'fromjson? |
* select(.type != "RECORD" and .type != "STATE")' |
* jq -rR --unbuffered "$jq_src_msg" >&2
* ) |
* jq -cR --unbuffered "fromjson? |
* select(.type == \"RECORD\" or .type == \"STATE\") |
* .record.stream |= \"${dst_stream_prefix}\" + ." |
* tee "$output_filepath" | ...
*
* jq_src_msg="\"${GREEN}[SRC]: \" + ${JQ_TIMESTAMP} + \" - \" + ."
*
*
* Note: `dst_stream_prefix` command option is dropped
*/

// Processing the source line by line
export function processSrcDataByLine(line: string, outputStream: Writable, cfg: FarosConfig): void {
// Reformat the JSON message
function formatSrcMsg(json: any): string {
return `[SRC] - ${JSON.stringify(json)}`;
}
// skip empty lines
if (line.trim() === '') {
return;
}

try {
const data = JSON.parse(line);

// non RECORD and STATE type messages: print as stdout
// RECORD and STATE type messages: when the output is set to stdout
if ((data.type !== 'RECORD' && data.type !== 'STATE') || cfg.srcOutputFile === OutputStream.STDOUT) {
if (cfg.rawMessages) {
process.stdout.write(`${line}\n`);
} else {
logger.info(formatSrcMsg(data));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will look like this
Screenshot 2025-01-21 at 10 16 17

}
}
// RECORD and STATE type messages: write to output file
else {
outputStream.write(`${line}\n`);
}
} catch (error: any) {
throw new Error(`Line of data: '${line}'; Error: ${error.message}`);
}
}

export function processSrcInputFile(tmpDir: string, cfg: FarosConfig): Promise<void> {
return new Promise((resolve, reject) => {
// create input and output streams:
// - input stream: read from the data file user provided
// - output stream: write to an intermediate file. Overwrite the file if it exists, otherwise create a new one
const inputStream = createReadStream(cfg.srcInputFile!);
const outputStream = createWriteStream(`${tmpDir}/${SRC_OUTPUT_DATA_FILE}`);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the user has srcInputFile configured, i assume they won't have src-only or src-output-file configured. but i will add this constraint in the command in another PR.


// create readline interface
const rl = readline.createInterface({
input: inputStream,
crlfDelay: Infinity,
});

rl.on('line', (line) => {
try {
processSrcDataByLine(line, outputStream, cfg);
} catch (error: any) {
rl.emit('error', error);
}
})
.on('close', () => {
logger.debug('Finished processing the source output data.');
outputStream.end();
resolve();
})
.on('error', (error) => {
outputStream.end();
reject(new Error(`Failed to process the source output data: ${error.message ?? JSON.stringify(error)}`));
});

outputStream.on('error', (error: any) => {
outputStream.end();
reject(new Error(`Failed to write to the output file: ${error.message ?? JSON.stringify(error)}`));
});
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`runSrcSync should success 1`] = `
"{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.4"}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":***,"message":{"level":30,"msg":"Source version: 0.12.4"}},{"timestamp":***,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":***,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":***,"message":{"level":30,"msg":"State: {}"}},{"timestamp":***,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":***,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]}
"
`;
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ exports[`parseConfigFile should pass 1`] = `
}
`;

exports[`processSrcInputFile should succeed writing to an output file 1`] = `
"{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":***,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":***,"message":{"level":30,"msg":"Config: {\\"user\\":\\"chris\\"}"}},{"timestamp":***,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":***,"message":{"level":30,"msg":"State: {}"}},{"timestamp":***,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":***,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]}
"
`;

exports[`write files to temporary dir loadStateFile should pass with existing state file 1`] = `
"{"format":"base64/gzip","data":"dGVzdA=="}
"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-local-cli-nodejs/test/command.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ describe('Check other options', () => {
...defaultConfig,
src: {image: 'source-image', config: {}},
dst: {image: undefined, config: {}},
srcOutputFile: '/dev/null',
srcOutputFile: 'STDOUT',
dstPull: false,
});
});
Expand Down
21 changes: 17 additions & 4 deletions airbyte-local-cli-nodejs/test/docker.it.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import {readdirSync, unlinkSync} from 'node:fs';
import {readdirSync, readFileSync, rmSync, unlinkSync} from 'node:fs';
import path from 'node:path';
import {Writable} from 'node:stream';

import {checkSrcConnection, pullDockerImage, runSrcSync} from '../src/docker';
import {FarosConfig} from '../src/types';
import {SRC_OUTPUT_DATA_FILE} from '../src/utils';

const defaultConfig: FarosConfig = {
srcCheckConnection: false,
Expand Down Expand Up @@ -56,6 +57,13 @@ describe('runSrcSync', () => {
},
};

const testTmpDir = `${process.cwd()}/test/resources/dockerIt_runSrcSync`;

// remove the intermediate output file
afterEach(() => {
rmSync(`${testTmpDir}/${SRC_OUTPUT_DATA_FILE}`, {force: true});
});

// Clean up files created by the test
afterAll(() => {
const pattern = /.*-src_cid$/;
Expand All @@ -69,10 +77,15 @@ describe('runSrcSync', () => {
});

it('should success', async () => {
await expect(runSrcSync(`${process.cwd()}/test/resources/dockerIt_runSrcSync`, testCfg)).resolves.not.toThrow();
await expect(runSrcSync(testTmpDir, testCfg)).resolves.not.toThrow();

// Replace timestamp for comparison
const output = readFileSync(`${testTmpDir}/${SRC_OUTPUT_DATA_FILE}`, 'utf8');
const outputWithoutTS = output.split('\n').map((line) => line.replace(/"timestamp":\d+/g, '"timestamp":***'));
expect(outputWithoutTS.join('\n')).toMatchSnapshot();
});

// Check the error message is correctly redirect to process.stderr
// Check stderr message is correctly redirect to process.stderr
it('should fail', async () => {
// Capture process.stderr
let stderrData = '';
Expand All @@ -87,7 +100,7 @@ describe('runSrcSync', () => {

try {
await expect(
runSrcSync(`${process.cwd()}/test/resources/dockerIt_runSrcSync`, {
runSrcSync(testTmpDir, {
...testCfg,
src: {image: 'farosai/airbyte-faros-graphql-source'},
}),
Expand Down
9 changes: 9 additions & 0 deletions airbyte-local-cli-nodejs/test/resources/test_src_input
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{"log":{"level":"INFO","message":"Source version: 0.12.3"},"type":"LOG"}
{"log":{"level":"INFO","message":"Config: {\"user\":\"chris\"}"},"type":"LOG"}
{"log":{"level":"INFO","message":"Catalog: {}"},"type":"LOG"}
{"log":{"level":"INFO","message":"State: {}"},"type":"LOG"}
{"log":{"level":"INFO","message":"Syncing ExampleSource"},"type":"LOG"}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"user":"chris"},"sourceType":"example","sourceVersion":"0.12.3"}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","sourceStatus":{"status":"SUCCESS"}}
{"log":{"level":"INFO","message":"Finished syncing ExampleSource"},"type":"LOG"}
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","logs":[{"timestamp":1736891682696,"message":{"level":30,"msg":"Source version: 0.12.3"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Config: {\"user\":\"chris\"}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"Catalog: {}"}},{"timestamp":1736891682697,"message":{"level":30,"msg":"State: {}"}},{"timestamp":1736891682700,"message":{"level":30,"msg":"Syncing ExampleSource"}},{"timestamp":1736891682704,"message":{"level":30,"msg":"Finished syncing ExampleSource"}}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
invalid json
Loading
Loading