Skip to content

Commit cdf8e3a

Browse files
maksaskaalex-plekhanov
authored andcommitted
IGNITE-23416 Fix thin client multi-key operations unordered map/set warnings in server logs - Fixes #12128.
Signed-off-by: Aleksey Plekhanov <[email protected]>
1 parent 09d70a4 commit cdf8e3a

File tree

13 files changed

+884
-134
lines changed

13 files changed

+884
-134
lines changed

modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java

Lines changed: 111 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
package org.apache.ignite.internal.client.thin;
1919

2020
import java.io.IOException;
21+
import java.util.Collection;
2122
import java.util.HashMap;
2223
import java.util.LinkedHashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Set;
27+
import java.util.SortedMap;
28+
import java.util.SortedSet;
2629
import java.util.concurrent.CompletableFuture;
2730
import java.util.function.Consumer;
2831
import java.util.function.Function;
@@ -37,6 +40,7 @@
3740
import javax.cache.processor.EntryProcessor;
3841
import javax.cache.processor.EntryProcessorException;
3942
import javax.cache.processor.EntryProcessorResult;
43+
import org.apache.ignite.IgniteLogger;
4044
import org.apache.ignite.binary.BinaryObjectException;
4145
import org.apache.ignite.cache.CachePeekMode;
4246
import org.apache.ignite.cache.query.ContinuousQuery;
@@ -71,11 +75,17 @@
7175
import org.apache.ignite.internal.util.typedef.X;
7276
import org.apache.ignite.internal.util.typedef.internal.A;
7377
import org.apache.ignite.internal.util.typedef.internal.U;
78+
import org.apache.ignite.transactions.TransactionConcurrency;
79+
import org.apache.ignite.transactions.TransactionIsolation;
7480
import org.jetbrains.annotations.Nullable;
7581

7682
import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST;
7783
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.EXPIRY_POLICY;
7884
import static org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy.convertDuration;
85+
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
86+
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
87+
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
88+
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
7989

8090
/**
8191
* Implementation of {@link ClientCache} over TCP protocol.
@@ -123,19 +133,23 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
123133
/** JCache adapter. */
124134
private final Cache<K, V> jCacheAdapter;
125135

136+
/** */
137+
private final IgniteLogger log;
138+
126139
/** Exception thrown when a non-transactional ClientCache operation is invoked within a transaction. */
127140
public static final String NON_TRANSACTIONAL_CLIENT_CACHE_IN_TX_ERROR_MESSAGE = "Failed to invoke a " +
128141
"non-transactional ClientCache %s operation within a transaction.";
129142

130143
/** Constructor. */
131144
TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller marsh, TcpClientTransactions transactions,
132-
ClientCacheEntryListenersRegistry lsnrsRegistry) {
133-
this(name, ch, marsh, transactions, lsnrsRegistry, false, null);
145+
ClientCacheEntryListenersRegistry lsnrsRegistry, IgniteLogger log) {
146+
this(name, ch, marsh, transactions, lsnrsRegistry, false, null, log);
134147
}
135148

136149
/** Constructor. */
137-
TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller marsh, TcpClientTransactions transactions,
138-
ClientCacheEntryListenersRegistry lsnrsRegistry, boolean keepBinary, ExpiryPolicy expiryPlc) {
150+
TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller marsh,
151+
TcpClientTransactions transactions, ClientCacheEntryListenersRegistry lsnrsRegistry, boolean keepBinary,
152+
ExpiryPolicy expiryPlc, IgniteLogger log) {
139153
this.name = name;
140154
this.cacheId = ClientUtils.cacheId(name);
141155
this.ch = ch;
@@ -151,6 +165,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
151165
jCacheAdapter = new ClientJCacheAdapter<>(this);
152166

153167
this.ch.registerCacheIfCustomAffinity(this.name);
168+
169+
this.log = log;
154170
}
155171

