Skip to content

Commit 5a2bf1d

Browse files
wt0530githubgxll
authored andcommitted
[fix][dingo-exec] Fix optimistic transaction duplicate key
1 parent 99a8e48 commit 5a2bf1d

File tree

1 file changed

+180
-8
lines changed

1 file changed

+180
-8
lines changed

dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartInsertOperator.java

Lines changed: 180 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -285,12 +285,180 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
285285
StoreInstance kvStore = Services.KV_STORE.getInstance(context.getIndexId(), partId);
286286
KeyValue indexKv = kvStore.txnGet(txnId.seq, keyValue.getKey(), param.getLockTimeOut());
287287
if (indexKv != null && indexKv.getValue() != null && index.isUnique()) {
288-
throw new DuplicateEntryException("Duplicate entry "
289-
+ TransactionUtil.duplicateEntryKey(tableId, keyValue.getKey(), txnId) + " for key 'PRIMARY'");
288+
byte[] txnIdByte = txnId.encode();
289+
byte[] tableIdByte = param.getTable().tableId.encode();
290+
byte[] partIdByte = context.getTablePartId().encode();
291+
int len = txnIdByte.length + tableIdByte.length + partIdByte.length;
292+
byte[] insertKey = ByteUtils.encode(
293+
CommonId.CommonType.TXN_CACHE_DATA,
294+
primaryKv.getKey(),
295+
Op.PUTIFABSENT.getCode(),
296+
len,
297+
txnIdByte,
298+
tableIdByte,
299+
partIdByte);
300+
byte[] deleteKey = Arrays.copyOf(insertKey, insertKey.length);
301+
deleteKey[deleteKey.length - 2] = (byte) Op.DELETE.getCode();
302+
byte[] updateKey = Arrays.copyOf(insertKey, insertKey.length);
303+
updateKey[updateKey.length - 2] = (byte) Op.PUT.getCode();
304+
List<byte[]> bytes = new ArrayList<>(3);
305+
bytes.add(insertKey);
306+
bytes.add(deleteKey);
307+
bytes.add(updateKey);
308+
List<KeyValue> keyValues = localStore.get(bytes);
309+
if (keyValues != null && !keyValues.isEmpty()) {
310+
KeyValue value = keyValues.get(0);
311+
byte[] oldKey = value.getKey();
312+
if (oldKey[oldKey.length - 2] == Op.PUTIFABSENT.getCode()
313+
|| oldKey[oldKey.length - 2] == Op.PUT.getCode()) {
314+
localStore.delete(oldKey);
315+
Object[] oldTuple = codec.decode(indexKv);
316+
Object[] primaryKey = indexTable.getColumnIndices2(param.getTable().keyColumns()).stream().map(i -> oldTuple[i]).toArray();
317+
Object[] tableTuple = new Object[param.getTable().getColumns().size()];
318+
int[] keyMapping = param.getTable().keyMapping().getMappings();
319+
for (int i = 0; i < keyMapping.length; i++) {
320+
tableTuple[keyMapping[i]] = primaryKey[i];
321+
}
322+
byte[] key = param.getCodec().encodeKey(tableTuple);
323+
CodecService.getDefault().setId(key, context.getTablePartId());
324+
KeyValue oldKv = store.txnGet(txnId.seq, key, param.getLockTimeOut());
325+
long count = param.getTable().getColumnIndices2(indexTable.keyColumns()).stream().filter(i -> param.getUpdateMapping().findIdx(i) >= 0).count();
326+
Object[] tempTuple = param.getCodec().decode(oldKv);
327+
if (duplicate && count == 0) {
328+
// primary table
329+
schema = param.getSchema();
330+
codec = param.getCodec();
331+
tableId = param.getTable().tableId;
332+
partId = context.getTablePartId();
333+
indexTable = null;
334+
index = null;
335+
tuple = tempTuple;
336+
}
337+
context.setDuplicateKey(true);
338+
if (count > 0) {
339+
tuple = columnIndices.stream().map(i -> {
340+
if (i == -1) {
341+
return null;
342+
}
343+
return tempTuple[i];
344+
}).toArray();
345+
CodecService.getDefault().setId(indexKv.getKey(), partId.domain);
346+
byte[] indexKey = indexKv.getKey();
347+
byte[] indexTxnIdByte = txnId.encode();
348+
byte[] indexTableIdByte = tableId.encode();
349+
byte[] indexPartIdByte = partId.encode();
350+
int indexLen = indexTxnIdByte.length + indexTableIdByte.length + indexPartIdByte.length;
351+
byte[] indexDeleteKey = ByteUtils.encode(
352+
CommonId.CommonType.TXN_CACHE_DATA,
353+
indexKey,
354+
Op.DELETE.getCode(),
355+
indexLen,
356+
indexTxnIdByte,
357+
indexTableIdByte,
358+
indexPartIdByte);
359+
localStore.put(new KeyValue(indexDeleteKey, indexKv.getValue()));
360+
361+
start = insert(
362+
context,
363+
tuple,
364+
vertex,
365+
schema,
366+
codec,
367+
partId,
368+
txnId,
369+
tableId,
370+
profile,
371+
start,
372+
param,
373+
localStore,
374+
finalTuple,
375+
indexTable,
376+
isVector,
377+
isDocument,
378+
index,
379+
true);
380+
381+
// primary table
382+
schema = param.getSchema();
383+
codec = param.getCodec();
384+
tableId = param.getTable().tableId;
385+
partId = context.getTablePartId();
386+
indexTable = null;
387+
index = null;
388+
tuple = tempTuple;
389+
context.setIndexId(null);
390+
391+
start = insert(
392+
context,
393+
tuple,
394+
vertex,
395+
schema,
396+
codec,
397+
partId,
398+
txnId,
399+
tableId,
400+
profile,
401+
start,
402+
param,
403+
localStore,
404+
finalTuple,
405+
indexTable,
406+
isVector,
407+
isDocument,
408+
index,
409+
false);
410+
profile.step5(start);
411+
return true;
412+
}
413+
}
414+
} else {
415+
return true;
416+
}
290417
}
291418
}
292419
}
293420
}
421+
start = insert(
422+
context,
423+
tuple,
424+
vertex,
425+
schema,
426+
codec,
427+
partId,
428+
txnId,
429+
tableId,
430+
profile,
431+
start,
432+
param,
433+
localStore,
434+
primaryOldTuple,
435+
indexTable,
436+
isVector,
437+
isDocument,
438+
index,
439+
false);
440+
profile.step5(start);
441+
return true;
442+
}
443+
444+
private static long insert(Context context,
445+
Object[] tuple,
446+
Vertex vertex,
447+
DingoType schema,
448+
KeyValueCodec codec,
449+
CommonId partId,
450+
CommonId txnId,
451+
CommonId tableId,
452+
InsertProfile profile,
453+
long start,
454+
TxnPartInsertParam param,
455+
StoreInstance localStore,
456+
Object[] primaryOldTuple,
457+
Table indexTable,
458+
boolean isVector,
459+
boolean isDocument,
460+
IndexTable index,
461+
boolean uniqueDel) {
294462
if (context.isWithoutPrimary()) {
295463
schema.setCheckFieldCount(false);
296464
DingoType dingoType = codec.getDingoType();
@@ -552,7 +720,10 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
552720
KeyValue value = keyValues.get(0);
553721
byte[] oldKey = value.getKey();
554722
if (oldKey[oldKey.length - 2] == Op.PUTIFABSENT.getCode()
555-
|| oldKey[oldKey.length - 2] == Op.PUT.getCode()) {
723+
|| oldKey[oldKey.length - 2] == Op.PUT.getCode()
724+
|| (context.isDuplicateKey() && index != null
725+
&& index.isUnique() && oldKey[oldKey.length - 2] == Op.DELETE.getCode())
726+
) {
556727
if (param.getUpdateMapping() != null && param.getUpdates() != null) {
557728
pair = generateNewKv(
558729
primaryOldTuple,
@@ -626,7 +797,7 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
626797
start = System.currentTimeMillis();
627798
KeyValue insertUpKv = Optional.mapOrGet(pair, Pair::getKey, () -> null);
628799
if (insertUpKv != null && insertUpKv.getValue() != null) {
629-
if (index != null && index.isUnique()) {
800+
if (index != null && index.isUnique() && !uniqueDel) {
630801
keyValue.setKey(
631802
ByteUtils.getKeyByOp(CommonId.CommonType.TXN_CACHE_CHECK_DATA, Op.CheckNotExists, insertUpKv.getKey())
632803
);
@@ -638,7 +809,9 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
638809
} else {
639810
keyValue.setKey(insertKey);
640811
}
641-
localStore.delete(deleteKey);
812+
if (!uniqueDel) {
813+
localStore.delete(deleteKey);
814+
}
642815
profile.step4(start);
643816
// for optimistic transaction for update
644817
byte[] rollbackKey = ByteUtils.getKeyByOp(CommonId.CommonType.TXN_CACHE_DATA, Op.ROLLBACK, deleteKey);
@@ -674,8 +847,7 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
674847
}
675848
}
676849
}
677-
profile.step5(start);
678-
return true;
850+
return start;
679851
}
680852

681853
private static Pair<KeyValue, Long> generateNewKv(Object[] tuple,
@@ -736,7 +908,7 @@ private static Pair<KeyValue, Long> generateNewKv(Object[] tuple,
736908
newValue = sqlExpr == null ? newTuple[mapping.get(i)] : sqlExpr.eval(tuple);
737909
}
738910

739-
if (newValue == null || newValue.equals("NULL")) {
911+
if (newValue == null || String.valueOf(newValue).equalsIgnoreCase("NULL")) {
740912
newValue = null;
741913
}
742914
int index = mapping.get(i);

0 commit comments

Comments
 (0)