[FAI-14746] Run dst sync #106

Merged: jeniii merged 8 commits into main from jg/dst-conn on Feb 4, 2025

jeniii (Collaborator) commented Jan 31, 2025

Description

  • Run the dst (destination) sync
  • Process the dst sync output
  • Add e2e tests with creds

Not included (will be covered in later PRs):

  • more e2e tests
  • catalog discovery should not run when the sync is dst-only
  • copy the Faros config from dst to src https://github.com/faros-ai/airbyte-local-cli/blob/main/airbyte-local.sh#L320
    • confirmed with Yandry that this is not required in most cases. It is only a convenience so that users don't have to provide these settings in both the src and dst configs. Users running src only need to provide them in the src config instead.
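For reference, the deferred convenience could look roughly like this sketch. It is not part of this PR. It assumes both configs are flat JSON objects and that the keys worth copying are the Faros connection settings visible in this PR's test template (`api_url`, `api_key`, `graph`, `graphql_api`); the real key list in `airbyte-local.sh` is not confirmed here, and `copyFarosConfig` is a hypothetical name.

```typescript
// Hypothetical helper: fill Faros connection settings into the src config
// from the dst config, without overwriting anything already set in src.
type ConnectorConfig = Record<string, unknown>;

// Assumed key list, taken from the test config template, not from airbyte-local.sh.
const DEFAULT_FAROS_KEYS: readonly string[] = ['api_url', 'api_key', 'graph', 'graphql_api'];

function copyFarosConfig(
  srcConfig: ConnectorConfig,
  dstConfig: ConnectorConfig,
  keys: readonly string[] = DEFAULT_FAROS_KEYS,
): ConnectorConfig {
  // Build a new object so the user's src config is not mutated in place.
  const merged: ConnectorConfig = {...srcConfig};
  for (const key of keys) {
    // Copy only keys that src lacks and dst actually provides.
    if (merged[key] === undefined && dstConfig[key] !== undefined) {
      merged[key] = dstConfig[key];
    }
  }
  return merged;
}
```

Values already present in the src config take precedence, so src-only users who set these keys themselves are unaffected.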

Type of change

  • Bug fix
  • New feature
  • Breaking change

const lastState = states.pop();
if (lastState) {
writeFileSync(`${config.stateFile}`, lastState);
logger.debug(`New state is updated in '${config.stateFile}'.`);
jeniii (Collaborator, Author):

Get the last state line and write it to the state file.
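A self-contained sketch of the same step, assuming the STATE lines have already been collected into an array in arrival order and that the latest checkpoint supersedes earlier ones. `writeLastState` is a hypothetical name; unlike `states.pop()`, it reads the last element without mutating the array.

```typescript
import {writeFileSync} from 'node:fs';

// Persist the most recent state line, if any; returns whether a file was written.
function writeLastState(states: string[], stateFile: string): boolean {
  if (states.length === 0) {
    // No STATE message was emitted: leave any existing state file untouched.
    return false;
  }
  const lastState = states[states.length - 1];
  writeFileSync(stateFile, lastState);
  return true;
}
```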

// Create a readable stream from the src output file and pipe it to the container stdin
const inputStream = createReadStream(`${tmpDir}/${SRC_OUTPUT_DATA_FILE}`);
const stdinStream = await container.attach({stream: true, hijack: true, stdin: true});
inputStream.pipe(stdinStream);
jeniii (Collaborator, Author):

Somehow I wasn't able to use the same stream for both stdout and stdin: one would work and the other would break, so I had to separate them into two streams.
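One way to keep the two directions apart is to attach once for output only and make a second, hijacked attach used only for stdin. The sketch below is written against a hypothetical minimal `AttachableContainer` interface (so it can run without Docker) rather than the real dockerode types; the option names mirror the `container.attach(...)` call above, but how the PR actually reads stdout is not shown in this excerpt, and `attachIo` is an illustrative name.

```typescript
import {createReadStream} from 'node:fs';
import type {Duplex, Readable} from 'node:stream';

// Subset of attach options, mirroring the container.attach(...) call in the PR (assumed shape).
interface AttachOptions {
  stream: boolean;
  hijack?: boolean;
  stdin?: boolean;
  stdout?: boolean;
  stderr?: boolean;
}

// Hypothetical minimal interface so the wiring can be shown and tested without Docker.
interface AttachableContainer {
  attach(opts: AttachOptions): Promise<Duplex>;
}

// Attach two independent streams: one read-only for output, one write-only for stdin.
async function attachIo(container: AttachableContainer, inputFile: string): Promise<Readable> {
  // Output stream: stdout/stderr only; nothing is ever written to it.
  const outputStream = await container.attach({stream: true, stdout: true, stderr: true});
  // Input stream: a separate hijacked connection used only to feed stdin.
  const stdinStream = await container.attach({stream: true, hijack: true, stdin: true});
  // pipe() ends stdinStream when the file is exhausted, so the container can see EOF on stdin.
  createReadStream(inputFile).pipe(stdinStream);
  return outputStream;
}
```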

AttachStderr: true,
AttachStdin: true,
OpenStdin: true,
StdinOnce: true,
jeniii (Collaborator, Author):

The configuration here follows the settings I found by inspecting an existing dst connector container.
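For context, a container-create options object with these stdin settings might look like the sketch below. The field names are Docker Engine API container-create fields (which dockerode's `createContainer` passes through); the helper name `dstContainerOptions`, the `Tty` value, and the image and command used in the test are placeholders, not the PR's values. `OpenStdin` keeps stdin open for the attached writer, and `StdinOnce` closes it after that first attached client disconnects, so finishing the piped input delivers EOF to the connector.

```typescript
// Hypothetical helper: container-create options for a dst connector that reads
// Airbyte messages from stdin.
interface DstContainerOptions {
  Image: string;
  Cmd: string[];
  AttachStdin: boolean;
  AttachStdout: boolean;
  AttachStderr: boolean;
  OpenStdin: boolean;
  StdinOnce: boolean;
  Tty: boolean;
}

function dstContainerOptions(image: string, cmd: string[]): DstContainerOptions {
  return {
    Image: image,
    Cmd: cmd,
    AttachStdin: true,
    AttachStdout: true,
    AttachStderr: true,
    OpenStdin: true, // keep stdin open so the src output can be streamed in
    StdinOnce: true, // close stdin once the attached client disconnects (EOF for the connector)
    Tty: false, // assumed: without a TTY, stdout and stderr are multiplexed rather than merged
  };
}
```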

@@ -24,7 +24,7 @@ async function main(): Promise<void> {
// Create temporary directory, load state file, write config and catalog to files
context.tmpDir = createTmpDir();
generateDstStreamPrefix(cfg);
-loadStateFile(context.tmpDir, cfg?.stateFile, cfg?.connectionName);
+cfg.stateFile = loadStateFile(context.tmpDir, cfg?.stateFile, cfg?.connectionName);
jeniii (Collaborator, Author):

This returns the state file location. Note: this is not the state file in the tmp folder.

If the user already provides one, this function returns the same value.

I might refactor this later to separate the internal config from the user input config. Updating it in place is a bit confusing, and it feels like it would be harder to maintain.
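The refactor mentioned above could keep derived values out of the user config entirely. A minimal sketch, assuming a separate internal context object: `resolveStateFile`, `RunContext`, the `<connectionName>__state.json` default, and the `state.json` working-copy name are all hypothetical, since the PR's actual defaults are not shown here.

```typescript
import {copyFileSync, existsSync} from 'node:fs';
import {join} from 'node:path';

// User input config (only the fields this sketch needs).
interface UserConfig {
  stateFile?: string;
  connectionName?: string;
}

// Internal, derived values kept separate from the user's input.
interface RunContext {
  tmpDir: string;
  stateFile: string; // persistent location the final state is written back to
}

function resolveStateFile(cfg: Readonly<UserConfig>, tmpDir: string): RunContext {
  // A user-provided path is returned unchanged; otherwise derive one (hypothetical naming).
  const stateFile = cfg.stateFile ?? `${cfg.connectionName ?? 'default'}__state.json`;
  // Seed the working copy in tmpDir from the persistent file, if it already exists.
  if (existsSync(stateFile)) {
    copyFileSync(stateFile, join(tmpDir, 'state.json'));
  }
  return {tmpDir, stateFile};
}
```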

process.stdout.write(`${line}\n`);
} else {
logger.info(formatDstMsg(data));
}
jeniii (Collaborator, Author):

output non-state data to stdout.
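A sketch of one plausible routing for a single line of dst output, since the exact condition is not visible in this excerpt. It assumes lines are Airbyte JSON messages with a top-level `type` field: STATE messages are held back for the state file, other parsed messages go to a logger, and lines that are not JSON are passed through to stdout. `handleDstLine` and its `log`/`out` callbacks are hypothetical stand-ins for the PR's `logger.info(formatDstMsg(...))` and `process.stdout.write(...)`.

```typescript
// Minimal shape of an Airbyte protocol message, as far as this sketch cares.
interface AirbyteMessage {
  type?: string;
  [key: string]: unknown;
}

// Route one line of dst output (hypothetical helper, not the PR's function).
function handleDstLine(
  line: string,
  states: string[],
  log: (msg: AirbyteMessage) => void,
  out: (text: string) => void = (text) => {
    process.stdout.write(text);
  },
): void {
  let msg: AirbyteMessage;
  try {
    msg = JSON.parse(line) as AirbyteMessage;
  } catch {
    out(`${line}\n`); // not JSON: pass through to stdout unchanged
    return;
  }
  if (msg?.type === 'STATE') {
    states.push(line); // kept so the last one can be written to the state file
  } else {
    log(msg); // other protocol messages (e.g. LOG) go through the logger
  }
}
```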

@@ -0,0 +1,51 @@
{"state":{"data":{"format":"base64/gzip","data":"H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=="}},"type":"STATE","redactedConfig":{"api_key":"REDACTED","models_filter":["org_Team"],"graphql_api":"v2","graph":"faros","result_model":"Flat","api_url":"https://dev.api.faros.ai"},"sourceType":"faros-graphql","sourceVersion":"0.12.5","record":{"stream":"myfarosgraphqlsrc__faros_graphql__"}}
jeniii (Collaborator, Author):

I assume this doesn't contain sensitive data?
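The `data` field in that fixture line declares its format as `base64/gzip`, so its contents can be checked directly. A minimal sketch using Node's built-in `zlib`; the helper name `decodeState` is hypothetical.

```typescript
import {gunzipSync} from 'node:zlib';

// Shape of the compressed state wrapper seen in the test fixture.
interface CompressedState {
  format: string;
  data: string;
}

// Decode a {"format":"base64/gzip","data":...} wrapper back into its JSON text.
function decodeState(state: CompressedState): string {
  if (state.format !== 'base64/gzip') {
    throw new Error(`Unsupported state format: ${state.format}`);
  }
  return gunzipSync(Buffer.from(state.data, 'base64')).toString('utf8');
}

console.log(decodeState({format: 'base64/gzip', data: 'H4sIAAAAAAAAA6uuBQBDv6ajAgAAAA=='}));
// prints: {}
```

For this fixture the decoded state is just `{}`, and the `redactedConfig` in the same line already shows `api_key` as `REDACTED`, so the line does not appear to contain secrets.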

"graph": "jennie-test",
"graphql_api": "v2",
"api_url": "https://dev.api.faros.ai",
"api_key": "test"
jeniii (Collaborator, Author):

This is a template that is copied during the tests; the api_key is replaced in some of them.
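A sketch of how a test might copy such a template into a temp directory and override the key only when real credentials are needed. It assumes `api_key` is a top-level field, as in the excerpt above; `writeTestConfig` is a hypothetical name, and where the real key comes from (for example an environment variable) is left to the caller.

```typescript
import {mkdtempSync, readFileSync, writeFileSync} from 'node:fs';
import {tmpdir} from 'node:os';
import {join} from 'node:path';

// Copy a JSON config template into a fresh temp dir, optionally overriding api_key.
function writeTestConfig(templatePath: string, apiKey?: string): string {
  const config = JSON.parse(readFileSync(templatePath, 'utf8')) as Record<string, unknown>;
  if (apiKey !== undefined) {
    config.api_key = apiKey; // e.g. a real key for the e2e tests with creds
  }
  // Write to a new temp file so the checked-in template is never modified.
  const outPath = join(mkdtempSync(join(tmpdir(), 'test-config-')), 'config.json');
  writeFileSync(outPath, JSON.stringify(config, null, 2));
  return outPath;
}
```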

jeniii marked this pull request as ready for review on February 3, 2025 at 19:36
});

try {
await pipelineAsync(rl, transform, outputStream);
jeniii (Collaborator, Author) commented Feb 3, 2025:

I think closing the output stream when readline closes was causing a race condition: the output could not be fully written out.

Updated to use pipeline and a Transform, which should avoid the race condition. I confirmed it works by adding a 5 s sleep in processSrcDataByLine so that writing is much slower than reading.
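A minimal sketch of that approach: the promise returned by `pipeline` from `node:stream/promises` settles only after the write stream has finished (or any stage has failed), so nothing is closed while writes are still pending. The function name `processFileByLine` and the async per-line callback are assumptions for illustration, not the PR's `processSrcDataByLine`.

```typescript
import {createReadStream, createWriteStream} from 'node:fs';
import {createInterface} from 'node:readline';
import {Transform, TransformCallback} from 'node:stream';
import {pipeline} from 'node:stream/promises';

// Read inputPath line by line, map each line (possibly slowly), and write results to outputPath.
async function processFileByLine(
  inputPath: string,
  outputPath: string,
  processLine: (line: string) => Promise<string | undefined>,
): Promise<void> {
  const rl = createInterface({input: createReadStream(inputPath), crlfDelay: Infinity});
  const transform = new Transform({
    objectMode: true, // chunks are whole lines (strings) coming from readline
    transform(line: string, _encoding: BufferEncoding, callback: TransformCallback) {
      processLine(line).then(
        // Returning undefined drops the line; otherwise emit it with a newline.
        (out) => callback(null, out === undefined ? undefined : `${out}\n`),
        (err: Error) => callback(err),
      );
    },
  });
  // pipeline waits for the write stream to finish and propagates errors from every stage,
  // instead of closing the output manually on readline's 'close' event.
  await pipeline(rl, transform, createWriteStream(outputPath));
}
```

With a deliberately slow `processLine` (for example one that awaits a timer, similar to the 5 s sleep check), the output file is still complete once the returned promise resolves.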

jeniii merged commit 0d1da4b into main on Feb 4, 2025
6 checks passed
jeniii deleted the jg/dst-conn branch on February 4, 2025 at 18:16
github-actions bot locked and limited conversation to collaborators on Feb 4, 2025
2 participants