Skip to content

Commit ae0028d

Browse files
IGNITE-27207 Thin client: SqlFieldsQuery initiator ID support - Fixes #12550.
Signed-off-by: Aleksey Plekhanov <[email protected]>
1 parent 24050d9 commit ae0028d

File tree

7 files changed

+58
-13
lines changed

7 files changed

+58
-13
lines changed

modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@
6262
import org.jetbrains.annotations.Nullable;
6363

6464
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.CACHE_STORAGES;
65+
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.QRY_INITIATOR_ID;
66+
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.QRY_PARTITIONS_BATCH_SIZE;
6567
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.EXPIRY_POLICY;
6668
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.QUERY_ENTITY_PRECISION_AND_SCALE;
6769
import static org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy.convertDuration;
@@ -519,7 +521,7 @@ ClientCacheConfiguration cacheConfiguration(BinaryInputStream in, ProtocolContex
519521
}
520522

521523
/** Serialize SQL field query to stream. */
522-
void write(SqlFieldsQuery qry, BinaryOutputStream out) {
524+
void write(SqlFieldsQuery qry, BinaryOutputStream out, ProtocolContext protocolCtx) {
523525
writeObject(out, qry.getSchema());
524526
out.writeInt(qry.getPageSize());
525527
out.writeInt(-1); // do not limit
@@ -535,16 +537,21 @@ void write(SqlFieldsQuery qry, BinaryOutputStream out) {
535537
out.writeLong(qry.getTimeout());
536538
out.writeBoolean(true); // include column names
537539

538-
if (qry.getPartitions() != null) {
539-
out.writeInt(qry.getPartitions().length);
540+
if (protocolCtx.isFeatureSupported(QRY_PARTITIONS_BATCH_SIZE)) {
541+
if (qry.getPartitions() != null) {
542+
out.writeInt(qry.getPartitions().length);
540543

541-
for (int part : qry.getPartitions())
542-
out.writeInt(part);
544+
for (int part : qry.getPartitions())
545+
out.writeInt(part);
546+
}
547+
else
548+
out.writeInt(-1);
549+
550+
out.writeInt(qry.getUpdateBatchSize());
543551
}
544-
else
545-
out.writeInt(-1);
546552

547-
out.writeInt(qry.getUpdateBatchSize());
553+
if (protocolCtx.isFeatureSupported(QRY_INITIATOR_ID))
554+
writeObject(out, qry.getQueryInitiatorId());
548555
}
549556

550557
/** Write Ignite binary object to output stream. */

modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ public enum ProtocolBitmaskFeature {
112112
SQL_CACHE_CREATION(21),
113113

114114
/** Data-center information. */
115-
DC_AWARE(22);
115+
DC_AWARE(22),
116+
117+
/** SqlFieldsQuery initiatorId property. */
118+
QRY_INITIATOR_ID(23);
116119

117120
/** */
118121
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =

modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1084,7 +1084,7 @@ else if (qry instanceof IndexQuery)
10841084
? transactions.tx()
10851085
: null
10861086
);
1087-
serDes.write(qry, payloadCh.out());
1087+
serDes.write(qry, payloadCh.out(), payloadCh.clientChannel().protocolCtx());
10881088
};
10891089

10901090
return new ClientFieldsQueryCursor<>(new ClientFieldsQueryPager(

modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ public <K, V> ClientCache<K, V> getOrCreateCache(ClientCacheConfiguration cfg, b
372372
else
373373
out.writeByte(flags);
374374

375-
serDes.write(qry, out);
375+
serDes.write(qry, out, payloadCh.clientChannel().protocolCtx());
376376
};
377377

378378
return new ClientFieldsQueryCursor<>(new ClientFieldsQueryPager(

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ public enum ClientBitmaskFeature implements ThinProtocolFeature {
109109
SQL_CACHE_CREATION(21),
110110

111111
/** Data-center information. */
112-
DC_AWARE(22);
112+
DC_AWARE(22),
113+
114+
/** SqlFieldsQuery initiatorId property. */
115+
QRY_INITIATOR_ID(23);
113116

114117
/** */
115118
private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlFieldsQueryRequest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public class ClientCacheSqlFieldsQueryRequest extends ClientCacheQueryRequest im
5353
/** Update batch size. */
5454
private final Integer updateBatchSize;
5555

56+
/** Query initiator ID. */
57+
private final String initiatorId;
58+
5659
/**
5760
* Ctor.
5861
*
@@ -120,6 +123,11 @@ public ClientCacheSqlFieldsQueryRequest(BinaryReaderEx reader,
120123
partitions = null;
121124
updateBatchSize = null;
122125
}
126+
127+
if (protocolCtx.isFeatureSupported(ClientBitmaskFeature.QRY_INITIATOR_ID))
128+
initiatorId = reader.readString();
129+
else
130+
initiatorId = null;
123131
}
124132

125133
/** {@inheritDoc} */
@@ -132,7 +140,7 @@ public ClientCacheSqlFieldsQueryRequest(BinaryReaderEx reader,
132140
ctx.incrementCursors();
133141

134142
try {
135-
qry.setQueryInitiatorId(ctx.clientDescriptor());
143+
qry.setQueryInitiatorId(initiatorId == null ? ctx.clientDescriptor() : initiatorId);
136144

137145
// If cacheId is provided, we must check the cache for existence.
138146
if (cacheId() != 0) {

modules/indexing/src/test/java/org/apache/ignite/client/FunctionalQueryTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,30 @@ public void testEmptyQuery() {
317317
}
318318
}
319319

320+
/** Tests {@link SqlFieldsQuery} initiator ID parameter. */
321+
@Test
322+
public void testQueryInitiatorId() {
323+
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
324+
IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER))
325+
) {
326+
String initiatorId = "test";
327+
328+
SqlFieldsQuery qry = new SqlFieldsQuery("SELECT INITIATOR_ID FROM SYS.SQL_QUERIES").setQueryInitiatorId(initiatorId);
329+
330+
List<List<?>> res = client.query(qry).getAll();
331+
332+
assertEquals(1, res.size());
333+
assertEquals(initiatorId, res.get(0).get(0));
334+
335+
ClientCache<Object, Object> cache = client.getOrCreateCache(Config.DEFAULT_CACHE_NAME);
336+
337+
res = cache.query(qry).getAll();
338+
339+
assertEquals(1, res.size());
340+
assertEquals(initiatorId, res.get(0).get(0));
341+
}
342+
}
343+
320344
/** */
321345
private static ClientConfiguration getClientConfiguration() {
322346
return new ClientConfiguration().setAddresses(Config.SERVER)

0 commit comments

Comments
 (0)