Skip to content

Commit a249873

Browse files
author
Jinxin Ma
authored
fix: replace parallelStream with stream in backfill mae && add logging (#333)
* use stream && add logging * update log
1 parent d08c27d commit a249873

File tree

2 files changed

+30
-5
lines changed

2 files changed

+30
-5
lines changed

restli-resources/src/main/java/com/linkedin/metadata/restli/BaseEntityAgnosticResource.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,13 @@ public Task<BackfillItem[]> backfillMAE(@Nonnull BackfillItem[] backfillRequests
8383
log.warn("LocalDAO not found for entity type: " + entityType);
8484
continue;
8585
}
86-
final List<BackfillItem> items = entityTypeToRequestsMap.get(entityType);
87-
backfillResults.addAll(items.parallelStream()
88-
// immutable dao, should be thread-safe
86+
final List<BackfillItem> itemsToBackfill = entityTypeToRequestsMap.get(entityType);
87+
final List<BackfillItem> backfilledItems = itemsToBackfill.stream()
8988
.map(item -> backfillMAEForUrn(item.getUrn(), item.getAspects(), backfillMode, dao.get()).orElse(null))
9089
.filter(Objects::nonNull)
91-
.collect(Collectors.toList()));
90+
.collect(Collectors.toList());
91+
log.info(String.format("Given requests: %s, backfill results: %s", itemsToBackfill, backfilledItems));
92+
backfillResults.addAll(backfilledItems);
9293
}
9394
return backfillResults.toArray(new BackfillItem[0]); // insert order is not guaranteed the same as input
9495
});
@@ -97,9 +98,11 @@ public Task<BackfillItem[]> backfillMAE(@Nonnull BackfillItem[] backfillRequests
9798
protected Optional<BackfillItem> backfillMAEForUrn(@Nonnull String urn, @Nonnull List<String> aspectSet,
9899
@Nonnull BackfillMode backfillMode, @Nonnull BaseLocalDAO<? extends UnionTemplate, ? extends Urn> dao) {
99100
try {
101+
log.info(String.format("Attempt to backfill MAE for urn: %s, aspectSet: %s, backfillMode: %s", urn, aspectSet, backfillMode));
100102
// set aspectSetToUse to null if empty to backfill all aspects
101103
Set<String> aspectSetToUse = aspectSet.isEmpty() ? null : new HashSet<>(aspectSet);
102104
Set<String> backfilledAspects = dao.backfillMAE(backfillMode, aspectSetToUse, Collections.singleton(urn)).get(urn);
105+
log.info(String.format("Backfilled aspects: %s, for urn: %s, aspectSet: %s, backfillMode: %s", backfilledAspects, urn, aspectSet, backfillMode));
103106
if (backfilledAspects == null || backfilledAspects.isEmpty()) {
104107
return Optional.empty();
105108
}

restli-resources/src/test/java/com/linkedin/metadata/restli/BaseEntityAgnosticResourceTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.metadata.restli;
22

3+
import com.google.common.collect.ImmutableList;
34
import com.google.common.collect.ImmutableMap;
45
import com.google.common.collect.ImmutableSet;
56
import com.linkedin.data.template.StringArray;
@@ -15,6 +16,7 @@
1516
import com.linkedin.testing.urn.BarUrn;
1617
import com.linkedin.testing.urn.FooUrn;
1718
import java.util.Arrays;
19+
import java.util.Collection;
1820
import java.util.Collections;
1921
import java.util.HashSet;
2022
import java.util.List;
@@ -144,6 +146,26 @@ public void testBackfillMAEEmptyBackfillResult() {
144146
verify(_fooLocalDAO, times(3)).backfillMAE(any(), any(), any());
145147
}
146148

149+
@Test
150+
public void testBackfillMAEFilterEmptyAspectUrn() {
151+
TestResource testResource = new TestResource();
152+
Set<String> urnSet = ImmutableSet.of(makeFooUrn(1).toString(), makeFooUrn(2).toString());
153+
when(_fooLocalDAO.backfillMAE(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX, null, Collections.singleton(makeFooUrn(1).toString())))
154+
.thenReturn(ImmutableMap.of(makeFooUrn(1).toString(), multiAspectsSet));
155+
BackfillItem[] result = runAndWait(testResource.backfillMAE(provideBackfillItems(urnSet, null), IngestionMode.BACKFILL));
156+
assertEqualBackfillItemArrays(result, provideBackfillItems(ImmutableSet.of(makeFooUrn(1).toString()), multiAspectsSet));
157+
}
158+
159+
@Test
160+
public void testBackfillMAEDuplicateUrn() {
161+
TestResource testResource = new TestResource();
162+
List<String> urnList = ImmutableList.of(makeFooUrn(1).toString(), makeFooUrn(1).toString());
163+
when(_fooLocalDAO.backfillMAE(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX, null, Collections.singleton(makeFooUrn(1).toString())))
164+
.thenReturn(ImmutableMap.of(makeFooUrn(1).toString(), multiAspectsSet));
165+
BackfillItem[] result = runAndWait(testResource.backfillMAE(provideBackfillItems(urnList, null), IngestionMode.BACKFILL));
166+
assertEqualBackfillItemArrays(result, provideBackfillItems(ImmutableList.of(makeFooUrn(1).toString(), makeFooUrn(1).toString()), multiAspectsSet));
167+
}
168+
147169
@Test
148170
public void testBackfillMAENoSuchEntity() {
149171
TestResource testResource = new TestResource();
@@ -186,7 +208,7 @@ public void testBackfillMAEException() {
186208
assertEqualBackfillItemArrays(result, expectedItems);
187209
}
188210

189-
private BackfillItem[] provideBackfillItems(Set<String> urnSet, Set<String> aspects) {
211+
private BackfillItem[] provideBackfillItems(Collection<String> urnSet, Set<String> aspects) {
190212
return urnSet.stream().map(urn -> {
191213
BackfillItem item = new BackfillItem();
192214
item.setUrn(urn);

0 commit comments

Comments
 (0)