Skip to content

Commit ddf98e9

Browse files
committed
Make a separate base class for topic examples
1 parent 1b3d369 commit ddf98e9

13 files changed

+65
-52
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
<log4j.version>2.22.1</log4j.version>
1919
<jcommander.version>1.82</jcommander.version>
2020

21-
<ydb.sdk.version>2.2.10</ydb.sdk.version>
21+
<ydb.sdk.version>2.3.5-SNAPSHOT</ydb.sdk.version>
2222
</properties>
2323

2424
<modules>

ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java

-3
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@
99
* @author Nikolay Perfilov
1010
*/
1111
public abstract class SimpleExample {
12-
protected static final String TOPIC_NAME = System.getenv("YDB_TOPIC_NAME");
13-
protected static final String CONSUMER_NAME = System.getenv("YDB_CONSUMER_NAME");
14-
1512
protected void doMain(String[] args) {
1613
if (args.length > 1) {
1714
System.err.println("Too many arguments");

ydb-cookbook/src/main/java/tech/ydb/examples/topic/ControlPlane.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.slf4j.LoggerFactory;
1313
import tech.ydb.core.Result;
1414
import tech.ydb.core.grpc.GrpcTransport;
15-
import tech.ydb.examples.SimpleExample;
1615
import tech.ydb.topic.TopicClient;
1716
import tech.ydb.topic.description.Codec;
1817
import tech.ydb.topic.description.Consumer;
@@ -28,11 +27,11 @@
2827
/**
2928
* @author Nikolay Perfilov
3029
*/
31-
public class ControlPlane extends SimpleExample {
30+
public class ControlPlane extends SimpleTopicExample {
3231
private static final Logger logger = LoggerFactory.getLogger(ControlPlane.class);
3332

3433
@Override
35-
protected void run(GrpcTransport transport, String pathPrefix) {
34+
protected void run(GrpcTransport transport) {
3635
logger.info("ControlPlane run");
3736
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
3837

ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java

+2-11
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.slf4j.Logger;
1010
import org.slf4j.LoggerFactory;
1111
import tech.ydb.core.grpc.GrpcTransport;
12-
import tech.ydb.examples.SimpleExample;
1312
import tech.ydb.topic.TopicClient;
1413
import tech.ydb.topic.read.AsyncReader;
1514
import tech.ydb.topic.read.DecompressionException;
@@ -28,16 +27,15 @@
2827
/**
2928
* @author Nikolay Perfilov
3029
*/
31-
public class ReadAsync extends SimpleExample {
30+
public class ReadAsync extends SimpleTopicExample {
3231
private static final Logger logger = LoggerFactory.getLogger(ReadAsync.class);
3332
private static final long MAX_MEMORY_USAGE_BYTES = 500 * 1024 * 1024; // 500 Mb
3433
private static final int MESSAGES_COUNT = 5;
3534

3635
private final CompletableFuture<Void> messageReceivedFuture = new CompletableFuture<>();
37-
private long lastSeqNo = -1;
3836

3937
@Override
40-
protected void run(GrpcTransport transport, String pathPrefix) {
38+
protected void run(GrpcTransport transport) {
4139

4240
try (TopicClient topicClient = TopicClient.newClient(transport)
4341
.setCompressionPoolThreadCount(8)
@@ -108,13 +106,6 @@ public void onMessages(DataReceivedEvent event) {
108106
} else {
109107
logger.info("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
110108
}
111-
if (lastSeqNo > message.getSeqNo()) {
112-
logger.error("Received a message with seqNo {}. Previously got a message with seqNo {}",
113-
message.getSeqNo(), lastSeqNo);
114-
messageReceivedFuture.complete(null);
115-
} else {
116-
lastSeqNo = message.getSeqNo();
117-
}
118109
message.commit().thenRun(() -> {
119110
logger.info("Message committed");
120111
if (messageCounter.incrementAndGet() >= MESSAGES_COUNT) {

ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99
import tech.ydb.core.grpc.GrpcTransport;
10-
import tech.ydb.examples.SimpleExample;
1110
import tech.ydb.topic.TopicClient;
1211
import tech.ydb.topic.read.DecompressionException;
1312
import tech.ydb.topic.read.Message;
@@ -18,11 +17,11 @@
1817
/**
1918
* @author Nikolay Perfilov
2019
*/
21-
public class ReadSync extends SimpleExample {
20+
public class ReadSync extends SimpleTopicExample {
2221
private static final Logger logger = LoggerFactory.getLogger(ReadSync.class);
2322

2423
@Override
25-
protected void run(GrpcTransport transport, String pathPrefix) {
24+
protected void run(GrpcTransport transport) {
2625

2726
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
2827

ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadWriteWorkload.java

+3-12
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.slf4j.Logger;
2020
import org.slf4j.LoggerFactory;
2121
import tech.ydb.core.grpc.GrpcTransport;
22-
import tech.ydb.examples.SimpleExample;
2322
import tech.ydb.topic.TopicClient;
2423
import tech.ydb.topic.description.Codec;
2524
import tech.ydb.topic.read.AsyncReader;
@@ -40,7 +39,7 @@
4039
/**
4140
* @author Nikolay Perfilov
4241
*/
43-
public class ReadWriteWorkload extends SimpleExample {
42+
public class ReadWriteWorkload extends SimpleTopicExample {
4443
private static final Logger logger = LoggerFactory.getLogger(ReadWriteWorkload.class);
4544
private static final int WRITE_TIMEOUT_SECONDS = 60;
4645
private static final int MESSAGE_LENGTH_BYTES = 10_000_000; // 10 Mb
@@ -53,12 +52,11 @@ public class ReadWriteWorkload extends SimpleExample {
5352
private final AtomicInteger messagesReceived = new AtomicInteger(0);
5453
private final AtomicInteger messagesCommitted = new AtomicInteger(0);
5554
private final AtomicLong bytesWritten = new AtomicLong(0);
56-
private long lastSeqNo = -1;
5755
CountDownLatch writeFinishedLatch = new CountDownLatch(1);
5856
CountDownLatch readFinishedLatch = new CountDownLatch(1);
5957

6058
@Override
61-
protected void run(GrpcTransport transport, String pathPrefix) {
59+
protected void run(GrpcTransport transport) {
6260

6361
ExecutorService compressionExecutor = Executors.newFixedThreadPool(10);
6462
AtomicBoolean timeToStopWriting = new AtomicBoolean(false);
@@ -154,9 +152,8 @@ protected void run(GrpcTransport transport, String pathPrefix) {
154152
};
155153

156154
Runnable readingThread = () -> {
157-
String consumerName = "consumer1";
158155
ReaderSettings readerSettings = ReaderSettings.newBuilder()
159-
.setConsumerName(consumerName)
156+
.setConsumerName(CONSUMER_NAME)
160157
.addTopic(TopicReadSettings.newBuilder()
161158
.setPath(TOPIC_NAME)
162159
.setReadFrom(Instant.now().minus(Duration.ofHours(24)))
@@ -274,12 +271,6 @@ public void onMessages(DataReceivedEvent event) {
274271
} else {
275272
logger.debug("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
276273
}
277-
if (lastSeqNo > message.getSeqNo()) {
278-
logger.error("Received a message with seqNo {}. Previously got a message with seqNo {}",
279-
message.getSeqNo(), lastSeqNo);
280-
} else {
281-
lastSeqNo = message.getSeqNo();
282-
}
283274
message.commit().thenRun(() -> {
284275
logger.trace("Message committed");
285276
unreadMessagesCount.decrementAndGet();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package tech.ydb.examples.topic;
2+
3+
import tech.ydb.auth.iam.CloudAuthHelper;
4+
import tech.ydb.core.grpc.GrpcTransport;
5+
6+
7+
/**
8+
* @author Nikolay Perfilov
9+
*/
10+
public abstract class SimpleTopicExample {
11+
protected static final String TOPIC_NAME = System.getenv("YDB_TOPIC_NAME");
12+
protected static final String CONSUMER_NAME = System.getenv("YDB_CONSUMER_NAME");
13+
14+
protected void doMain(String[] args) {
15+
if (args.length > 1) {
16+
System.err.println("Too many arguments");
17+
return;
18+
}
19+
String connString;
20+
if (args.length == 1) {
21+
connString = args[0];
22+
} else {
23+
connString = "some.host.name.com:2135?database=/Root";
24+
System.err.println("Pass <connection-string> as argument to override connection settings\n");
25+
}
26+
27+
System.err.println("connection-string: " + connString + "\n");
28+
29+
try (GrpcTransport transport = GrpcTransport.forConnectionString(connString)
30+
.withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
31+
.build()) {
32+
run(transport);
33+
} catch (Throwable t) {
34+
t.printStackTrace();
35+
}
36+
}
37+
38+
protected abstract void run(GrpcTransport transport);
39+
}

ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.slf4j.Logger;
1111
import org.slf4j.LoggerFactory;
1212
import tech.ydb.core.grpc.GrpcTransport;
13-
import tech.ydb.examples.SimpleExample;
1413
import tech.ydb.topic.TopicClient;
1514
import tech.ydb.topic.description.Codec;
1615
import tech.ydb.topic.settings.WriterSettings;
@@ -22,13 +21,13 @@
2221
/**
2322
* @author Nikolay Perfilov
2423
*/
25-
public class WriteAsync extends SimpleExample {
24+
public class WriteAsync extends SimpleTopicExample {
2625
private static final Logger logger = LoggerFactory.getLogger(WriteAsync.class);
2726
private static final int MESSAGES_COUNT = 5;
2827
private static final int WAIT_TIMEOUT_SECONDS = 60;
2928

3029
@Override
31-
protected void run(GrpcTransport transport, String pathPrefix) {
30+
protected void run(GrpcTransport transport) {
3231
String producerId = "messageGroup1";
3332
String messageGroupId = "messageGroup1";
3433

ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteSync.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99
import tech.ydb.core.grpc.GrpcTransport;
10-
import tech.ydb.examples.SimpleExample;
1110
import tech.ydb.topic.TopicClient;
1211
import tech.ydb.topic.description.Codec;
1312
import tech.ydb.topic.settings.WriterSettings;
@@ -17,16 +16,15 @@
1716
/**
1817
* @author Nikolay Perfilov
1918
*/
20-
public class WriteSync extends SimpleExample {
19+
public class WriteSync extends SimpleTopicExample {
2120
private static final Logger logger = LoggerFactory.getLogger(WriteSync.class);
2221

2322
@Override
24-
protected void run(GrpcTransport transport, String pathPrefix) {
23+
protected void run(GrpcTransport transport) {
2524
String producerId = "messageGroup1";
2625
String messageGroupId = "messageGroup1";
2726

2827
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
29-
3028
WriterSettings settings = WriterSettings.newBuilder()
3129
.setTopicPath(TOPIC_NAME)
3230
.setProducerId(producerId)

ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import tech.ydb.core.Result;
1313
import tech.ydb.core.Status;
1414
import tech.ydb.core.grpc.GrpcTransport;
15-
import tech.ydb.examples.SimpleExample;
15+
import tech.ydb.examples.topic.SimpleTopicExample;
1616
import tech.ydb.query.QueryClient;
1717
import tech.ydb.query.QueryTransaction;
1818
import tech.ydb.query.tools.SessionRetryContext;
@@ -36,7 +36,7 @@
3636
/**
3737
* @author Nikolay Perfilov
3838
*/
39-
public class TransactionReadAsync extends SimpleExample {
39+
public class TransactionReadAsync extends SimpleTopicExample {
4040
private static final Logger logger = LoggerFactory.getLogger(TransactionReadAsync.class);
4141
private static final long MAX_MEMORY_USAGE_BYTES = 500 * 1024 * 1024; // 500 Mb
4242
private static final int MESSAGES_COUNT = 1;
@@ -46,7 +46,7 @@ public class TransactionReadAsync extends SimpleExample {
4646
private AsyncReader reader;
4747

4848
@Override
49-
protected void run(GrpcTransport transport, String pathPrefix) {
49+
protected void run(GrpcTransport transport) {
5050
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
5151
try (QueryClient queryClient = QueryClient.newClient(transport).build()) {
5252
this.queryClient = queryClient;

ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import tech.ydb.core.Result;
1010
import tech.ydb.core.Status;
1111
import tech.ydb.core.grpc.GrpcTransport;
12-
import tech.ydb.examples.SimpleExample;
12+
import tech.ydb.examples.topic.SimpleTopicExample;
1313
import tech.ydb.query.QueryClient;
1414
import tech.ydb.query.QueryTransaction;
1515
import tech.ydb.query.tools.SessionRetryContext;
@@ -25,10 +25,10 @@
2525
/**
2626
* @author Nikolay Perfilov
2727
*/
28-
public class TransactionReadSync extends SimpleExample {
28+
public class TransactionReadSync extends SimpleTopicExample {
2929

3030
@Override
31-
protected void run(GrpcTransport transport, String pathPrefix) {
31+
protected void run(GrpcTransport transport) {
3232

3333
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
3434
try (QueryClient queryClient = QueryClient.newClient(transport).build()) {

ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import tech.ydb.core.Result;
1212
import tech.ydb.core.Status;
1313
import tech.ydb.core.grpc.GrpcTransport;
14-
import tech.ydb.examples.SimpleExample;
14+
import tech.ydb.examples.topic.SimpleTopicExample;
1515
import tech.ydb.query.QueryClient;
1616
import tech.ydb.query.QueryStream;
1717
import tech.ydb.query.QueryTransaction;
@@ -32,14 +32,14 @@
3232
/**
3333
* @author Nikolay Perfilov
3434
*/
35-
public class TransactionWriteAsync extends SimpleExample {
35+
public class TransactionWriteAsync extends SimpleTopicExample {
3636
private static final Logger logger = LoggerFactory.getLogger(TransactionWriteAsync.class);
3737
private static final String PRODUCER_ID = "messageGroup1";
3838
private static final String MESSAGE_GROUP_ID = "messageGroup1";
3939
private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;
4040

4141
@Override
42-
protected void run(GrpcTransport transport, String pathPrefix) {
42+
protected void run(GrpcTransport transport) {
4343

4444
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
4545
try (QueryClient queryClient = QueryClient.newClient(transport).build()) {

ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import tech.ydb.core.Result;
1212
import tech.ydb.core.Status;
1313
import tech.ydb.core.grpc.GrpcTransport;
14-
import tech.ydb.examples.SimpleExample;
14+
import tech.ydb.examples.topic.SimpleTopicExample;
1515
import tech.ydb.query.QueryClient;
1616
import tech.ydb.query.QueryStream;
1717
import tech.ydb.query.QueryTransaction;
@@ -30,14 +30,14 @@
3030
/**
3131
* @author Nikolay Perfilov
3232
*/
33-
public class TransactionWriteSync extends SimpleExample {
33+
public class TransactionWriteSync extends SimpleTopicExample {
3434
private static final Logger logger = LoggerFactory.getLogger(TransactionWriteSync.class);
3535
private static final String PRODUCER_ID = "messageGroup1";
3636
private static final String MESSAGE_GROUP_ID = "messageGroup1";
3737
private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;
3838

3939
@Override
40-
protected void run(GrpcTransport transport, String pathPrefix) {
40+
protected void run(GrpcTransport transport) {
4141
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {
4242
try (QueryClient queryClient = QueryClient.newClient(transport).build()) {
4343
WriterSettings writerSettings = WriterSettings.newBuilder()

0 commit comments

Comments
 (0)