13
13
import tech .ydb .core .Status ;
14
14
import tech .ydb .core .grpc .GrpcTransport ;
15
15
import tech .ydb .examples .SimpleExample ;
16
- import tech .ydb .table . Session ;
17
- import tech .ydb .table . TableClient ;
18
- import tech .ydb .table . impl . PooledTableClient ;
19
- import tech .ydb .table .rpc . grpc . GrpcTableRpc ;
20
- import tech .ydb .table .transaction . TableTransaction ;
16
+ import tech .ydb .query . QueryClient ;
17
+ import tech .ydb .query . QueryTransaction ;
18
+ import tech .ydb .query . tools . SessionRetryContext ;
19
+ import tech .ydb .table .query . Params ;
20
+ import tech .ydb .table .values . PrimitiveValue ;
21
21
import tech .ydb .topic .TopicClient ;
22
22
import tech .ydb .topic .read .AsyncReader ;
23
- import tech .ydb .topic .read .DecompressionException ;
24
23
import tech .ydb .topic .read .Message ;
25
24
import tech .ydb .topic .read .PartitionSession ;
26
25
import tech .ydb .topic .read .events .DataReceivedEvent ;
@@ -43,39 +42,36 @@ public class TransactionReadAsync extends SimpleExample {
43
42
private static final int MESSAGES_COUNT = 1 ;
44
43
45
44
private final CompletableFuture <Void > messageReceivedFuture = new CompletableFuture <>();
46
- private TableClient tableClient ;
45
+ private QueryClient queryClient ;
47
46
private AsyncReader reader ;
48
47
49
48
@ Override
50
49
protected void run (GrpcTransport transport , String pathPrefix ) {
51
- tableClient = PooledTableClient .newClient (GrpcTableRpc .useTransport (transport )).build ();
52
-
53
-
54
- try (TopicClient topicClient = TopicClient .newClient (transport )
55
- .setCompressionPoolThreadCount (8 )
56
- .build ()) {
57
- ReaderSettings readerSettings = ReaderSettings .newBuilder ()
58
- .setConsumerName (CONSUMER_NAME )
59
- .addTopic (TopicReadSettings .newBuilder ()
60
- .setPath (TOPIC_NAME )
61
- .setReadFrom (Instant .now ().minus (Duration .ofHours (24 )))
62
- .setMaxLag (Duration .ofMinutes (30 ))
63
- .build ())
64
- .setMaxMemoryUsageBytes (MAX_MEMORY_USAGE_BYTES )
65
- .build ();
66
-
67
- ReadEventHandlersSettings handlerSettings = ReadEventHandlersSettings .newBuilder ()
68
- .setEventHandler (new Handler ())
69
- .build ();
70
-
71
- reader = topicClient .createAsyncReader (readerSettings , handlerSettings );
72
-
73
- reader .init ();
74
-
75
- messageReceivedFuture .join ();
76
-
77
- reader .shutdown ().join ();
78
- tableClient .close ();
50
+ try (TopicClient topicClient = TopicClient .newClient (transport ).build ()) {
51
+ try (QueryClient queryClient = QueryClient .newClient (transport ).build ()) {
52
+ this .queryClient = queryClient ;
53
+ ReaderSettings readerSettings = ReaderSettings .newBuilder ()
54
+ .setConsumerName (CONSUMER_NAME )
55
+ .addTopic (TopicReadSettings .newBuilder ()
56
+ .setPath (TOPIC_NAME )
57
+ .setReadFrom (Instant .now ().minus (Duration .ofHours (24 )))
58
+ .setMaxLag (Duration .ofMinutes (30 ))
59
+ .build ())
60
+ .setMaxMemoryUsageBytes (MAX_MEMORY_USAGE_BYTES )
61
+ .build ();
62
+
63
+ ReadEventHandlersSettings handlerSettings = ReadEventHandlersSettings .newBuilder ()
64
+ .setEventHandler (new Handler ())
65
+ .build ();
66
+
67
+ reader = topicClient .createAsyncReader (readerSettings , handlerSettings );
68
+
69
+ reader .init ();
70
+
71
+ messageReceivedFuture .join ();
72
+
73
+ reader .shutdown ().join ();
74
+ }
79
75
}
80
76
}
81
77
@@ -93,69 +89,40 @@ private class Handler extends AbstractReadEventHandler {
93
89
@ Override
94
90
public void onMessages (DataReceivedEvent event ) {
95
91
for (Message message : event .getMessages ()) {
96
- StringBuilder str = new StringBuilder ("Message received" );
97
-
98
- if (logger .isTraceEnabled ()) {
99
- byte [] messageData ;
100
- try {
101
- messageData = message .getData ();
102
- } catch (DecompressionException e ) {
103
- logger .warn ("Decompression exception while receiving a message: " , e );
104
- messageData = e .getRawData ();
92
+ SessionRetryContext retryCtx = SessionRetryContext .create (queryClient ).build ();
93
+
94
+ retryCtx .supplyStatus (querySession -> {
95
+ // Begin new transaction on server
96
+ QueryTransaction transaction = querySession .beginTransaction (TxMode .SERIALIZABLE_RW )
97
+ .join ().getValue ();
98
+
99
+ // Update offsets in transaction
100
+ Status updateStatus = reader .updateOffsetsInTransaction (transaction , message .getPartitionOffsets (),
101
+ new UpdateOffsetsInTransactionSettings .Builder ().build ())
102
+ // Do not commit transaction without waiting for updateOffsetsInTransaction result
103
+ .join ();
104
+ if (!updateStatus .isSuccess ()) {
105
+ // Return update status to SessionRetryContext function
106
+ return CompletableFuture .completedFuture (updateStatus );
105
107
}
106
- str .append (": \" " ).append (new String (messageData , StandardCharsets .UTF_8 )).append ("\" " );
107
- }
108
- str .append ("\n " );
109
- if (logger .isDebugEnabled ()) {
110
- str .append (" offset: " ).append (message .getOffset ()).append ("\n " )
111
- .append (" seqNo: " ).append (message .getSeqNo ()).append ("\n " )
112
- .append (" createdAt: " ).append (message .getCreatedAt ()).append ("\n " )
113
- .append (" messageGroupId: " ).append (message .getMessageGroupId ()).append ("\n " )
114
- .append (" producerId: " ).append (message .getProducerId ()).append ("\n " )
115
- .append (" writtenAt: " ).append (message .getWrittenAt ()).append ("\n " )
116
- .append (" partitionSession: " ).append (message .getPartitionSession ().getId ()).append ("\n " )
117
- .append (" partitionId: " ).append (message .getPartitionSession ().getPartitionId ())
118
- .append ("\n " );
119
- if (!message .getWriteSessionMeta ().isEmpty ()) {
120
- str .append (" writeSessionMeta:\n " );
121
- message .getWriteSessionMeta ().forEach ((key , value ) ->
122
- str .append (" " ).append (key ).append (": " ).append (value ).append ("\n " ));
123
- }
124
- if (logger .isTraceEnabled ()) {
125
- logger .trace (str .toString ());
126
- } else {
127
- logger .debug (str .toString ());
128
- }
129
- } else {
130
- logger .info ("Message received. SeqNo={}, offset={}" , message .getSeqNo (), message .getOffset ());
131
- }
132
108
133
- // creating session and transaction
134
- Result <Session > sessionResult = tableClient .createSession (Duration .ofSeconds (10 )).join ();
135
- if (!sessionResult .isSuccess ()) {
136
- logger .error ("Couldn't get a session from the pool: {}" , sessionResult );
137
- return ; // retry or shutdown
138
- }
139
- Session session = sessionResult .getValue ();
140
- TableTransaction transaction = session .beginTransaction (TxMode .SERIALIZABLE_RW )
141
- .join ()
142
- .getValue ();
143
-
144
- // do something else in transaction
145
- transaction .executeDataQuery ("SELECT 1" ).join ();
146
- // analyzeQueryResultIfNeeded();
147
-
148
- Status updateStatus = reader .updateOffsetsInTransaction (transaction ,
149
- message .getPartitionOffsets (), new UpdateOffsetsInTransactionSettings .Builder ().build ())
150
- // Do not commit transaction without waiting for updateOffsetsInTransaction result
151
- .join ();
152
- if (!updateStatus .isSuccess ()) {
153
- logger .error ("Couldn't update offsets in transaction: {}" , updateStatus );
154
- return ; // retry or shutdown
155
- }
109
+ // Execute a query in transaction
110
+ Status queryStatus = transaction .createQuery (
111
+ "$last = SELECT MAX(val) FROM table WHERE id=$id;\n " +
112
+ "UPSERT INTO t (id, val) VALUES($id, COALESCE($last, 0) + $value)" ,
113
+ Params .of ("$id" , PrimitiveValue .newText (message .getMessageGroupId ()),
114
+ "$value" , PrimitiveValue .newInt64 (Long .parseLong (
115
+ new String (message .getData (), StandardCharsets .UTF_8 )))))
116
+ .execute ().join ().getStatus ();
117
+
118
+ if (!queryStatus .isSuccess ()) {
119
+ // Return query status to SessionRetryContext function
120
+ return CompletableFuture .completedFuture (queryStatus );
121
+ }
156
122
157
- Status commitStatus = transaction .commit ().join ();
158
- analyzeCommitStatus (commitStatus );
123
+ // Return commit status to SessionRetryContext function
124
+ return transaction .commit ().thenApply (Result ::getStatus );
125
+ }).join ().expectSuccess ("Couldn't read from topic and write to table in transaction" );
159
126
160
127
if (messageCounter .incrementAndGet () >= MESSAGES_COUNT ) {
161
128
logger .info ("{} messages committed in transaction. Finishing reading." , MESSAGES_COUNT );
0 commit comments