156172
/** {@inheritDoc} */
@@ -324,6 +340,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
324340
if (keys.isEmpty())
325341
return new HashMap<>();
326342

343+
warnIfUnordered(keys, true);
344+
327345
TcpClientTransaction tx = transactions.tx();
328346

329347
return txAwareService(null, tx,
@@ -340,6 +358,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
340358
if (keys.isEmpty())
341359
return IgniteClientFutureImpl.completedFuture(new HashMap<>());
342360

361+
warnIfUnordered(keys, true);
362+
343363
TcpClientTransaction tx = transactions.tx();
344364

345365
return txAwareServiceAsync(null, tx,
@@ -357,6 +377,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
357377
if (map.isEmpty())
358378
return;
359379

380+
warnIfUnordered(map);
381+
360382
TcpClientTransaction tx = transactions.tx();
361383

362384
txAwareService(null, tx,
@@ -373,6 +395,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
373395
if (map.isEmpty())
374396
return IgniteClientFutureImpl.completedFuture(null);
375397

398+
warnIfUnordered(map);
399+
376400
TcpClientTransaction tx = transactions.tx();
377401

378402
return txAwareServiceAsync(null, tx,
@@ -523,6 +547,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
523547
if (keys.isEmpty())
524548
return;
525549

550+
warnIfUnordered(keys, false);
551+
526552
TcpClientTransaction tx = transactions.tx();
527553

528554
txAwareService(null, tx,
@@ -542,6 +568,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
542568
if (keys.isEmpty())
543569
return IgniteClientFutureImpl.completedFuture(null);
544570

571+
warnIfUnordered(keys, false);
572+
545573
TcpClientTransaction tx = transactions.tx();
546574

547575
return txAwareServiceAsync(null, tx,
@@ -931,6 +959,8 @@ else if (err != null)
931959
if (entryProc == null)
932960
throw new NullPointerException("entryProc");
933961

962+
warnIfUnordered(keys, false);
963+
934964
TcpClientTransaction tx = transactions.tx();
935965

936966
return txAwareService(null, tx,
@@ -954,6 +984,8 @@ else if (err != null)
954984
if (entryProc == null)
955985
throw new NullPointerException("entryProc");
956986

987+
warnIfUnordered(keys, false);
988+
957989
TcpClientTransaction tx = transactions.tx();
958990

959991
return txAwareServiceAsync(null, tx,
@@ -1006,12 +1038,12 @@ private <T> Map<K, EntryProcessorResult<T>> readEntryProcessorResult(PayloadInpu
10061038
/** {@inheritDoc} */
10071039
@Override public <K1, V1> ClientCache<K1, V1> withKeepBinary() {
10081040
return keepBinary ? (ClientCache<K1, V1>)this :
1009-
new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, true, expiryPlc);
1041+
new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, true, expiryPlc, log);
10101042
}
10111043

10121044
/** {@inheritDoc} */
10131045
@Override public <K1, V1> ClientCache<K1, V1> withExpirePolicy(ExpiryPolicy expirePlc) {
1014-
return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, keepBinary, expirePlc);
1046+
return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, keepBinary, expirePlc, log);
10151047
}
10161048

10171049
/** {@inheritDoc} */
@@ -1616,4 +1648,77 @@ private void checkDataReplicationSupported(ProtocolContext protocolCtx)
16161648
if (!protocolCtx.isFeatureSupported(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS))
16171649
throw new ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS);
16181650
}
1651+
1652+
/**
1653+
* Warns if an unordered map is used in an operation that may lead to a distributed deadlock
1654+
* during an explicit transaction.
1655+
* <p>
1656+
* This check is relevant only for explicit user-managed transactions. Implicit transactions
1657+
* (such as those started automatically by the system) are not inspected by this method.
1658+
* </p>
1659+
*
1660+
* @param m The map being used in the cache operation.
1661+
*/
1662+
protected void warnIfUnordered(Map<?, ?> m) {
1663+
if (m == null || m.size() <= 1)
1664+
return;
1665+
1666+
TcpClientTransaction tx = transactions.tx();
1667+
1668+
// Only explicit transactions are checked.
1669+
if (tx == null)
1670+
return;
1671+
1672+
if (m instanceof SortedMap)
1673+
return;
1674+
1675+
if (!canBlockTx(false, tx.concurrency(), tx.isolation()))
1676+
return;
1677+
1678+
log.warning("Unordered map " + m.getClass().getName() + " is used for putAll operation on cache " +
1679+
name + ". This can lead to a distributed deadlock. Switch to a sorted map like TreeMap instead.");
1680+
}
1681+
1682+
/**
1683+
* Warns if an unordered map is used in an operation that may lead to a distributed deadlock
1684+
* during an explicit transaction.
1685+
* <p>
1686+
* This check is relevant only for explicit user-managed transactions. Implicit transactions
1687+
* (such as those started automatically by the system) are not inspected by this method.
1688+
* </p>
1689+
*
1690+
* @param coll The collection being used in the cache operation.
1691+
* @param isGetOp {@code true} if the operation is a get (e.g., {@code getAll}).
1692+
*/
1693+
protected void warnIfUnordered(Collection<?> coll, boolean isGetOp) {
1694+
if (coll == null || coll.size() <= 1)
1695+
return;
1696+
1697+
TcpClientTransaction tx = transactions.tx();
1698+
1699+
// Only explicit transactions are checked.
1700+
if (tx == null)
1701+
return;
1702+
1703+
if (coll instanceof SortedSet)
1704+
return;
1705+
1706+
if (!canBlockTx(isGetOp, tx.concurrency(), tx.isolation()))
1707+
return;
1708+
1709+
log.warning("Unordered collection " + coll.getClass().getName() +
1710+
" is used for " + (isGetOp ? "getAll" : "") + " operation on cache " + name + ". " +
1711+
"This can lead to a distributed deadlock. Switch to a sorted set like TreeSet instead.");
1712+
}
1713+
1714+
/** */
1715+
private boolean canBlockTx(boolean isGetOp, TransactionConcurrency concurrency, TransactionIsolation isolation) {
1716+
if (concurrency == OPTIMISTIC && isolation == SERIALIZABLE)
1717+
return false;
1718+
1719+
if (isGetOp && concurrency == PESSIMISTIC && isolation == READ_COMMITTED)
1720+
return false;
1721+
1722+
return true;
1723+
}
16191724
}

modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,12 @@ private ClientTransaction txStart0(TransactionConcurrency concurrency, Transacti
106106
writer.writeString(lb);
107107
}
108108
},
109-
res -> new TcpClientTransaction(res.in().readInt(), res.clientChannel())
109+
res -> new TcpClientTransaction(
110+
res.in().readInt(),
111+
res.clientChannel(),
112+
concurrency == null ? txCfg.getDefaultTxConcurrency() : concurrency,
113+
isolation == null ? txCfg.getDefaultTxIsolation() : isolation
114+
)
110115
);
111116

