Unable to pipe multiple times #4

Open
jetibest opened this issue Mar 29, 2023 · 0 comments

When trying to pipe the connection.transaction.message_stream to a target stream more than once, an error is thrown.

Example code in a queue plugin (without error handling for simplicity):

// in any plugin, possibly multiple times:
var txn = connection.transaction;
['/tmp/test-a', '/tmp/test-b', '/tmp/test-c'].forEach(file =>
{
    txn.message_stream.pipe(fs.createWriteStream(file));
    txn.message_stream.pause();
});

// finally, in the queue plugin:
txn.message_stream.resume();

This throws the error "Cannot pipe while currently piping". From browsing the code, it looks like message_stream cannot be used as a drop-in replacement for a regular readable stream?
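For comparison, here is a minimal sketch of the behavior I expected, using only Node's core stream module: a regular Readable accepts several pipe() calls and delivers every chunk to each destination. The collector() and pipeToMany() helpers are hypothetical names made up for this illustration, and Readable.from() is only a stand-in for message_stream.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper: a Writable that collects everything written to it.
function collector() {
    const chunks = [];
    const sink = new Writable({
        write(chunk, _encoding, callback) {
            chunks.push(chunk.toString());
            callback();
        }
    });
    sink.text = () => chunks.join('');
    return sink;
}

// Hypothetical helper: pipes one source into `count` sinks and resolves
// with the text each sink received.
function pipeToMany(source, count) {
    const sinks = Array.from({ length: count }, () => collector());
    const done = sinks.map(sink => new Promise((resolve, reject) => {
        sink.on('finish', () => resolve(sink.text()));
        sink.on('error', reject);
    }));
    // A core Readable allows multiple destinations on the same source.
    sinks.forEach(sink => source.pipe(sink));
    return Promise.all(done);
}

// Stand-in for message_stream: three chunks of a tiny message.
pipeToMany(Readable.from(['Subject: test\r\n', '\r\n', 'body\r\n']), 3)
    .then(results => console.log(results));
```

With a core Readable, every sink ends up with the full message, which is what the failing example above was trying to achieve.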

My first idea for a workaround was to listen for the 'data', 'error', and 'end' events instead of calling message_stream.pipe(). That does not work out of the box either: I tried several approaches, and each one only got me into a deeper mess.

Currently, I am using the following workaround instead (with error handling):

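const fs = require('fs');
const stream = require('stream');

// note: the await below assumes this runs inside an async function
var txn = connection.transaction;
// one shared PassThrough per transaction, stored in the transaction notes
var multistream = txn.notes.message_stream || (txn.notes.message_stream = new stream.PassThrough());
['/tmp/test-a', '/tmp/test-b', '/tmp/test-c'].forEach(file =>
{
    var target = fs.createWriteStream(file);
    target.on('error', err => { throw err; });
    // pipe() does not forward errors, so tear down the target manually
    multistream.on('error', err => target.destroy(err));
    multistream.pipe(target);
});

await new Promise((resolve, reject) =>
{
    multistream.on('error', reject);
    // 'finish' fires once multistream has accepted all input;
    // the targets may still be flushing at that point
    multistream.on('finish', resolve);
    
    txn.message_stream.on('error', err => multistream.destroy(err));
    // message_stream itself is piped only once, into the PassThrough
    txn.message_stream.pipe(multistream);
});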
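
The same fan-out idea as a self-contained sketch that can be run outside a plugin. It assumes, as in the workaround above, that the source only needs to be piped once into an intermediate PassThrough. The fanOutToFiles() name, the temporary directory, and the Readable.from() source are hypothetical stand-ins, not part of any plugin API. Unlike the workaround, it waits for each file to finish rather than for the PassThrough.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { PassThrough, Readable, finished } = require('stream');

// Hypothetical helper: copies `source` into every path in `files` through
// a single PassThrough. Resolves when all files are written, rejects on
// the first error.
function fanOutToFiles(source, files) {
    const hub = new PassThrough();
    const writes = files.map(file => new Promise((resolve, reject) => {
        const target = fs.createWriteStream(file);
        // finished() reports both normal completion and failure of the target.
        finished(target, err => (err ? reject(err) : resolve(file)));
        // pipe() does not forward errors, so tear down the target manually.
        hub.on('error', err => target.destroy(err));
        hub.pipe(target);
    }));
    source.on('error', err => hub.destroy(err));
    // The source is piped exactly once; the hub handles the fan-out.
    source.pipe(hub);
    return Promise.all(writes);
}

// Usage with a stand-in source and a fresh temporary directory.
const demoDir = fs.mkdtempSync(path.join(os.tmpdir(), 'fanout-demo-'));
const demoFiles = ['test-a', 'test-b', 'test-c'].map(name => path.join(demoDir, name));
fanOutToFiles(Readable.from(['line one\r\n', 'line two\r\n']), demoFiles)
    .then(written => console.log('wrote', written.length, 'files'));
```

Waiting on the targets instead of on the PassThrough avoids resolving while a file is still being flushed, at the cost of one promise per file.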