|
39 | 39 | public class BulkContext implements Closeable {
|
40 | 40 |
|
41 | 41 | private static final int MAX_BATCH_SIZE = 1024;
|
| 42 | + private static final int MAX_REQUEST_SIZE = 100_000_000; |
42 | 43 | private static final int RECOMMENDED_BATCH_SIZE = 256;
|
43 | 44 |
|
44 | 45 | private static final String KEY_INDEX = "_index";
|
@@ -128,17 +129,45 @@ private void update(ElasticEntity entity, boolean force) {
|
128 | 129 | return;
|
129 | 130 | }
|
130 | 131 |
|
131 |
| - commands.add(Json.createObject().set(COMMAND_INDEX, meta)); |
132 |
| - commands.add(data); |
| 132 | + ObjectNode metaCommand = Json.createObject().set(COMMAND_INDEX, meta); |
| 133 | + commitIfNeededAndAdd(metaCommand, data); |
133 | 134 | autocommit();
|
134 | 135 | }
|
135 | 136 |
|
| 137 | + /** |
| 138 | + * Commits all commands if the batch size exceeds {@link #MAX_BATCH_SIZE}. |
| 139 | + */ |
136 | 140 | private void autocommit() {
|
137 | 141 | if (commands.size() >= MAX_BATCH_SIZE) {
|
138 | 142 | commit().throwFailures();
|
139 | 143 | }
|
140 | 144 | }
|
141 | 145 |
|
| 146 | + /** |
| 147 | + * Commits queued commands if needed and adds the given nodes to the queue. |
| 148 | + * <p> |
| 149 | + * If the new nodes would exceed {@link #MAX_REQUEST_SIZE}, we commit them before adding new commands to the list. This is important |
| 150 | + * for bulk requests since Elasticsearch establishes a default limit of 100MB per request. |
| 151 | + * |
| 152 | + * @param nodes the nodes which size should be checked |
| 153 | + */ |
| 154 | + private void commitIfNeededAndAdd(ObjectNode... nodes) { |
| 155 | + int currentSize = commands.stream().map(this::calculateNodeLength).reduce(0, Integer::sum); |
| 156 | + for (ObjectNode node : nodes) { |
| 157 | + currentSize += calculateNodeLength(node); |
| 158 | + } |
| 159 | + if (currentSize >= MAX_REQUEST_SIZE) { |
| 160 | + // The given commands will exceed the maximum request size, so we commit the current queued commands |
| 161 | + // before adding new ones |
| 162 | + commit().throwFailures(); |
| 163 | + } |
| 164 | + commands.addAll(List.of(nodes)); |
| 165 | + } |
| 166 | + |
| 167 | + private int calculateNodeLength(ObjectNode node) { |
| 168 | + return node.toString().length(); |
| 169 | + } |
| 170 | + |
142 | 171 | private ObjectNode builtMetadata(ElasticEntity entity, boolean force, EntityDescriptor ed) {
|
143 | 172 | ObjectNode meta = Json.createObject();
|
144 | 173 |
|
@@ -167,7 +196,9 @@ private void delete(ElasticEntity entity, boolean force) {
|
167 | 196 | entityDescriptor.beforeDelete(entity);
|
168 | 197 |
|
169 | 198 | ObjectNode meta = builtMetadata(entity, force, entityDescriptor);
|
170 |
| - commands.add(Json.createObject().set(COMMAND_DELETE, meta)); |
| 199 | + |
| 200 | + ObjectNode metaCommand = Json.createObject().set(COMMAND_DELETE, meta); |
| 201 | + commitIfNeededAndAdd(metaCommand); |
171 | 202 | autocommit();
|
172 | 203 | }
|
173 | 204 |
|
|
0 commit comments