Skip to content

Commit d0cbcc5

Browse files
Added API to retrieve expanded golden records for UID list and optimized backup logic
1 parent 75595fb commit d0cbcc5

File tree

4 files changed

+82
-13
lines changed

4 files changed

+82
-13
lines changed

JeMPI_Apps/JeMPI_BackupRestoreAPI/src/main/java/org/jembi/jempi/backuprestoreapi/Ask.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.jembi.jempi.shared.models.ApiModels;
1010
import org.jembi.jempi.shared.models.RestoreGoldenRecords;
1111

12+
import java.util.List;
1213
import java.util.concurrent.CompletionStage;
1314

1415
public final class Ask {
@@ -42,6 +43,18 @@ static CompletionStage<BackEnd.GetExpandedGoldenRecordResponse> getExpandedGolde
4243
return stage.thenApply(response -> response);
4344
}
4445

46+
static CompletionStage<BackEnd.GetExpandedGoldenRecordsResponse> getExpandedGoldenRecords(
47+
final ActorSystem<Void> actorSystem,
48+
final ActorRef<BackEnd.Event> backEnd,
49+
final List<String> gidList) {
50+
CompletionStage<BackEnd.GetExpandedGoldenRecordsResponse> stage = AskPattern
51+
.ask(backEnd,
52+
replyTo -> new BackEnd.GetExpandedGoldenRecordsRequest(replyTo, gidList),
53+
java.time.Duration.ofSeconds(GlobalConstants.TIMEOUT_DGRAPH_QUERY_SECS),
54+
actorSystem.scheduler());
55+
return stage.thenApply(response -> response);
56+
}
57+
4558
static CompletionStage<BackEnd.PostGoldenRecordResponse> postGoldenRecord(
4659
final ActorSystem<Void> actorSystem,
4760
final ActorRef<BackEnd.Event> backEnd,

JeMPI_Apps/JeMPI_BackupRestoreAPI/src/main/java/org/jembi/jempi/backuprestoreapi/BackEnd.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.jembi.jempi.libmpi.MpiServiceError;
1313
import org.jembi.jempi.shared.models.*;
1414

15+
import java.util.Collections;
1516
import java.util.List;
1617

1718
public final class BackEnd extends AbstractBehavior<BackEnd.Event> {
@@ -103,6 +104,7 @@ public Receive<Event> actor() {
103104
ReceiveBuilder<Event> builder = newReceiveBuilder();
104105
return builder.onMessage(GetGidsAllRequest.class, this::getGidsAllHandler)
105106
.onMessage(GetExpandedGoldenRecordRequest.class, this::getExpandedGoldenRecordHandler)
107+
.onMessage(GetExpandedGoldenRecordsRequest.class, this::getExpandedGoldenRecordsHandler)
106108
.onMessage(PostGoldenRecordRequest.class, this::postGoldenRecordRequestHandler)
107109
.build();
108110
}
@@ -135,6 +137,26 @@ private Behavior<Event> getExpandedGoldenRecordHandler(final GetExpandedGoldenRe
135137
return Behaviors.same();
136138
}
137139

140+
private Behavior<Event> getExpandedGoldenRecordsHandler(final GetExpandedGoldenRecordsRequest request) {
141+
List<ExpandedGoldenRecord> goldenRecords = null;
142+
try {
143+
goldenRecords = libMPI.findExpandedGoldenRecords(request.goldenIds);
144+
} catch (Exception exception) {
145+
LOGGER.error("libMPI.findExpandedGoldenRecords failed for goldenIds: {} with error: {}",
146+
request.goldenIds,
147+
exception.getMessage());
148+
}
149+
150+
if (goldenRecords == null) {
151+
request.replyTo.tell(new GetExpandedGoldenRecordsResponse(Either.left(new MpiServiceError.GoldenIdDoesNotExistError(
152+
"Golden Records do not exist",
153+
Collections.singletonList(request.goldenIds).toString()))));
154+
} else {
155+
request.replyTo.tell(new GetExpandedGoldenRecordsResponse(Either.right(goldenRecords)));
156+
}
157+
return Behaviors.same();
158+
}
159+
138160
private Behavior<Event> postGoldenRecordRequestHandler(final PostGoldenRecordRequest request) {
139161
String goldenRecords = null;
140162
try {
@@ -169,6 +191,13 @@ public record GetExpandedGoldenRecordResponse(Either<MpiGeneralError, ExpandedGo
169191
implements EventResponse {
170192
}
171193

194+
public record GetExpandedGoldenRecordsRequest(
195+
ActorRef<GetExpandedGoldenRecordsResponse> replyTo,
196+
List<String> goldenIds) implements Event { }
197+
198+
public record GetExpandedGoldenRecordsResponse(
199+
Either<MpiGeneralError, List<ExpandedGoldenRecord>> expandedGoldenRecords) implements EventResponse { }
200+
172201
public record PostGoldenRecordRequest(
173202
ActorRef<PostGoldenRecordResponse> replyTo,
174203
RestoreGoldenRecords goldenRecord) implements Event {

JeMPI_Apps/JeMPI_BackupRestoreAPI/src/main/java/org/jembi/jempi/backuprestoreapi/Routes.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,38 @@ private static Route getExpandedGoldenRecord(
9191
}));
9292
}
9393

94+
private static Route getExpandedGoldenRecordsForUidList(
95+
final ActorSystem<Void> actorSystem,
96+
final ActorRef<BackEnd.Event> backEnd) {
97+
return entity(Jackson.unmarshaller(ApiModels.ApiExpandedGoldenRecordsParameterList.class),
98+
request -> onComplete(Ask.getExpandedGoldenRecords(actorSystem, backEnd, request.uidList()),
99+
result -> {
100+
if (!result.isSuccess()) {
101+
final var e = result.failed().get();
102+
LOGGER.error(e.getLocalizedMessage(), e);
103+
return mapError(new MpiServiceError.InternalError(e.getLocalizedMessage()));
104+
}
105+
return result.get()
106+
.expandedGoldenRecords()
107+
.mapLeft(MapError::mapError)
108+
.fold(error -> error,
109+
expandedGoldenRecords -> complete(StatusCodes.OK,
110+
expandedGoldenRecords.stream()
111+
.map(ApiModels.ApiExpandedGoldenRecord::fromExpandedGoldenRecord)
112+
.toList(),
113+
JSON_MARSHALLER));
114+
}));
115+
}
116+
94117
public static Route createCoreAPIRoutes(
95118
final ActorSystem<Void> actorSystem,
96119
final ActorRef<BackEnd.Event> backEnd
97120
) {
98121
return concat(post(() -> concat(
99122
path(GlobalConstants.SEGMENT_POST_EXPANDED_GOLDEN_RECORD,
100123
() -> Routes.getExpandedGoldenRecord(actorSystem, backEnd)),
124+
path(GlobalConstants.SEGMENT_POST_EXPANDED_GOLDEN_RECORDS_FOR_UID_LIST,
125+
() -> Routes.getExpandedGoldenRecordsForUidList(actorSystem, backEnd)),
101126
path(GlobalConstants.SEGMENT_POST_GOLDEN_RECORD_RESTORE,
102127
() -> Routes.postGoldenRecord(actorSystem, backEnd))
103128
)),

devops/linux/docker/backup_restore/dgraph-backup.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ def create_folder_if_not_exists(folder_path):
2222

2323
# Function to fetch data for a single ID
2424
def fetch_data_for_id(gid):
25-
get_expanded_golden_record = f'http://{host}:{port}/JeMPI/expandedGoldenRecord'
26-
payload = json.dumps({"gid": gid})
25+
get_expanded_golden_record = f'http://{host}:{port}/JeMPI/expandedGoldenRecords'
26+
payload = json.dumps({"uidList": gid})
2727
headers = {'Content-Type': 'application/json'}
2828
response = requests.post(get_expanded_golden_record, headers=headers, data=payload)
29-
if response.status_code == 200:
30-
return response.json()
31-
else:
32-
print(f'Error fetching data for ID {gid}: {response.status_code}')
33-
return None
29+
return response.json() if response.status_code == 200 else None
30+
31+
def chunks(lst, n):
32+
for i in range(0, len(lst), n):
33+
yield lst[i:i + n]
3434

3535
# Function to backup Dgraph data
3636
def backup_dgraph_data():
@@ -40,20 +40,22 @@ def backup_dgraph_data():
4040
if response.status_code == 200:
4141
new_golden_records = response.json()
4242
backup_data = []
43-
for gid in new_golden_records.get("records", []):
44-
try:
45-
golden_record_data = fetch_data_for_id(gid)
46-
if golden_record_data:
43+
gids = new_golden_records.get("records")
44+
chunk_size = 100
45+
for gid_chunk in chunks(gids, chunk_size):
46+
golden_records_data = fetch_data_for_id(gid_chunk)
47+
if golden_records_data:
48+
for golden_record_data in golden_records_data:
4749
backup_data.append(golden_record_data)
48-
except Exception as e:
49-
print(f'Error processing ID {gid}: {e}')
50+
5051
file_name = f'dgraph_backup_{current_datetime}.json'
5152
print(f'Total {str(len(backup_data))} Golden records backed up.')
5253
backup_path_folder = create_backup_json(backup_data, file_name)
5354
print(f'All data saved to {backup_path_folder + "/" + file_name}')
5455
else:
5556
print('Failed to retrieve list of IDs from the API')
5657

58+
5759
def create_backup_json(backup_data, file_name):
5860
backup_path_folder = os.path.join(backup_path, current_datetime)
5961
create_folder_if_not_exists(backup_path_folder)

0 commit comments

Comments
 (0)