Skip to content

Commit 06bee0d

Browse files
Haebuk and KadeRyu authored
feat(ingest/mongodb) re-order aggregation logic (#12428)
Co-authored-by: Kade Ryu <[email protected]>
1 parent 8773ff5 commit 06bee0d

File tree

3 files changed

+4617
-16
lines changed

3 files changed

+4617
-16
lines changed

metadata-ingestion/src/datahub/ingestion/source/mongodb.py

Lines changed: 17 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -219,26 +219,27 @@ def construct_schema_pymongo(
219219
"""
220220

221221
aggregations: List[Dict] = []
222+
223+
# The order of the aggregations impacts execution time. By setting the sample/limit aggregation first,
224+
# the subsequent aggregations process a much smaller dataset, improving performance.
225+
if sample_size:
226+
if use_random_sampling:
227+
aggregations.append({"$sample": {"size": sample_size}})
228+
else:
229+
aggregations.append({"$limit": sample_size})
230+
222231
if should_add_document_size_filter:
223232
doc_size_field = "temporary_doc_size_field"
224233
# create a temporary field to store the size of the document. filter on it and then remove it.
225-
aggregations = [
226-
{"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
227-
{"$match": {doc_size_field: {"$lt": max_document_size}}},
228-
{"$project": {doc_size_field: 0}},
229-
]
230-
if use_random_sampling:
231-
# get sample documents in collection
232-
if sample_size:
233-
aggregations.append({"$sample": {"size": sample_size}})
234-
documents = collection.aggregate(
235-
aggregations,
236-
allowDiskUse=True,
234+
aggregations.extend(
235+
[
236+
{"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
237+
{"$match": {doc_size_field: {"$lt": max_document_size}}},
238+
{"$project": {doc_size_field: 0}},
239+
]
237240
)
238-
else:
239-
if sample_size:
240-
aggregations.append({"$limit": sample_size})
241-
documents = collection.aggregate(aggregations, allowDiskUse=True)
241+
242+
documents = collection.aggregate(aggregations, allowDiskUse=True)
242243

243244
return construct_schema(list(documents), delimiter)
244245

0 commit comments

Comments (0)