Skip to content

Commit eb21d71

Browse files
authored
Merge pull request #504 from alex268/master
Fixed double compression of one message
2 parents 4822d6e + 2cb5c87 commit eb21d71

File tree

4 files changed

+131
-139
lines changed

4 files changed

+131
-139
lines changed

topic/src/main/java/tech/ydb/topic/write/Message.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,20 @@ public class Message {
1515
private byte[] data;
1616
private final Long seqNo;
1717
private final Instant createTimestamp;
18-
private List<MetadataItem> metadataItems;
18+
private final List<MetadataItem> metadataItems;
1919

2020
private Message(Builder builder) {
2121
this.data = builder.data;
2222
this.seqNo = builder.seqNo;
2323
this.createTimestamp = builder.createTimestamp != null ? builder.createTimestamp : Instant.now();
24-
this.metadataItems = builder.metadataItems;
24+
this.metadataItems = builder.metadataItems != null ? builder.metadataItems : new ArrayList<>();
2525
}
2626

2727
private Message(byte[] data) {
2828
this.data = data;
2929
this.seqNo = null;
3030
this.createTimestamp = Instant.now();
31+
this.metadataItems = new ArrayList<>();
3132
}
3233

3334
public static Message of(byte[] data) {
@@ -42,6 +43,7 @@ public byte[] getData() {
4243
return data;
4344
}
4445

46+
@Deprecated
4547
public void setData(byte[] data) {
4648
this.data = data;
4749
}
@@ -50,10 +52,12 @@ public Long getSeqNo() {
5052
return seqNo;
5153
}
5254

55+
@Nonnull
5356
public Instant getCreateTimestamp() {
5457
return createTimestamp;
5558
}
5659

60+
@Nonnull
5761
public List<MetadataItem> getMetadataItems() {
5862
return metadataItems;
5963
}
Lines changed: 77 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,114 @@
11
package tech.ydb.topic.write.impl;
22

3+
import java.io.IOException;
4+
import java.time.Instant;
5+
import java.util.List;
36
import java.util.concurrent.CompletableFuture;
4-
import java.util.concurrent.atomic.AtomicBoolean;
7+
import java.util.stream.Collectors;
8+
9+
import com.google.protobuf.UnsafeByteOperations;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
512

613
import tech.ydb.common.transaction.YdbTransaction;
14+
import tech.ydb.core.utils.ProtobufUtils;
15+
import tech.ydb.proto.topic.YdbTopic;
16+
import tech.ydb.topic.description.Codec;
17+
import tech.ydb.topic.description.MetadataItem;
718
import tech.ydb.topic.settings.SendSettings;
19+
import tech.ydb.topic.utils.Encoder;
820
import tech.ydb.topic.write.Message;
921
import tech.ydb.topic.write.WriteAck;
1022

1123
public class EnqueuedMessage {
12-
private final Message message;
24+
25+
// use logger from WriterImpl
26+
private static final Logger logger = LoggerFactory.getLogger(WriterImpl.class);
27+
28+
private Long seqNo;
29+
private byte[] bytes;
30+
private final long originLength;
31+
private final Instant createdAt;
32+
private final List<MetadataItem> items;
33+
1334
private final CompletableFuture<WriteAck> future = new CompletableFuture<>();
14-
private final AtomicBoolean isCompressed = new AtomicBoolean();
15-
private final AtomicBoolean isProcessingFailed = new AtomicBoolean();
16-
private final long uncompressedSizeBytes;
1735
private final YdbTransaction transaction;
18-
private long compressedSizeBytes;
19-
private Long seqNo;
2036

21-
public EnqueuedMessage(Message message, SendSettings sendSettings) {
22-
this.message = message;
23-
this.uncompressedSizeBytes = message.getData().length;
24-
this.transaction = sendSettings != null ? sendSettings.getTransaction() : null;
25-
}
37+
private volatile boolean isReady = false;
38+
private volatile IOException compressError = null;
2639

27-
public Message getMessage() {
28-
return message;
29-
}
40+
public EnqueuedMessage(Message message, SendSettings sendSettings, boolean noCompression) {
41+
this.bytes = message.getData();
42+
this.createdAt = message.getCreateTimestamp();
43+
this.items = message.getMetadataItems();
44+
this.seqNo = message.getSeqNo();
3045

31-
public CompletableFuture<WriteAck> getFuture() {
32-
return future;
33-
}
34-
35-
public boolean isCompressed() {
36-
return isCompressed.get();
46+
this.originLength = bytes.length;
47+
this.transaction = sendSettings != null ? sendSettings.getTransaction() : null;
48+
this.isReady = noCompression;
3749
}
3850

39-
public void setCompressed(boolean compressed) {
40-
this.isCompressed.set(compressed);
51+
public boolean isReady() {
52+
return isReady;
4153
}
4254

43-
public boolean isProcessingFailed() {
44-
return isProcessingFailed.get();
55+
public long getOriginalSize() {
56+
return originLength;
4557
}
4658

47-
public void setProcessingFailed(boolean processingFailed) {
48-
isProcessingFailed.set(processingFailed);
49-
}
50-
51-
public long getUncompressedSizeBytes() {
52-
return uncompressedSizeBytes;
59+
public long getSize() {
60+
return bytes.length;
5361
}
5462

55-
public long getCompressedSizeBytes() {
56-
return compressedSizeBytes;
63+
public IOException getCompressError() {
64+
return compressError;
5765
}
5866

59-
public void setCompressedSizeBytes(long compressedSizeBytes) {
60-
this.compressedSizeBytes = compressedSizeBytes;
67+
public void encode(String writeId, Codec codec) {
68+
logger.trace("[{}] Started encoding message", writeId);
69+
70+
try {
71+
bytes = Encoder.encode(codec, bytes);
72+
isReady = true;
73+
logger.trace("[{}] Successfully finished encoding message", writeId);
74+
} catch (IOException ex) {
75+
logger.error("[{}] Exception while encoding message: ", writeId, ex);
76+
isReady = true;
77+
future.completeExceptionally(ex);
78+
}
6179
}
6280

63-
public long getSizeBytes() {
64-
return isCompressed() ? getCompressedSizeBytes() : getUncompressedSizeBytes();
81+
public CompletableFuture<WriteAck> getFuture() {
82+
return future;
6583
}
6684

6785
public Long getSeqNo() {
6886
return seqNo;
6987
}
7088

71-
public void setSeqNo(long seqNo) {
72-
this.seqNo = seqNo;
73-
}
74-
7589
public YdbTransaction getTransaction() {
7690
return transaction;
7791
}
92+
93+
long updateSeqNo(long lastSeqNo) {
94+
if (seqNo == null) {
95+
seqNo = lastSeqNo + 1;
96+
return seqNo;
97+
}
98+
return Math.max(lastSeqNo, seqNo);
99+
}
100+
101+
YdbTopic.StreamWriteMessage.WriteRequest.MessageData toMessageData() {
102+
return YdbTopic.StreamWriteMessage.WriteRequest.MessageData.newBuilder()
103+
.setSeqNo(seqNo)
104+
.setData(UnsafeByteOperations.unsafeWrap(bytes))
105+
.setCreatedAt(ProtobufUtils.instantToProto(createdAt))
106+
.setUncompressedSize(originLength)
107+
.addAllMetadataItems(items.stream().map(it -> YdbTopic.MetadataItem.newBuilder()
108+
.setKey(it.getKey())
109+
.setValue(UnsafeByteOperations.unsafeWrap(it.getValue()))
110+
.build()
111+
).collect(Collectors.toList()))
112+
.build();
113+
}
78114
}

topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.util.Arrays;
55
import java.util.List;
66
import java.util.Queue;
7-
import java.util.stream.Collectors;
87

98
import com.google.protobuf.ByteString;
109
import org.slf4j.Logger;
@@ -13,7 +12,6 @@
1312
import tech.ydb.common.transaction.YdbTransaction;
1413
import tech.ydb.core.utils.ProtobufUtils;
1514
import tech.ydb.proto.topic.YdbTopic;
16-
import tech.ydb.topic.description.MetadataItem;
1715
import tech.ydb.topic.settings.WriterSettings;
1816
import tech.ydb.topic.utils.ProtoUtils;
1917

@@ -146,46 +144,23 @@ public void tryAddMessageToRequest(EnqueuedMessage message) {
146144
}
147145
currentTransaction = message.getTransaction();
148146
}
149-
long messageSeqNo = message.getSeqNo() == null
150-
? (message.getMessage().getSeqNo() == null ? ++seqNo : message.getMessage().getSeqNo())
151-
: message.getSeqNo();
152-
if (message.getSeqNo() == null) {
153-
message.setSeqNo(messageSeqNo);
154-
}
155-
156-
YdbTopic.StreamWriteMessage.WriteRequest.MessageData.Builder messageDataBuilder =
157-
YdbTopic.StreamWriteMessage.WriteRequest.MessageData.newBuilder()
158-
.setSeqNo(messageSeqNo)
159-
.setData(ByteString.copyFrom(message.getMessage().getData()))
160-
.setCreatedAt(ProtobufUtils.instantToProto(message.getMessage().getCreateTimestamp()))
161-
.setUncompressedSize(message.getUncompressedSizeBytes());
162-
163-
List<MetadataItem> metadataItems = message.getMessage().getMetadataItems();
164-
if (metadataItems != null && !metadataItems.isEmpty()) {
165-
messageDataBuilder.addAllMetadataItems(metadataItems
166-
.stream()
167-
.map(metadataItem -> YdbTopic.MetadataItem.newBuilder()
168-
.setKey(metadataItem.getKey())
169-
.setValue(ByteString.copyFrom(metadataItem.getValue()))
170-
.build())
171-
.collect(Collectors.toList()));
172-
}
173147

174-
YdbTopic.StreamWriteMessage.WriteRequest.MessageData messageData = messageDataBuilder.build();
148+
seqNo = message.updateSeqNo(seqNo);
175149

176-
long sizeWithCurrentMessage = getCurrentRequestSize() + messageData.getSerializedSize() + messageOverheadBytes;
150+
YdbTopic.StreamWriteMessage.WriteRequest.MessageData pb = message.toMessageData();
151+
long sizeWithCurrentMessage = getCurrentRequestSize() + pb.getSerializedSize() + messageOverheadBytes;
177152
if (sizeWithCurrentMessage <= MAX_GRPC_MESSAGE_SIZE) {
178-
addMessage(messageData);
153+
addMessage(pb);
179154
} else {
180155
if (messageCount > 0) {
181156
logger.debug("Adding next message to the same request would lead to grpc request size overflow. " +
182157
"Sending previous {} messages...", messageCount);
183158
sendWriteRequest();
184159
reset();
185-
addMessage(messageData);
160+
addMessage(pb);
186161
} else {
187162
logger.error("A single message is larger than grpc size limit. Sending it anyway...");
188-
addMessage(messageData);
163+
addMessage(pb);
189164
sendWriteRequest();
190165
reset();
191166
}

0 commit comments

Comments
 (0)