Skip to content

Commit 7439648

Browse files
Merge branch 'dev' into CU-86c0dgwg9_API-query-endpoint
2 parents eafa50b + 76b7f24 commit 7439648

File tree

14 files changed

+151
-44
lines changed

14 files changed

+151
-44
lines changed

JeMPI_Apps/JeMPI_AsyncReceiver/src/main/java/org/jembi/jempi/async_receiver/Main.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.jembi.jempi.shared.serdes.JsonPojoSerializer;
1616
import org.jembi.jempi.shared.utils.AppUtils;
1717
import org.apache.commons.codec.language.Soundex;
18+
import org.jetbrains.annotations.NotNull;
1819

1920
import java.io.IOException;
2021
import java.nio.file.*;
@@ -153,28 +154,7 @@ private void apacheReadCSV(
153154

154155
for (CSVRecord csvRecord : csvParser) {
155156
final var patientRecord = demographicData(csvRecord);
156-
String givenName = patientRecord.fields.stream()
157-
.filter(field -> "given_name".equals(field.ccTag()))
158-
.map(DemographicData.DemographicField::value)
159-
.findFirst()
160-
.orElse("");
161-
String familyName = patientRecord.fields.stream()
162-
.filter(field -> "family_name".equals(field.ccTag()))
163-
.map(DemographicData.DemographicField::value)
164-
.findFirst()
165-
.orElse("");
166-
167-
String partitionKey = "";
168-
if (!givenName.isEmpty()) {
169-
partitionKey += new Soundex().soundex(givenName);
170-
}
171-
if (!familyName.isEmpty()) {
172-
partitionKey += new Soundex().soundex(familyName);
173-
}
174-
if (givenName.isEmpty() && familyName.isEmpty()) {
175-
partitionKey += "Unknown";
176-
}
177-
LOGGER.info("Using Kafka topic/partition: " + partitionKey);
157+
String partitionKey = generateKafkaPartitionKey(patientRecord);
178158

179159
final var interactionEnvelop = new InteractionEnvelop(InteractionEnvelop.ContentType.BATCH_INTERACTION,
180160
tag,
@@ -203,6 +183,30 @@ private void apacheReadCSV(
203183
}
204184
}
205185

186+
private static @NotNull String generateKafkaPartitionKey(final DemographicData patientRecord) {
187+
StringBuilder partitionKey = new StringBuilder();
188+
189+
for (String field : FIELDS_CONFIG.fieldsForKafkaKeyGen) {
190+
String value = patientRecord.fields.stream()
191+
.filter(fieldData -> field.equals(fieldData.ccTag()))
192+
.map(DemographicData.DemographicField::value)
193+
.findFirst()
194+
.orElse("");
195+
196+
if (!value.isEmpty()) {
197+
partitionKey.append(new Soundex().soundex(value));
198+
}
199+
}
200+
201+
if (partitionKey.length() == 0) {
202+
partitionKey.append("Unknown");
203+
}
204+
205+
String partitionKeyStr = partitionKey.toString();
206+
LOGGER.info("Using Kafka topic/partition: {}", partitionKeyStr);
207+
return partitionKeyStr;
208+
}
209+
206210
private String updateStan(
207211
final String stanDate,
208212
final int recCount) {

JeMPI_Apps/JeMPI_EM_Scala/src/main/scala/org/jembi/jempi/em/config/Config.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,11 @@ case class Rules(
6161
)
6262

6363
case class Config(
64-
auxInteractionFields: Option[Array[auxField]],
65-
auxGoldenRecordFields: Option[Array[auxField]],
66-
additionalNodes: Option[Array[AdditionalNode]],
67-
demographicFields: Array[DemographicField],
68-
rules: Rules
64+
auxInteractionFields: Option[Array[auxField]],
65+
auxGoldenRecordFields: Option[Array[auxField]],
66+
additionalNodes: Option[Array[AdditionalNode]],
67+
demographicFields: Array[DemographicField],
68+
rules: Rules,
69+
fieldsForKafkaKeyGen: Option[List[String]],
70+
nameFieldsForNotificationDisplay: Option[List[String]]
6971
)

JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/config/FieldsConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public final class FieldsConfig {
3131
public final List<AuxField> userAuxInteractionFields;
3232
public final List<AuxField> userAuxGoldenRecordFields;
3333
public final List<DemographicField> demographicFields;
34+
public final List<String> fieldsForKafkaKeyGen;
35+
public final List<String> nameFieldsForNotificationDisplay;
3436
public final List<AdditionalNode> additionalNodes;
3537

3638
public FieldsConfig(final JsonConfig jsonConfig) {
@@ -88,6 +90,13 @@ public FieldsConfig(final JsonConfig jsonConfig) {
8890
);
8991
optionalInteractionAuxIdIdx = auxInteractionAuxIdIdx[0];
9092

93+
fieldsForKafkaKeyGen = jsonConfig.fieldsForKafkaKeyGen() != null
94+
? List.copyOf(jsonConfig.fieldsForKafkaKeyGen())
95+
: List.of();
96+
97+
nameFieldsForNotificationDisplay = jsonConfig.nameFieldsForNotificationDisplay() != null
98+
? List.copyOf(jsonConfig.nameFieldsForNotificationDisplay())
99+
: List.of();
91100

92101
demographicFields = jsonConfig.demographicFields()
93102
.stream()

JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/config/input/JsonConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ public record JsonConfig(
1919
@JsonProperty("auxGoldenRecordFields") List<AuxGoldenRecordField> auxGoldenRecordFields,
2020
List<AdditionalNode> additionalNodes,
2121
List<DemographicField> demographicFields,
22+
@JsonProperty("fieldsForKafkaKeyGen") List<String> fieldsForKafkaKeyGen,
23+
@JsonProperty("nameFieldsForNotificationDisplay") List<String> nameFieldsForNotificationDisplay,
2224
Rules rules) {
2325

2426
private static final Logger LOGGER = LogManager.getLogger(JsonConfig.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.jembi.jempi.shared.config.input;
2+
import com.fasterxml.jackson.annotation.JsonInclude;
3+
4+
import java.util.List;
5+
6+
@JsonInclude(JsonInclude.Include.NON_NULL)
7+
public record NameFieldsForKafkaKeyGen(
8+
List<String> nameFieldsForKafkaKeyGen
9+
) {
10+
}

JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/ConfigurationModel.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public record Configuration(
1616
List<Field> auxGoldenRecordFields,
1717
List<Node> additionalNodes,
1818
List<DemographicField> demographicFields,
19+
List<String> fieldsForKafkaKeyGen,
20+
List<String> nameFieldsForNotificationDisplay,
1921
Rules rules) {
2022
}
2123

JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,17 @@ private static boolean helperUpdateGoldenRecordField(
9595

9696
private static String patientName(final Interaction interaction) {
9797
var patientRecord = interaction.demographicData();
98-
String givenName = patientRecord.fields.stream()
99-
.filter(field -> "given_name".equals(field.ccTag()))
100-
.map(DemographicData.DemographicField::value)
101-
.findFirst()
102-
.orElse("");
103-
String familyName = patientRecord.fields.stream()
104-
.filter(field -> "family_name".equals(field.ccTag()))
105-
.map(DemographicData.DemographicField::value)
106-
.findFirst()
107-
.orElse("");
108-
return (givenName + " " + familyName).trim();
98+
Map<String, String> fieldMap = patientRecord.fields.stream()
99+
.collect(Collectors.toMap(DemographicData.DemographicField::ccTag,
100+
DemographicData.DemographicField::value));
101+
102+
String patientDisplayName = FIELDS_CONFIG.nameFieldsForNotificationDisplay.stream()
103+
.map(fieldName -> fieldMap.getOrDefault(fieldName,
104+
""))
105+
.filter(StringUtils::isNotBlank)
106+
.collect(Collectors.joining(" "))
107+
.trim();
108+
return patientDisplayName;
109109
}
110110

111111
/**

devops/linux/docker/backup_restore/dgraph-restore-api.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,47 @@
99

1010
host = env_vars['NODE1_IP']
1111
port = "50010"
12+
endpoint = "http://" + host + ":6080"
13+
new_lease_value = 50000000000000
14+
15+
def get_current_lease(endpoint):
16+
"""Fetches the current maxLeasedUid from the Dgraph Zero /state endpoint."""
17+
try:
18+
response = requests.get(f"{endpoint}/state")
19+
response.raise_for_status() # Raise an exception for HTTP errors
20+
data = response.json()
21+
# Extracting the maxLeasedUid from the JSON response
22+
max_leased_uid = data.get('maxUID', None)
23+
if max_leased_uid is not None:
24+
print(f"Current maxLeasedUid: {max_leased_uid}")
25+
return max_leased_uid
26+
else:
27+
print("Error: maxLeasedUid not found in the response.")
28+
return None
29+
except requests.RequestException as e:
30+
print(f"Error fetching current lease: {e}")
31+
return None
32+
33+
def increase_lease(endpoint, new_value):
34+
"""Increases the lease value if it's below the new_value."""
35+
current_value = get_current_lease(endpoint)
36+
current_value = int(current_value)
37+
if current_value is None:
38+
print("Unable to fetch the current lease value. Exiting.")
39+
return
40+
41+
if current_value < new_value:
42+
try:
43+
# Assuming a POST request with form data to increase lease size
44+
data = {'what': 'uids', 'num': new_value}
45+
response = requests.get(f"{endpoint}/assign", params=data)
46+
response.raise_for_status() # Raise an exception for HTTP errors
47+
print(f"Lease increased to {new_value}.")
48+
except requests.RequestException as e:
49+
print(f"Error increasing lease: {e}")
50+
print(f"Response Content: {response.content}") # Debug print to see response content
51+
else:
52+
print(f"Current lease ({current_value}) is already greater than or equal to {new_value}. No action taken.")
1253

1354

1455
def main(json_file):
@@ -54,16 +95,15 @@ def convert_datetime_format(date_str):
5495
continue
5596
else:
5697
return date_str # If the format is not correct, return the original string
57-
98+
5899
output_format = "%Y-%m-%dT%H:%M:%S.%fZ"
59100
output_str = dt.strftime(output_format)
60101
output_str = output_str[:26] + 'Z' # Keep only the first 2 decimal places of the seconds part
61102
return output_str
62103

63104
def process_json_data(golden_records):
64-
105+
increase_lease(endpoint, new_lease_value)
65106
for golden_record in golden_records:
66-
67107
golden_record['goldenRecord']['uniqueGoldenRecordData']['auxDateCreated'] = convert_datetime_format(golden_record['goldenRecord']['uniqueGoldenRecordData']['auxDateCreated'])
68108
for interaction in golden_record['interactionsWithScore']:
69109
interaction['interaction']['uniqueInteractionData']['auxDateCreated'] = convert_datetime_format(
@@ -73,7 +113,7 @@ def process_json_data(golden_records):
73113
response = send_golden_record_to_api(golden_record)
74114
if response:
75115
print("After Restore Golden ID--"+ response.text)
76-
116+
77117
def send_golden_record_to_api(golden_record_payload):
78118
get_expanded_golden_record_url = f'http://{host}:{port}/JeMPI/restoreGoldenRecord'
79119
# Normalize date fields in the payload

devops/linux/docker/data-config/config-reference-auto-generate-fields.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@
135135
"indexGoldenRecord": "@index(exact)"
136136
}
137137
],
138+
"fieldsForKafkaKeyGen": [
139+
"given_name",
140+
"family_name"
141+
],
142+
"nameFieldsForNotificationDisplay": [
143+
"given_name",
144+
"family_name"
145+
],
138146
"rules": {
139147
"link": {
140148
"deterministic": [

devops/linux/docker/data-config/config-reference-link-d-validate-dp-match-dp.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,14 @@
222222
"matchMetaData": null
223223
}
224224
],
225+
"fieldsForKafkaKeyGen": [
226+
"given_name",
227+
"family_name"
228+
],
229+
"nameFieldsForNotificationDisplay": [
230+
"given_name",
231+
"family_name"
232+
],
225233
"rules": {
226234
"link": {
227235
"deterministic": [

0 commit comments

Comments
 (0)