Skip to content

Commit 7725e54

Browse files
feat(LENS-1466): add support for partial updates in the UpdateStreamService (#127)
* Added partial update support for update streaming service * Formatted using spotless * Addressed PR comments
1 parent a6dd522 commit 7725e54

14 files changed

+927
-12
lines changed

samples/UpdateStreamDocuments.java

+17
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,23 @@ public static void main(String[] args) throws IOException, InterruptedException,
3434
DeleteDocument document3 = new DeleteDocument("https://my.document3.uri");
3535
updateStreamService.delete(document3);
3636

37+
PartialUpdateDocument document4 = new PartialUpdateDocument("https://my.document4.uri", PartialUpdateOperator.FIELD_VALUE_REPLACE, "title", "My new title");
38+
updateStreamService.addPartialUpdate(document4);
39+
40+
PartialUpdateDocument document5 = new PartialUpdateDocument("https://my.document5.uri", PartialUpdateOperator.DICTIONARY_PUT, "dictionaryAttribute", new HashMap<>() {{
41+
put("newkey", "newvalue");
42+
}});
43+
updateStreamService.addPartialUpdate(document5);
44+
45+
PartialUpdateDocument document6 = new PartialUpdateDocument("https://my.document6.uri", PartialUpdateOperator.ARRAY_APPEND, "arrayAttribute", new String[]{"newValue"});
46+
updateStreamService.addPartialUpdate(document6);
47+
48+
PartialUpdateDocument document7 = new PartialUpdateDocument("https://my.document7.uri", PartialUpdateOperator.ARRAY_REMOVE, "arrayAttribute", new String[]{"oldValue"});
49+
updateStreamService.addPartialUpdate(document7);
50+
51+
PartialUpdateDocument document8 = new PartialUpdateDocument("https://my.document8.uri", PartialUpdateOperator.DICIONARY_REMOVE, "dictionaryAttribute", "oldkey");
52+
updateStreamService.addPartialUpdate(document8);
53+
3754
updateStreamService.close();
3855
}
3956
}

