Skip to content

Commit ee1252c

Browse files
committed
Update examples
1 parent 67c8a20 commit ee1252c

File tree

4 files changed

+132
-228
lines changed

4 files changed

+132
-228
lines changed

ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java

+7-23
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package tech.ydb.examples.topic.transactions;
22

33
import java.nio.charset.StandardCharsets;
4-
import java.time.Duration;
5-
import java.time.Instant;
64
import java.util.concurrent.CompletableFuture;
75
import java.util.concurrent.atomic.AtomicInteger;
86

@@ -38,7 +36,6 @@
3836
*/
3937
public class TransactionReadAsync extends SimpleTopicExample {
4038
private static final Logger logger = LoggerFactory.getLogger(TransactionReadAsync.class);
41-
private static final long MAX_MEMORY_USAGE_BYTES = 500 * 1024 * 1024; // 500 Mb
4239
private static final int MESSAGES_COUNT = 1;
4340

4441
private final CompletableFuture<Void> messageReceivedFuture = new CompletableFuture<>();
@@ -47,42 +44,29 @@ public class TransactionReadAsync extends SimpleTopicExample {
4744

4845
@Override
4946
protected void run(GrpcTransport transport) {
47+
// WARNING: Working with transactions in Java Topic SDK is currently experimental. Interfaces may change
5048
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
5149
try (QueryClient queryClient = QueryClient.newClient(transport).build()) {
5250
this.queryClient = queryClient;
5351
ReaderSettings readerSettings = ReaderSettings.newBuilder()
5452
.setConsumerName(CONSUMER_NAME)
5553
.addTopic(TopicReadSettings.newBuilder()
5654
.setPath(TOPIC_NAME)
57-
.setReadFrom(Instant.now().minus(Duration.ofHours(24)))
58-
.setMaxLag(Duration.ofMinutes(30))
5955
.build())
60-
.setMaxMemoryUsageBytes(MAX_MEMORY_USAGE_BYTES)
6156
.build();
6257

6358
ReadEventHandlersSettings handlerSettings = ReadEventHandlersSettings.newBuilder()
6459
.setEventHandler(new Handler())
6560
.build();
6661

6762
reader = topicClient.createAsyncReader(readerSettings, handlerSettings);
68-
6963
reader.init();
70-
7164
messageReceivedFuture.join();
72-
7365
reader.shutdown().join();
7466
}
7567
}
7668
}
7769

78-
public static void analyzeCommitStatus(Status status) {
79-
if (status.isSuccess()) {
80-
logger.info("Transaction committed successfully");
81-
} else {
82-
logger.error("Failed to commit transaction: {}", status);
83-
}
84-
}
85-
8670
private class Handler extends AbstractReadEventHandler {
8771
private final AtomicInteger messageCounter = new AtomicInteger(0);
8872

@@ -92,7 +76,6 @@ public void onMessages(DataReceivedEvent event) {
9276
SessionRetryContext retryCtx = SessionRetryContext.create(queryClient).build();
9377

9478
retryCtx.supplyStatus(querySession -> {
95-
// Begin new transaction on server
9679
QueryTransaction transaction = querySession.beginTransaction(TxMode.SERIALIZABLE_RW)
9780
.join().getValue();
9881

@@ -108,11 +91,12 @@ public void onMessages(DataReceivedEvent event) {
10891

10992
// Execute a query in transaction
11093
Status queryStatus = transaction.createQuery(
111-
"$last = SELECT MAX(val) FROM table WHERE id=$id;\n" +
112-
"UPSERT INTO t (id, val) VALUES($id, COALESCE($last, 0) + $value)",
113-
Params.of("$id", PrimitiveValue.newText(message.getMessageGroupId()),
114-
"$value", PrimitiveValue.newInt64(Long.parseLong(
115-
new String(message.getData(), StandardCharsets.UTF_8)))))
94+
"DECLARE $id AS Uint64; \n" +
95+
"DECLARE $value AS Text;\n" +
96+
"UPSERT INTO table (id, value) VALUES ($id, $value)",
97+
Params.of("$id", PrimitiveValue.newUint64(message.getOffset()),
98+
"$value", PrimitiveValue.newText(new String(message.getData(),
99+
StandardCharsets.UTF_8))))
116100
.execute().join().getStatus();
117101

118102
if (!queryStatus.isSuccess()) {

ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java

+28-45
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package tech.ydb.examples.topic.transactions;
22

33
import java.nio.charset.StandardCharsets;
4-
import java.time.Duration;
5-
import java.time.Instant;
64
import java.util.concurrent.CompletableFuture;
75

86
import tech.ydb.common.transaction.TxMode;
@@ -29,66 +27,51 @@ public class TransactionReadSync extends SimpleTopicExample {
2927

3028
@Override
3129
protected void run(GrpcTransport transport) {
32-
30+
// WARNING: Working with transactions in Java Topic SDK is currently experimental. Interfaces may change
3331
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
3432
try (QueryClient queryClient = QueryClient.newClient(transport).build()) {
3533
ReaderSettings settings = ReaderSettings.newBuilder()
3634
.setConsumerName(CONSUMER_NAME)
3735
.addTopic(TopicReadSettings.newBuilder()
3836
.setPath(TOPIC_NAME)
39-
.setReadFrom(Instant.now().minus(Duration.ofHours(24)))
40-
.setMaxLag(Duration.ofMinutes(30))
4137
.build())
4238
.build();
4339

4440
SyncReader reader = topicClient.createSyncReader(settings);
45-
46-
// Init in background
4741
reader.init();
4842

49-
tableAndTopicWithinTransaction(topicClient, queryClient, reader);
50-
tableAndTopicWithinTransaction(topicClient, queryClient, reader);
51-
43+
SessionRetryContext retryCtx = SessionRetryContext.create(queryClient).build();
44+
45+
retryCtx.supplyStatus(querySession -> {
46+
QueryTransaction transaction = querySession.beginTransaction(TxMode.SERIALIZABLE_RW).join().getValue();
47+
Message message;
48+
try {
49+
message = reader.receive(ReceiveSettings.newBuilder()
50+
.setTransaction(transaction)
51+
.build());
52+
} catch (InterruptedException exception) {
53+
throw new RuntimeException("Interrupted exception while waiting for message");
54+
}
55+
56+
Status queryStatus = transaction.createQuery(
57+
"DECLARE $id AS Uint64; \n" +
58+
"DECLARE $value AS Text;\n" +
59+
"UPSERT INTO table (id, value) VALUES ($id, $value)",
60+
Params.of("$id", PrimitiveValue.newUint64(message.getOffset()),
61+
"$value", PrimitiveValue.newText(new String(message.getData(),
62+
StandardCharsets.UTF_8))))
63+
.execute().join().getStatus();
64+
if (!queryStatus.isSuccess()) {
65+
return CompletableFuture.completedFuture(queryStatus);
66+
}
67+
68+
return transaction.commit().thenApply(Result::getStatus);
69+
}).join().expectSuccess("Couldn't read from topic and write to table in transaction");
5270
reader.shutdown();
5371
}
5472
}
5573
}
5674

57-
private void tableAndTopicWithinTransaction(TopicClient topicClient, QueryClient queryClient, SyncReader reader) {
58-
SessionRetryContext retryCtx = SessionRetryContext.create(queryClient).build();
59-
60-
retryCtx.supplyStatus(querySession -> {
61-
// Begin new transaction on server
62-
QueryTransaction transaction = querySession.beginTransaction(TxMode.SERIALIZABLE_RW).join().getValue();
63-
64-
// Read message in transaction
65-
Message message;
66-
try {
67-
message = reader.receive(ReceiveSettings.newBuilder()
68-
.setTransaction(transaction)
69-
.build());
70-
} catch (InterruptedException exception) {
71-
throw new RuntimeException("Interrupted exception while waiting for message");
72-
}
73-
74-
// Execute a query in transaction
75-
Status queryStatus = transaction.createQuery(
76-
"$last = SELECT MAX(val) FROM table WHERE id=$id;\n" +
77-
"UPSERT INTO t (id, val) VALUES($id, COALESCE($last, 0) + $value)",
78-
Params.of("$id", PrimitiveValue.newText(message.getMessageGroupId()),
79-
"$value", PrimitiveValue.newInt64(Long.parseLong(
80-
new String(message.getData(), StandardCharsets.UTF_8)))))
81-
.execute().join().getStatus();
82-
83-
if (!queryStatus.isSuccess()) {
84-
return CompletableFuture.completedFuture(queryStatus);
85-
}
86-
87-
// Return commit status to SessionRetryContext function
88-
return transaction.commit().thenApply(Result::getStatus);
89-
}).join().expectSuccess("Couldn't read from topic and write to table in transaction");
90-
}
91-
9275
public static void main(String[] args) {
9376
new TransactionReadSync().doMain(args);
9477
}

ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java

+54-97
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import java.util.concurrent.TimeUnit;
66
import java.util.concurrent.TimeoutException;
77

8-
import org.slf4j.Logger;
9-
import org.slf4j.LoggerFactory;
108
import tech.ydb.common.transaction.TxMode;
119
import tech.ydb.core.Result;
1210
import tech.ydb.core.Status;
@@ -21,125 +19,84 @@
2119
import tech.ydb.table.result.ResultSetReader;
2220
import tech.ydb.table.values.PrimitiveValue;
2321
import tech.ydb.topic.TopicClient;
24-
import tech.ydb.topic.description.Codec;
2522
import tech.ydb.topic.settings.SendSettings;
2623
import tech.ydb.topic.settings.WriterSettings;
2724
import tech.ydb.topic.write.AsyncWriter;
2825
import tech.ydb.topic.write.Message;
2926
import tech.ydb.topic.write.QueueOverflowException;
30-
import tech.ydb.topic.write.WriteAck;
3127

3228
/**
3329
* @author Nikolay Perfilov
3430
*/
3531
public class TransactionWriteAsync extends SimpleTopicExample {
36-
private static final Logger logger = LoggerFactory.getLogger(TransactionWriteAsync.class);
37-
private static final String PRODUCER_ID = "messageGroup1";
38-
private static final String MESSAGE_GROUP_ID = "messageGroup1";
3932
private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;
4033

4134
@Override
4235
protected void run(GrpcTransport transport) {
43-
36+
// WARNING: Working with transactions in Java Topic SDK is currently experimental. Interfaces may change
4437
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
4538
try (QueryClient queryClient = QueryClient.newClient(transport).build()) {
39+
long id = 2;
40+
String randomProducerId = "randomProducerId"; // Different for writers with different transactions
4641
WriterSettings writerSettings = WriterSettings.newBuilder()
4742
.setTopicPath(TOPIC_NAME)
48-
.setProducerId(PRODUCER_ID)
49-
.setMessageGroupId(MESSAGE_GROUP_ID)
50-
.setCodec(Codec.ZSTD)
43+
.setProducerId(randomProducerId)
44+
.setMessageGroupId(randomProducerId)
5145
.build();
5246

53-
writeFromTableToTopic(topicClient, queryClient, writerSettings, 1);
54-
writeFromTableToTopic(topicClient, queryClient, writerSettings, 2);
47+
SessionRetryContext retryCtx = SessionRetryContext.create(queryClient).build();
48+
retryCtx.supplyStatus(querySession -> {
49+
QueryTransaction transaction = querySession.beginTransaction(TxMode.SERIALIZABLE_RW)
50+
.join().getValue();
51+
52+
QueryStream queryStream = transaction.createQuery(
53+
"DECLARE $id AS Uint64;\n" +
54+
"SELECT value FROM table WHERE id=$id",
55+
Params.of("$id", PrimitiveValue.newUint64(id)));
56+
QueryReader queryReader = QueryReader.readFrom(queryStream).join().getValue();
57+
ResultSetReader resultSet = queryReader.getResultSet(0);
58+
if (!resultSet.next()) {
59+
throw new RuntimeException("Value for id=" + id + " not found");
60+
}
61+
String value = resultSet.getColumn("value").getText();
62+
63+
// Current implementation requires creating a writer for every transaction:
64+
AsyncWriter writer = topicClient.createAsyncWriter(writerSettings);
65+
writer.init();
66+
System.err.println("writer initialized, value: " + value);
67+
try {
68+
writer.send(Message.of(value.getBytes()),
69+
SendSettings.newBuilder()
70+
.setTransaction(transaction)
71+
.build())
72+
.join(); // Waiting for WriteAck before committing transaction
73+
} catch (QueueOverflowException exception) {
74+
// Send queue is full. Need to retry with backoff or skip
75+
throw new RuntimeException("Couldn't add message to SDK buffer", exception);
76+
}
77+
CompletableFuture<Status> commitStatus = transaction.commit().thenApply(Result::getStatus);
78+
commitStatus.join();
79+
try {
80+
System.err.println("Commit status: " + commitStatus.get());
81+
} catch (InterruptedException | ExecutionException e) {
82+
throw new RuntimeException(e);
83+
}
84+
85+
try {
86+
writer.shutdown().get(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
87+
} catch (TimeoutException exception) {
88+
throw new RuntimeException("Shutdown not finished within " + SHUTDOWN_TIMEOUT_SECONDS +
89+
" seconds");
90+
} catch (InterruptedException | ExecutionException exception) {
91+
throw new RuntimeException("Shutdown not finished due to exception: " + exception);
92+
}
93+
94+
return commitStatus;
95+
}).join().expectSuccess("Couldn't read from table and write to topic in transaction");
5596
}
5697
}
5798
}
5899

59-
private void writeFromTableToTopic(TopicClient topicClient, QueryClient queryClient,
60-
WriterSettings writerSettings, long id) {
61-
SessionRetryContext retryCtx = SessionRetryContext.create(queryClient).build();
62-
retryCtx.supplyStatus(querySession -> {
63-
// Create new transaction object. It is not yet started on server
64-
QueryTransaction transaction = querySession.createNewTransaction(TxMode.SERIALIZABLE_RW);
65-
66-
// Execute a query to start a new transaction
67-
QueryStream queryStream = transaction.createQuery(
68-
"DECLARE $id AS Uint64; " +
69-
"SELECT value FROM table WHERE id=$id",
70-
Params.of("$id", PrimitiveValue.newUint64(id)));
71-
72-
// Get query result
73-
QueryReader queryReader = QueryReader.readFrom(queryStream).join().getValue();
74-
ResultSetReader resultSet = queryReader.getResultSet(0);
75-
if (!resultSet.next()) {
76-
throw new RuntimeException("Value for id=" + id + " not found");
77-
}
78-
String value = resultSet.getColumn("value").getText();
79-
80-
// Create a writer
81-
AsyncWriter writer = topicClient.createAsyncWriter(writerSettings);
82-
83-
// Init in background
84-
writer.init();
85-
86-
// Write a message
87-
while (true) {
88-
try {
89-
// Blocks until the message is put into sending buffer
90-
writer.send(Message.of(value.getBytes()),
91-
SendSettings.newBuilder()
92-
.setTransaction(transaction)
93-
.build())
94-
.whenComplete((result, ex) -> {
95-
if (ex != null) {
96-
logger.error("Exception while sending a message: ", ex);
97-
} else {
98-
logger.info("Message ack received");
99-
100-
switch (result.getState()) {
101-
case WRITTEN:
102-
WriteAck.Details details = result.getDetails();
103-
logger.info("Message was written successfully, offset: " +
104-
details.getOffset());
105-
break;
106-
case ALREADY_WRITTEN:
107-
logger.warn("Message was already written");
108-
break;
109-
default:
110-
throw new RuntimeException("Unknown WriteAck state: " + result.getState());
111-
}
112-
}
113-
})
114-
// Waiting for the message to reach the server before committing the transaction
115-
.join();
116-
logger.info("Message is sent");
117-
break;
118-
} catch (QueueOverflowException exception) {
119-
logger.error("Queue overflow exception while sending a message");
120-
// Send queue is full. Need to retry with backoff or skip
121-
}
122-
}
123-
124-
// Commit transaction
125-
CompletableFuture<Status> commitStatus = transaction.commit().thenApply(Result::getStatus);
126-
127-
// Shutdown writer
128-
try {
129-
writer.shutdown().get(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
130-
} catch (TimeoutException exception) {
131-
logger.error("Timeout exception during writer termination ({} seconds): ", SHUTDOWN_TIMEOUT_SECONDS, exception);
132-
} catch (ExecutionException exception) {
133-
logger.error("Execution exception during writer termination: ", exception);
134-
} catch (InterruptedException exception) {
135-
logger.error("Writer termination was interrupted: ", exception);
136-
}
137-
138-
// Return commit status to SessionRetryContext function
139-
return commitStatus;
140-
}).join().expectSuccess("Couldn't read from table and write to topic in transaction");
141-
}
142-
143100
public static void main(String[] args) {
144101
new TransactionWriteAsync().doMain(args);
145102
}

0 commit comments

Comments
 (0)