File tree 2 files changed +9
-6
lines changed
JeMPI_AsyncReceiver/src/main/java/org/jembi/jempi/async_receiver
JeMPI_Linker/src/main/java/org/jembi/jempi/linker
2 files changed +9
-6
lines changed Original file line number Diff line number Diff line change @@ -164,13 +164,15 @@ private void apacheReadCSV(
164
164
.findFirst ()
165
165
.orElse ("" );
166
166
167
- String partitionKey ;
167
+ String partitionKey = "" ;
168
168
if (!givenName .isEmpty ()) {
169
- partitionKey = new Soundex ().soundex (givenName );
170
- } else if (!familyName .isEmpty ()) {
171
- partitionKey = new Soundex ().soundex (familyName );
172
- } else {
173
- partitionKey = "Unknown" ;
169
+ partitionKey += new Soundex ().soundex (givenName );
170
+ }
171
+ if (!familyName .isEmpty ()) {
172
+ partitionKey += new Soundex ().soundex (familyName );
173
+ }
174
+ if (givenName .isEmpty () && familyName .isEmpty ()) {
175
+ partitionKey += "Unknown" ;
174
176
}
175
177
LOGGER .info ("Kafka topic/partition for patient: " + partitionKey );
176
178
Original file line number Diff line number Diff line change @@ -95,6 +95,7 @@ private void closeInteractionStream() {
95
95
96
96
private Properties loadConfig () {
97
97
final Properties props = new Properties ();
98
+ props .put (StreamsConfig .PROCESSING_GUARANTEE_CONFIG , StreamsConfig .EXACTLY_ONCE_V2 );
98
99
props .put (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG , AppConfig .KAFKA_BOOTSTRAP_SERVERS );
99
100
props .put (StreamsConfig .APPLICATION_ID_CONFIG , AppConfig .KAFKA_APPLICATION_ID_INTERACTIONS + topic );
100
101
return props ;
You can’t perform that action at this time.
0 commit comments