112117
threadLocTxUid.set(tx0.txUid);
@@ -193,17 +198,30 @@ class TcpClientTransaction implements ClientTransaction {
193198
/** Client channel. */
194199
private final ClientChannel clientCh;
195200

201+
/** */
202+
private final TransactionConcurrency concurrency;
203+
204+
/** */
205+
private final TransactionIsolation isolation;
206+
196207
/** Transaction is closed. */
197208
private volatile boolean closed;
198209

199210
/**
200211
* @param id Transaction ID.
201212
* @param clientCh Client channel.
202213
*/
203-
private TcpClientTransaction(int id, ClientChannel clientCh) {
214+
private TcpClientTransaction(
215+
int id,
216+
ClientChannel clientCh,
217+
TransactionConcurrency concurrency,
218+
TransactionIsolation isolation
219+
) {
204220
txUid = txCnt.incrementAndGet();
205221
txId = id;
206222
this.clientCh = clientCh;
223+
this.concurrency = concurrency;
224+
this.isolation = isolation;
207225
}
208226

209227
/** {@inheritDoc} */
@@ -280,5 +298,15 @@ ClientChannel clientChannel() {
280298
boolean isClosed() {
281299
return closed;
282300
}
301+
302+
/** */
303+
public TransactionConcurrency concurrency() {
304+
return concurrency;
305+
}
306+
307+
/** */
308+
public TransactionIsolation isolation() {
309+
return isolation;
310+
}
283311
}
284312
}

modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
200200

201201
ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, req -> writeString(name, req.out()));
202202

