Increased memory usage with dlt > 1.3.0 and filesystem module during extract phase #2221

Open · trin94 opened this issue Jan 16, 2025 · 18 comments
Labels: bug (Something isn't working), question (Further information is requested)

@trin94 commented Jan 16, 2025

dlt version

1.3.0

Describe the problem

We're observing memory issues when upgrading from dlt 1.3.0 to dlt 1.5.0.

We use the filesystem module to load data (fewer than 50,000 files, each < 10 MB, most of them < 1 MB) into a Postgres instance.

  • On 1.3.0, we observed < 1 GB of memory usage during the extract phase (extraction phase takes 16 minutes).
  • On 1.4.1 and 1.5.0, our container job gets killed because it exceeded 8 GB of memory usage after 30 minutes.

The only difference between these runs is the dlt version.

Expected behavior

Memory usage stays roughly the same (~1 GB), or at least below 2 GB, during the extract phase.

Steps to reproduce

A similar file set probably needs to be used to reproduce this. Unfortunately, I cannot share the files as it's production data.

Operating system

Linux

Runtime environment

Other

Python version

3.12

dlt data source

filesystem

dlt destination

Postgres

Other deployment details

We're using a Container App Job on Azure.

Additional information

Python:

import dlt
from dlt.sources.filesystem import filesystem, read_jsonl  # assumed import of dlt's filesystem source


def run_dlt_pipeline():
    pipeline = dlt.pipeline(
        pipeline_name="pipeline_name",
        destination="postgres",
        dataset_name="dataset_schema",
    )
    pipeline.run(raw_data())


@dlt.source(name="source_name")
def raw_data(base_path="source_name"):
    base_path = base_path.rstrip("/")

    # data_set_1..data_set_4 and primary_keys_nested are defined elsewhere
    for dataset in [data_set_1, data_set_2, data_set_3, data_set_4]:
        filesystem_config = {
            "file_glob": f"{base_path}/{dataset.name}/**/*.jsonl",
        }

        pipe = filesystem(**filesystem_config)
        pipe.apply_hints(incremental=dlt.sources.incremental("modification_date"))

        pipe |= read_jsonl()
        pipe.apply_hints(write_disposition="merge", primary_key=primary_keys_nested)

        yield pipe.with_name(f"pipe_{dataset.name}")

Config:

progress = "log"

[runtime]
log_level = "WARNING"  # the system log level of dlt
dlthub_telemetry = false

[sources.filesystem]
bucket_url = "az://raw-data"

# set buffer size for extract and normalize stages
[data_writer]
buffer_max_items = 500

# only for the extract stage - for all sources
[sources.data_writer]
file_max_items = 10000
file_max_bytes = 10000000

[extract]
workers = 4  # default value: 5
max_parallel_items = 10  # default value: 20

[normalize]
workers = 2
start_method = "spawn"

[load]
workers = 4

Kinda offtopic:

  • The GitHub issue template does not allow selecting Python 3.12, which is what we're using
trin94 changed the title from "Increased memory usage with dlt > 1.3.0 and filesystem module" to "Increased memory usage with dlt > 1.3.0 and filesystem module during extract phase" on Jan 16, 2025
@rudolfix (Collaborator)

@trin94 thanks for this report! We didn't touch the filesystem source code or the extract pipeline code, so it is pretty hard to tell what the root cause could be. We had exactly this problem with Azure Blob Storage, but in the load phase: after parallel uploads were introduced, adlfs was opening hundreds of parallel buffers by default, killing processes with OOM.
https://dlthub.com/docs/dlt-ecosystem/destinations/filesystem#azure-blob-storage (see max_concurrency)
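
As a hedged illustration only (not a confirmed fix for this issue): if adlfs concurrency needs to be capped, the extra fsspec kwargs described on that docs page could be used; the exact key and value below are assumptions.

[destination.filesystem]
kwargs = '{"max_concurrency": 3}'  # assumed key, forwarded to the adlfs filesystem constructor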

could you tell me if there are any differences for fsspec and adlfs dependencies in your 1.3.0 and 1.4.1 environments?

rudolfix self-assigned this Jan 17, 2025
rudolfix added the question (Further information is requested) label Jan 17, 2025
rudolfix moved this from Todo to In Progress in dlt core library Jan 17, 2025
@rudolfix (Collaborator)

Another thing you could try is to set parallelism to 1 and the queueing strategy to fifo:

[extract]
workers = 1  # default value: 5
max_parallel_items = 1  # default value: 20
next_item_mode = "fifo"

Does this reduce memory consumption? (Only one resource will be extracted at a time.)
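
For reference, the same settings can also be supplied as environment variables instead of config.toml (a sketch using dlt's double-underscore config naming):

EXTRACT__WORKERS=1
EXTRACT__MAX_PARALLEL_ITEMS=1
EXTRACT__NEXT_ITEM_MODE=fifo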

@trin94 (Author) commented Jan 21, 2025

Hi @rudolfix,

thank you a lot for your response!

could you tell me if there are any differences for fsspec and adlfs dependencies in your 1.3.0 and 1.4.1 environments?

No, there are no differences. We upgraded all dependencies to their most recent versions:

dlt                       1.3.0
adlfs                     2024.12.0
fsspec                    2024.12.0

dlt                       1.4.1
adlfs                     2024.12.0
fsspec                    2024.12.0

dlt                       1.5.0
fsspec                    2024.12.0
adlfs                     2024.12.0

With dlt 1.3.0 it works as expected; with 1.4.1 we observe increased memory consumption.

Another thing you could try is to set parallelism to 1 and the queueing strategy to fifo [...] Does this reduce memory consumption? (Only one resource will be extracted at a time.)

Unfortunately, this did not help.

[image attachment]

@rudolfix (Collaborator)

@trin94 thanks, this is very helpful (and worrying!). We'll investigate with high priority.

rudolfix added the bug (Something isn't working) label Jan 27, 2025
rudolfix assigned sh-rp and unassigned rudolfix Jan 27, 2025
@rudolfix (Collaborator)

@trin94 when doing a code diff, I can see only one thing that was added in 1.4.1:

"skip_instance_cache": True,

is now applied to the defaults automatically. You could try to disable this change with a config, i.e. by setting DESTINATION__FILESYSTEM__KWARGS to {"skip_instance_cache": false}

https://dlthub.com/docs/dlt-ecosystem/destinations/filesystem#adding-additional-configuration
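
For reference, the same override expressed in config.toml (a sketch following the "adding additional configuration" docs linked above):

[destination.filesystem]
kwargs = '{"skip_instance_cache": false}'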

@sh-rp (Collaborator) commented Jan 27, 2025

@trin94 I will try to reproduce this here with a smaller generated dataset. Could you confirm that the pipeline fails during the extract phase and not during any of the other phases, such as normalization or loading? You could just run pipeline.extract() in the container to make sure. Also, could you tell me how large the dataset you are loading is? I will try to reproduce the increased memory usage on a smaller one I create here.
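
A minimal way to run only the extract phase, reusing the pipeline and source from the first comment (a sketch; raw_data is the source defined there):

import dlt

# run extract only, so any memory growth can be attributed to that phase
pipeline = dlt.pipeline(
    pipeline_name="pipeline_name",
    destination="postgres",
    dataset_name="dataset_schema",
)
pipeline.extract(raw_data())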

@sh-rp (Collaborator) commented Jan 27, 2025

@trin94 Can you run your pipeline locally (only the extract phase) with all the same settings, but with psutil installed in the environment? You will get a memory printout in your logs. Can you confirm that you see this problem there too? I have now extracted a 1.2 GB jsonl dataset I generated locally and see no memory increase. I will test doing the same from an Azure bucket next.

@trin94 (Author) commented Jan 27, 2025

Hey @sh-rp,

Thank you for looking into this!

I'd like you to confirm that you are sure that the pipeline fails during the extract phase and not during any of the other phases such as normalization or loading?

Yes, it's the extract phase. We've been using psutil in our pipeline on Azure to print debug info, and we can observe memory usage growing over time.

Also could you tell me how large the dataset is you are loading?

It's around 28 GB in size.

Can you run your pipeline locally (only the extract phase) and have all the same settings but have psutil installed in the environment?

Locally, I don't see any RAM being eaten up either. I tested it on a subset (~12 GB in size) and memory stayed below 1.4 GB using the parallel configuration from the first message. It also cleaned up properly when transitioning to the normalize phase, dropping to below 1 GB.

@rudolfix

you could try to disable this change with a config ie.
setting DESTINATION__FILESYSTEM__KWARGS to {"skip_instance_cache": false}

I will check it later (maybe even tomorrow) and report back if it fixes the issue.

Thank you a lot!

Best regards
Elias

@sh-rp (Collaborator) commented Jan 27, 2025

I also did a test with the same files on Azure and also cannot see an increased or increasing memory footprint. For reference, the script to create the files:

import json
import os
import random
from datetime import datetime

names = ["dave", "anton", "marcin"]
details = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.".split(" ")

if __name__ == "__main__":
    print("Creating test data...")
    os.makedirs("test_data", exist_ok=True)  # make sure the output directory exists
    entry_count = 0
    for file_number in range(0, 300):
        with open(f"test_data/data_{file_number}.jsonl", "w") as f:
            for _ in range(0, 10000):
                entry_count += 1
                entry = {
                    "id": entry_count,
                    "name": random.choice(names),
                    "details": " ".join(random.choices(details, k=random.randint(1, 100))),
                    "created_at": datetime.now().isoformat(),
                }
                f.write(json.dumps(entry))
                f.write("\n")
        print(f"Created file {file_number}")
    print("Test data created successfully")

And the simplified pipeline script:

import dlt
from dlt.sources.filesystem import readers  # dlt's filesystem source

if __name__ == "__main__":

    dlt.secrets["credentials.azure_storage_account_name"] = "dltdata"
    dlt.secrets["credentials.azure_storage_account_key"] = ""

    @dlt.source(name="source_name")
    def raw_data():

        bucket_files = readers(bucket_url="az://dlt-ci-test-bucket", file_glob="davestests/*.jsonl")

        pipe = bucket_files.read_jsonl()
        pipe.apply_hints(incremental=dlt.sources.incremental("created_at"))
        pipe.apply_hints(write_disposition="merge", primary_key="id")

        yield pipe

    pipeline = dlt.pipeline(
        pipeline_name="pipeline_name",
        destination="postgres",
        dataset_name="dataset_schema",
    )

    pipeline.extract(raw_data())

@trin94 (Author) commented Jan 27, 2025

Hey,

automatically to defaults. you could try to disable this change with a config ie.
setting DESTINATION__FILESYSTEM__KWARGS to {"skip_instance_cache": false}

Unfortunately, this did not reduce memory consumption.

For reference the script to create the files: [...]

I just checked one of our test files. A single line in our jsonl file (picked at random) amounts to roughly 35 KB.

I've attached full logs of all dependencies that were installed into our docker containers:
dlt-1.3.0.txt
dlt-1.5.0.txt

I also ran with 1.3.0 (and the initial config) again, which peaked at 1.7 GB (during extract). Only the source files have changed. Unsurprisingly, they have the biggest impact on memory consumption.

@sh-rp (Collaborator) commented Jan 28, 2025

Alright, some more questions and asks to help me reproduce this:

  • Are all entries in your jsonl files in the same structure? So do they have the same column names, subtables, etc.? And if not, are they slightly or wildly different?
  • Could you provide one example jsonl line? You can change all the values to something anonymized; you should keep the rough length of the values and definitely keep the value types intact.

@trin94 (Author) commented Jan 28, 2025

Hey @sh-rp,

Are all entries in your jsonl file in the same structure? So do they have the same column names and subtables etc? And if not, are they slightly or wildly different?

There are common fields but overall they are wildly different.

Could you provide one example jsonl line? You can change all the values to something anonymized; you should keep the rough length of the values and definitely keep the value types intact.

Yes. I've attached one. If you need more (and different) files, please tell me 🙂

anonym-single-line.json

@rudolfix (Collaborator) commented Feb 3, 2025

@trin94 could you also compare orjson versions? Your json is decently nested, and we have already had issues with orjson leaking memory. We were pretty close to dropping it, but then the issues got fixed.

OK, I see that the orjson versions are the same. Still, you can try swapping the parser:

https://dlthub.com/docs/reference/performance#use-built-in-json-parser - you can try switching to simplejson
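
One way to switch (a sketch; the DLT_USE_JSON variable name is the one used later in this thread, and setting it before importing dlt is an assumption about when the parser implementation is picked):

import os

os.environ["DLT_USE_JSON"] = "simplejson"  # must be set before dlt is imported (assumption)

import dlt  # noqa: E402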

rudolfix assigned rudolfix and unassigned sh-rp Feb 3, 2025
@jjrugu commented Feb 4, 2025

Hi @trin94,

I'm experiencing a similar issue. I will try to provide more details at the end of the week. Do you also experience a significant difference in processing speed?

In my case, I'm using filesystem on Azure Blob Storage as both source and destination. I also see memory growth, and in addition a significant speed difference (version 1.3.0 being significantly faster than 1.6.0). I will try to prepare some tests later this week in case it's relevant for this issue.

@trin94 (Author) commented Feb 5, 2025

Hi,

@rudolfix

OK I see that orjson are the same. still you can try swapping parser:
https://dlthub.com/docs/reference/performance#use-built-in-json-parser - you can try switching to simplejson

I've tried setting DLT_USE_JSON=simplejson but that did not reduce memory consumption.
I've checked with 1.5.0 and 1.6.1 - no difference.

@jjrugu

Do you also experience a significant processing speed difference?

I can confirm that processing is noticeably slower on newer versions, but I cannot tell whether it's connected to the increased memory consumption we're observing.

@rudolfix (Collaborator)

@trin94 as I mentioned, we could not identify any obvious problems when comparing diffs of 1.3.0 and 1.4.1. If you are still willing to investigate, you could remove the incremental here:

pipe.apply_hints(incremental=dlt.sources.incremental("modification_date"))

and run a full load as a test. We changed how the incremental is instantiated and when boundary deduplication happens, but that should rather decrease memory consumption.

If you could run it, that would be very helpful.
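
For reference, a sketch of the source from the first comment with only that hint removed (data_set_1..data_set_4 and primary_keys_nested remain the placeholders from the original snippet, and the filesystem/read_jsonl import is assumed):

import dlt
from dlt.sources.filesystem import filesystem, read_jsonl


@dlt.source(name="source_name")
def raw_data_full_load(base_path="source_name"):
    base_path = base_path.rstrip("/")
    for dataset in [data_set_1, data_set_2, data_set_3, data_set_4]:
        pipe = filesystem(file_glob=f"{base_path}/{dataset.name}/**/*.jsonl")
        # incremental hint intentionally not applied for this full-load test
        pipe |= read_jsonl()
        pipe.apply_hints(write_disposition="merge", primary_key=primary_keys_nested)
        yield pipe.with_name(f"pipe_{dataset.name}")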

@rudolfix (Collaborator)

@trin94 also, please double-check whether the problem happens in the extract phase. This should be clear from the logs. You could also add progress="log" to your pipelines to regularly dump stats to stdout (including memory usage if psutil is installed).

@trin94 (Author) commented Feb 12, 2025

Hi @rudolfix,

you could remove the incremental in here:

pipe.apply_hints(incremental=dlt.sources.incremental("modification_date"))

That had no effect, unfortunately.

also please double check if the problem happens in extract phase. This should be clear from logs. Also you could add progress="log" to your pipelines to regularly dump stats to stdout (including mem usage if psutil is installed)

That's exactly what we do 😃

Thank you so much for looking into this! Upgrading isn’t a priority right now, so we’re fine with v1.3.0.
In a few weeks, we’ll have more time to look into the issue and will report back.
