Skip to content

Commit 51dcf91

Browse files
perf(ingest/dataplex): stream list_entries / search_entries instead of eager list() (#17730)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent b321ca4 commit 51dcf91

1 file changed

Lines changed: 32 additions & 36 deletions

File tree

metadata-ingestion/src/datahub/ingestion/source/dataplex/dataplex_entries.py

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -260,20 +260,20 @@ def _list_entry_stubs(self, project_id: str, location: str) -> List[str]:
260260

261261
request = dataplex_v1.ListEntriesRequest(parent=entry_group.name)
262262
with PerfTimer() as timer:
263-
entries = list(self.catalog_client.list_entries(request=request))
264-
self.report.report_catalog_api_call("list_entries", timer.elapsed_seconds())
265-
266-
for entry in entries:
267-
logger.info(f"Listing entry {entry.name} from group {entry_group.name}")
268-
logger.info(f"ListEntries payload: {entry}")
269-
if self._report_and_should_process_entry(entry):
270-
entry_names.append(entry.name)
271-
else:
272-
logger.debug(
273-
"Skipping entry stub for filtered entry %s from group %s",
274-
entry.name,
275-
entry_group.name,
263+
for entry in self.catalog_client.list_entries(request=request):
264+
logger.info(
265+
f"Listing entry {entry.name} from group {entry_group.name}"
276266
)
267+
logger.info(f"ListEntries payload: {entry}")
268+
if self._report_and_should_process_entry(entry):
269+
entry_names.append(entry.name)
270+
else:
271+
logger.debug(
272+
"Skipping entry stub for filtered entry %s from group %s",
273+
entry.name,
274+
entry_group.name,
275+
)
276+
self.report.report_catalog_api_call("list_entries", timer.elapsed_seconds())
277277
return entry_names
278278

279279
def _fetch_entry_detail(self, entry_name: str) -> dataplex_v1.Entry:
@@ -347,9 +347,25 @@ def _process_spanner_entries(
347347
)
348348
try:
349349
with PerfTimer() as timer:
350-
search_results = list(
351-
self.catalog_client.search_entries(request=request)
352-
)
350+
for result in self.catalog_client.search_entries(request=request):
351+
logger.info(f"SearchEntries result payload: {result}")
352+
dataplex_entry = getattr(result, "dataplex_entry", None)
353+
if dataplex_entry is None:
354+
continue
355+
if not self._report_and_should_process_entry(dataplex_entry):
356+
logger.debug(
357+
"Skipping filtered spanner entry %s from search_entries",
358+
dataplex_entry.name,
359+
)
360+
continue
361+
try:
362+
detailed_entry = self._fetch_entry_detail(dataplex_entry.name)
363+
except Exception as exc:
364+
logger.warning(
365+
f"Failed to fetch detail for Spanner entry {dataplex_entry.name}: {exc}"
366+
)
367+
continue
368+
yield from self._build_entities_for_entry(detailed_entry, location)
353369
self.report.report_catalog_api_call(
354370
"search_entries", timer.elapsed_seconds()
355371
)
@@ -359,26 +375,6 @@ def _process_spanner_entries(
359375
)
360376
return
361377

362-
for result in search_results:
363-
logger.info(f"SearchEntries result payload: {result}")
364-
dataplex_entry = getattr(result, "dataplex_entry", None)
365-
if dataplex_entry is None:
366-
continue
367-
if not self._report_and_should_process_entry(dataplex_entry):
368-
logger.debug(
369-
"Skipping filtered spanner entry %s from search_entries",
370-
dataplex_entry.name,
371-
)
372-
continue
373-
try:
374-
detailed_entry = self._fetch_entry_detail(dataplex_entry.name)
375-
except Exception as exc:
376-
logger.warning(
377-
f"Failed to fetch detail for Spanner entry {dataplex_entry.name}: {exc}"
378-
)
379-
continue
380-
yield from self._build_entities_for_entry(detailed_entry, location)
381-
382378
def _track_entry_for_lineage(
383379
self, dataplex_location: str, entry: dataplex_v1.Entry
384380
) -> None:

0 commit comments

Comments
 (0)