This repository has been archived by the owner on Nov 25, 2023. It is now read-only.

Add option to use write_batch #7

Open · shriv opened this issue Jun 6, 2023 · 5 comments

shriv commented Jun 6, 2023

Hi there,
I'm a new user of Meltano and Singer-style taps and targets! I'm currently working on a POC pipeline to move data from an Oracle database to S3 parquet files. I really appreciate this S3 parquet target you have written. :-)

One issue at the moment is that the pipeline is running rather slowly, and I can't figure out where the bottleneck is. Is it possible to include a batch write feature that uses write_batch() instead of write_table()? If the pipeline is still slow, I'll then see if I can add a batch query feature in the tap itself.

Let me know if you're busy, and I can try to add this myself, though it will probably take me much longer! :-)

Thanks a lot!

jkausti (Owner) commented Jun 7, 2023

Hi @shriv, thanks for raising an issue!

I had a quick look into the differences between write_batch() and write_table(); I didn't get a clear picture of whether one is faster than the other, but they should at least produce the same results. I will have time during the weekend to look into it a bit more and do some testing, but if you want to run your own tests, you can fork the repo and replace the code in the process_batch function with something like this:

def process_batch(self, context: dict) -> None:
    """Process the batch."""

    # Build an Arrow RecordBatch straight from the buffered Singer records
    # (pa is pyarrow, imported at module level).
    record_batch = pa.RecordBatch.from_pylist(context["records"])
    self.pa_schema = record_batch.schema
    # Append via the existing ParquetWriter using write_batch() instead of
    # converting to a Table and calling write_table().
    self.pq_writer.write_batch(record_batch)

I haven't tested it, so I can't guarantee that it will work, but if it works and is indeed faster, let me know or raise a PR with the modified code. Otherwise I will do some testing of my own during the weekend.
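
For reference, here is a minimal standalone sketch of the two calls being discussed (the file names are placeholders); both should land the same rows on disk, the difference being only whether the input is a RecordBatch or a Table:

    import pyarrow as pa
    import pyarrow.parquet as pq

    records = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

    # write_batch(): append a RecordBatch as a row group
    batch = pa.RecordBatch.from_pylist(records)
    writer = pq.ParquetWriter("via_batch.parquet", batch.schema)
    writer.write_batch(batch)
    writer.close()

    # write_table(): same rows, but passed as a Table
    table = pa.Table.from_pylist(records)
    writer = pq.ParquetWriter("via_table.parquet", table.schema)
    writer.write_table(table)
    writer.close()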

shriv (Author) commented Jun 8, 2023

Thanks @jkausti!! :-) I looked up write_batch again and perhaps I was mistaken about what it does. The functionality I'm after is writing to multiple files (each with some pre-determined size, e.g. 100k rows) instead of appending to a single table. I'll do some more research on this, including trying your code suggestion, next week :-)

jkausti (Owner) commented Jun 10, 2023

Ok, I get your point. I think the only option for writing to multiple files would be to use write_dataset(); however, I'm not sure the ParquetWriter class supports that, and the implementation would look quite different.
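
For illustration, a rough, untested sketch of that approach (the path and row counts are placeholders, and writing to S3 would additionally need a pyarrow filesystem):

    import pyarrow as pa
    import pyarrow.dataset as ds

    # Stand-in for the records buffered by the target
    table = pa.table({"id": list(range(1_000_000))})

    # write_dataset() can split the output into multiple Parquet files by row count
    ds.write_dataset(
        table,
        base_dir="output/my_stream",   # placeholder local path
        format="parquet",
        max_rows_per_file=100_000,     # start a new file every 100k rows
        max_rows_per_group=100_000,    # row-group size within each file
        existing_data_behavior="overwrite_or_ignore",
    )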

This might also be a limitation of how the Singer protocol inside Meltano passes data between the tap and the target. Are you able to test with other targets to see if you experience similar slowness with them?

shriv (Author) commented Jun 14, 2023

Good suggestion @jkausti! I'll do some tests with the local target-parquet. I notice it even has an option for adjusting the file size, so it can write to multiple files; I can check whether that gives any performance improvement.

shriv (Author) commented Jun 15, 2023

I did a couple of simple tests with target-parquet, and I definitely see a performance improvement with batch write. I also tried adjusting the tap stream with a cursor_array_size so that it makes fewer round trips with larger fetches; this didn't seem to matter in the local tests. There is a ~50% reduction in pipeline runtime for a dataset of ~16M rows. There was no obvious performance difference between batch sizes of 1M and 2M rows.

Looking at the code of target-parquet, it seems like the batch file write could be enabled without needing write_dataset, though I'm no expert in taps! :-)
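
To sketch what I mean (untested, and the class name and row threshold here are made up): the target could keep appending batches through a ParquetWriter and simply roll over to a fresh file once a row limit is reached, no write_dataset needed:

    import pyarrow as pa
    import pyarrow.parquet as pq

    MAX_ROWS_PER_FILE = 100_000  # hypothetical threshold

    class RollingParquetWriter:
        """Write RecordBatches, starting a new file once the row limit is hit."""

        def __init__(self, prefix: str, schema: pa.Schema):
            self.prefix = prefix
            self.schema = schema
            self.file_index = 0
            self.rows_in_file = 0
            self.writer = None

        def write_batch(self, batch: pa.RecordBatch) -> None:
            if self.writer is None:
                path = f"{self.prefix}-{self.file_index:05d}.parquet"
                self.writer = pq.ParquetWriter(path, self.schema)
            self.writer.write_batch(batch)
            self.rows_in_file += batch.num_rows
            if self.rows_in_file >= MAX_ROWS_PER_FILE:
                self.close()  # finish this file; the next batch opens a new one

        def close(self) -> None:
            if self.writer is not None:
                self.writer.close()
                self.writer = None
                self.file_index += 1
                self.rows_in_file = 0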

N.B. I am on a LAN with ~64.3 Mbps upload speed and 180 Mbps download speed.

  • 1M cursor array size, saving to S3 parquet as a single file
    • time meltano run tap-oracle target-s3-parquet
    • 2444.47s user 71.09s system 115% cpu 36:09.10 total
  • 1M cursor array size, local parquet target writing multiple files of 1M rows each
    • time meltano run tap-oracle target-parquet
    • 1122.90s user 150.89s system 163% cpu 12:57.04 total
  • 2M cursor array size, local parquet target writing multiple files of 2M rows each
    • time meltano run tap-oracle target-parquet
    • 1122.62s user 268.49s system 146% cpu 15:48.33 total
  • no cursor array size, local parquet target writing multiple files of 2M rows each
    • time meltano run tap-oracle target-parquet
    • 1121.01s user 202.17s system 131% cpu 16:46.21 total

Hope this benchmarking helps! If you are able to enable the batch file write option, that would be quite helpful! :-)

Thanks!