203-
return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry);
203+
return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, log);
204204
}
205205

206206
/** {@inheritDoc} */
@@ -209,7 +209,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
209209

210210
return new IgniteClientFutureImpl<>(
211211
ch.requestAsync(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, req -> writeString(name, req.out()))
212-
.thenApply(x -> new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry)));
212+
.thenApply(x -> new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, log)));
213213
}
214214

215215
/** {@inheritDoc} */
@@ -220,7 +220,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
220220
ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_CONFIGURATION,
221221
req -> serDes.cacheConfiguration(cfg, req.out(), req.clientChannel().protocolCtx()));
222222

223-
return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, lsnrsRegistry);
223+
return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, lsnrsRegistry, log);
224224
}
225225

226226
/** {@inheritDoc} */
@@ -231,14 +231,14 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
231231
return new IgniteClientFutureImpl<>(
232232
ch.requestAsync(ClientOperation.CACHE_GET_OR_CREATE_WITH_CONFIGURATION,
233233
req -> serDes.cacheConfiguration(cfg, req.out(), req.clientChannel().protocolCtx()))
234-
.thenApply(x -> new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, lsnrsRegistry)));
234+
.thenApply(x -> new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, lsnrsRegistry, log)));
235235
}
236236

237237
/** {@inheritDoc} */
238238
@Override public <K, V> ClientCache<K, V> cache(String name) {
239239
ensureCacheName(name);
240240

241-
return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry);
241+
return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, log);
242242
}
243243

244244
/** {@inheritDoc} */
@@ -277,7 +277,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
277277

278278
ch.request(ClientOperation.CACHE_CREATE_WITH_NAME, req -> writeString(name, req.out()));
279279

280-
return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry);
280+
return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, log);
281281
}
282282

283283
/** {@inheritDoc} */
@@ -286,7 +286,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
286286

287287
return new IgniteClientFutureImpl<>(
288288
ch.requestAsync(ClientOperation.CACHE_CREATE_WITH_NAME, req -> writeString(name, req.out()))
289-
.thenApply(x -> new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry)));
289+
.thenApply(x -> new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, log)));
290290
}
291291

292292
/** {@inheritDoc} */
@@ -296,7 +296,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
296296
ch.request(ClientOperation.CACHE_CREATE_WITH_CONFIGURATION,
297297
req -> serDes.cacheConfiguration(cfg, req.out(), req.clientChannel().protocolCtx()));
298298

299-
return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, lsnrsRegistry);
299+
return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, lsnrsRegistry, log);
300300
}
301301

302302
/** {@inheritDoc} */
@@ -307,7 +307,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
307307
return new IgniteClientFutureImpl<>(
308308
ch.requestAsync(ClientOperation.CACHE_CREATE_WITH_CONFIGURATION,
309309
req -> serDes.cacheConfiguration(cfg, req.out(), req.clientChannel().protocolCtx()))
310-
.thenApply(x -> new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, lsnrsRegistry)));
310+
.thenApply(x -> new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, lsnrsRegistry, log)));
311311
}
312312

313313
/** {@inheritDoc} */

0 commit comments

Comments
 (0)