src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
/** Represents a queue for uploading documents using a specified upload strategy */
99
class DocumentUploadQueue {
1010
private static final Logger logger = LogManager.getLogger(DocumentUploadQueue.class);
11-
private final UploadStrategy uploader;
12-
private final int maxQueueSize = 5 * 1024 * 1024;
13-
private ArrayList<DocumentBuilder> documentToAddList;
14-
private ArrayList<DeleteDocument> documentToDeleteList;
15-
private int size;
11+
protected final UploadStrategy uploader;
12+
protected final int maxQueueSize = 5 * 1024 * 1024;
13+
protected ArrayList<DocumentBuilder> documentToAddList;
14+
protected ArrayList<DeleteDocument> documentToDeleteList;
15+
protected int size;
1616

1717
/**
1818
* Constructs a new DocumentUploadQueue object with a default maximum queue size limit of 5MB.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.coveo.pushapiclient;
2+
3+
import com.google.gson.Gson;
4+
import com.google.gson.JsonObject;
5+
import java.util.Map;
6+
7+
public class PartialUpdateDocument {
8+
9+
/** The documentId of the document. */
10+
public String documentId;
11+
12+
/** The operator of the document. */
13+
public PartialUpdateOperator operator;
14+
15+
/** The field to update. */
16+
public String field;
17+
18+
/** The value of the field to be updated. */
19+
public Object value;
20+
21+
public JsonObject marshalJsonObject() {
22+
return new Gson().toJsonTree(this).getAsJsonObject();
23+
}
24+
25+
/**
26+
* Creates a new PartialUpdateDocument. The type of the value provided is constrained by the
27+
* operator.
28+
*
29+
* <ul>
30+
* <li>PartialUpdateOperator.ARRAY_APPEND: value must be an array
31+
* <li>PartialUpdateOperator.ARRAY_REMOVE: value must be an array
32+
* <li>PartialUpdateOperator.FIELD_VALUE_REPLACE: value can be any type
33+
* <li>PartialUpdateOperator.DICTIONARY_PUT: value must be a Map
34+
* <li>PartialUpdateOperator.DICTIONARY_REMOVE: value must be a String or an Array
35+
* </ul>
36+
*
37+
* @param documentId The id of the document.
38+
* @param operator The operator to use.
39+
* @param field The field to update.
40+
* @param value The value to update the field with.
41+
*/
42+
public PartialUpdateDocument(
43+
String documentId, PartialUpdateOperator operator, String field, Object value) {
44+
if (operator == null) throw new IllegalArgumentException("Operator cannot be null");
45+
if (field == null) throw new IllegalArgumentException("Field cannot be null");
46+
if (documentId == null) throw new IllegalArgumentException("DocumentId cannot be null");
47+
48+
this.documentId = documentId;
49+
this.operator = operator;
50+
this.field = field;
51+
52+
switch (operator) {
53+
case ARRAYAPPEND:
54+
case ARRAYREMOVE:
55+
if (!value.getClass().isArray())
56+
throw new IllegalArgumentException("Value must be an array for operator " + operator);
57+
break;
58+
case FIELDVALUEREPLACE:
59+
break;
60+
case DICTIONARYPUT:
61+
if (!(value instanceof Map<?, ?>))
62+
throw new IllegalArgumentException("Value must be a Map for operator " + operator);
63+
break;
64+
case DICTIONARYREMOVE:
65+
if (!(value instanceof String) && !value.getClass().isArray())
66+
throw new IllegalArgumentException(
67+
"Value must be a String or an Array for operator " + operator);
68+
break;
69+
default:
70+
throw new IllegalArgumentException("Invalid operator " + operator);
71+
}
72+
this.value = value;
73+
}
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.coveo.pushapiclient;
2+
3+
public enum PartialUpdateOperator {
4+
ARRAYAPPEND {
5+
public String toString() {
6+
return "arrayAppend";
7+
}
8+
},
9+
ARRAYREMOVE {
10+
public String toString() {
11+
return "arrayRemove";
12+
}
13+
},
14+
FIELDVALUEREPLACE {
15+
public String toString() {
16+
return "fieldValueReplace";
17+
}
18+
},
19+
DICTIONARYPUT {
20+
public String toString() {
21+
return "dictionaryPut";
22+
}
23+
},
24+
DICTIONARYREMOVE {
25+
public String toString() {
26+
return "dictionaryRemove";
27+
}
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package com.coveo.pushapiclient;
2+
3+
import java.io.IOException;
4+
import java.util.ArrayList;
5+
import org.apache.logging.log4j.LogManager;
6+
import org.apache.logging.log4j.Logger;
7+
8+
public class StreamDocumentUploadQueue extends DocumentUploadQueue {
9+
10+
private static final Logger logger = LogManager.getLogger(StreamDocumentUploadQueue.class);
11+
protected ArrayList<PartialUpdateDocument> documentToPartiallyUpdateList;
12+
13+
public StreamDocumentUploadQueue(UploadStrategy uploader) {
14+
super(uploader);
15+
this.documentToPartiallyUpdateList = new ArrayList<>();
16+
}
17+
18+
/**
19+
* Flushes the accumulated documents by applying the upload strategy.
20+
*
21+
* @throws IOException If an I/O error occurs during the upload.
22+
* @throws InterruptedException If the upload process is interrupted.
23+
*/
24+
@Override
25+
public void flush() throws IOException, InterruptedException {
26+
if (this.isEmpty()) {
27+
logger.debug("Empty batch. Skipping upload");
28+
return;
29+
}
30+
// TODO: LENS-871: support concurrent requests
31+
StreamUpdate stream = this.getStream();
32+
logger.info("Uploading document Stream");
33+
this.uploader.apply(stream);
34+
35+
this.size = 0;
36+
this.documentToAddList.clear();
37+
this.documentToDeleteList.clear();
38+
this.documentToPartiallyUpdateList.clear();
39+
}
40+
41+
/**
42+
* Adds the {@link PartialUpdateDocument} to the upload queue and flushes the queue if it exceeds
43+
* the maximum content length. See {@link PartialUpdateDocument#flush}.
44+
*
45+
* @param document The document to be deleted from the index.
46+
* @throws IOException If an I/O error occurs during the upload.
47+
* @throws InterruptedException If the upload process is interrupted.
48+
*/
49+
public void add(PartialUpdateDocument document) throws IOException, InterruptedException {
50+
if (document == null) {
51+
return;
52+
}
53+
54+
final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length;
55+
if (this.size + sizeOfDoc >= this.maxQueueSize) {
56+
this.flush();
57+
}
58+
documentToPartiallyUpdateList.add(document);
59+
logger.info("Adding document to batch: " + document.documentId);
60+
this.size += sizeOfDoc;
61+
}
62+
63+
public StreamUpdate getStream() {
64+
return new StreamUpdate(
65+
new ArrayList<>(this.documentToAddList),
66+
new ArrayList<>(this.documentToDeleteList),
67+
new ArrayList<>(this.documentToPartiallyUpdateList));
68+
}
69+
70+
@Override
71+
public BatchUpdate getBatch() {
72+
throw new UnsupportedOperationException("StreamDocumentUploadQueue does not support getBatch");
73+
}
74+
75+
@Override
76+
public boolean isEmpty() {
77+
return super.isEmpty() && documentToPartiallyUpdateList.isEmpty();
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.coveo.pushapiclient;
2+
3+
import com.google.gson.JsonObject;
4+
import java.util.List;
5+
6+
public class StreamUpdate extends BatchUpdate {
7+
8+
private final List<PartialUpdateDocument> partialUpdate;
9+
10+
public StreamUpdate(
11+
List<DocumentBuilder> addOrUpdate,
12+
List<DeleteDocument> delete,
13+
List<PartialUpdateDocument> partialUpdate) {
14+
super(addOrUpdate, delete);
15+
this.partialUpdate = partialUpdate;
16+
}
17+
18+
@Override
19+
public StreamUpdateRecord marshal() {
20+
return new StreamUpdateRecord(
21+
this.getAddOrUpdate().stream()
22+
.map(DocumentBuilder::marshalJsonObject)
23+
.toArray(JsonObject[]::new),
24+
this.getDelete().stream().map(DeleteDocument::marshalJsonObject).toArray(JsonObject[]::new),
25+
this.partialUpdate.stream()
26+
.map(PartialUpdateDocument::marshalJsonObject)
27+
.toArray(JsonObject[]::new));
28+
}
29+
30+
public List<PartialUpdateDocument> getPartialUpdate() {
31+
return partialUpdate;
32+
}
33+
34+
@Override
35+
public String toString() {
36+
return "StreamUpdate["
37+
+ "addOrUpdate="
38+
+ getAddOrUpdate()
39+
+ ", delete="
40+
+ getDelete()
41+
+ ", partialUpdate="
42+
+ partialUpdate
43+
+ ']';
44+
}
45+
46+
@Override
47+
public boolean equals(Object obj) {
48+
if (this == obj) return true;
49+
if (obj == null || getClass() != obj.getClass()) return false;
50+
StreamUpdate that = (StreamUpdate) obj;
51+
return getAddOrUpdate().equals(that.getAddOrUpdate())
52+
&& getDelete().equals(that.getDelete())
53+
&& partialUpdate.equals(that.partialUpdate);
54+
}
55+
56+
@Override
57+
public int hashCode() {
58+
return super.hashCode() + partialUpdate.hashCode();
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.coveo.pushapiclient;
2+
3+
import com.google.gson.JsonObject;
4+
import java.util.Arrays;
5+
6+
public class StreamUpdateRecord extends BatchUpdateRecord {
7+
8+
private final JsonObject[] partialUpdate;
9+
10+
public StreamUpdateRecord(
11+
JsonObject[] addOrUpdate, JsonObject[] delete, JsonObject[] partialUpdate) {
12+
super(addOrUpdate, delete);
13+
this.partialUpdate = partialUpdate;
14+
}
15+
16+
public JsonObject[] getPartialUpdate() {
17+
return partialUpdate;
18+
}
19+
20+
@Override
21+
public String toString() {
22+
return "StreamUpdateRecord["
23+
+ "addOrUpdate="
24+
+ Arrays.toString(this.getAddOrUpdate())
25+
+ ", delete="
26+
+ Arrays.toString(this.getDelete())
27+
+ ", partialUpdate="
28+
+ Arrays.toString(partialUpdate)
29+
+ ']';
30+
}
31+
32+
@Override
33+
public boolean equals(Object obj) {
34+
if (this == obj) return true;
35+
if (obj == null || getClass() != obj.getClass()) return false;
36+
StreamUpdateRecord that = (StreamUpdateRecord) obj;
37+
return Arrays.equals(this.getAddOrUpdate(), that.getAddOrUpdate())
38+
&& Arrays.equals(this.getDelete(), that.getDelete())
39+
&& Arrays.equals(partialUpdate, that.partialUpdate);
40+
}
41+
42+
@Override
43+
public int hashCode() {
44+
int result = super.hashCode();
45+
result = 31 * result + Arrays.hashCode(partialUpdate);
46+
return result;
47+
}
48+
}

0 commit comments

Comments
 (0)