Skip to content

Commit

Permalink
deps: Upgraded spring batch redis
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Oct 31, 2024
1 parent cf819e6 commit 2674632
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 69 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ config {
}

people {
clearDomainSet()
person {
id = 'jruaux'
name = 'Julien Ruaux'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,13 @@ public ConditionalDel(Operation<byte[], byte[], SinkRecord, Object> delegate,

@Override
public List<RedisFuture<Object>> execute(RedisAsyncCommands<byte[], byte[]> commands,
List<? extends SinkRecord> items) {
Chunk<? extends SinkRecord> items) {
List<RedisFuture<Object>> futures = new ArrayList<>();
List<SinkRecord> toRemove = items.stream().filter(delPredicate).collect(Collectors.toList());
futures.addAll(del.execute(commands, toRemove));
List<SinkRecord> toWrite = items.stream().filter(delPredicate.negate()).collect(Collectors.toList());
futures.addAll(write.execute(commands, toWrite));
List<SinkRecord> toRemove = items.getItems().stream().filter(delPredicate).collect(Collectors.toList());
futures.addAll(del.execute(commands, new Chunk<>(toRemove)));
List<SinkRecord> toWrite = items.getItems().stream().filter(delPredicate.negate())
.collect(Collectors.toList());
futures.addAll(write.execute(commands, new Chunk<>(toWrite)));
return futures;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class RedisKeysSourceTask extends SourceTask {
private final Clock clock;

private AbstractRedisClient client;
private RedisItemReader<String, String, Object> reader;
private RedisItemReader<String, String> reader;
private int batchSize;
private String topic;

Expand All @@ -66,7 +66,7 @@ public String version() {
return ManifestVersionProvider.getVersion();
}

public RedisItemReader<String, String, Object> getReader() {
public RedisItemReader<String, String> getReader() {
return reader;
}

Expand Down Expand Up @@ -124,7 +124,7 @@ public void stop() {
}
}

private SourceRecord convert(KeyValue<String, Object> input) {
private SourceRecord convert(KeyValue<String> input) {
Map<String, ?> partition = new HashMap<>();
Map<String, ?> offset = new HashMap<>();
String key = input.getKey();
Expand All @@ -136,7 +136,7 @@ private SourceRecord convert(KeyValue<String, Object> input) {
@Override
public List<SourceRecord> poll() {
List<SourceRecord> records = new ArrayList<>();
KeyValue<String, Object> item;
KeyValue<String> item;
try {
while (records.size() < batchSize && (item = reader.read()) != null) {
records.add(convert(item));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
import org.apache.kafka.connect.data.Struct;

import com.redis.lettucemod.timeseries.Sample;
import com.redis.spring.batch.item.redis.common.DataType;
import com.redis.spring.batch.item.redis.common.KeyValue;

import io.lettuce.core.ScoredValue;

public class ToStructFunction implements Function<KeyValue<String, Object>, Struct> {
public class ToStructFunction implements Function<KeyValue<String>, Struct> {

public static final String FIELD_KEY = "key";

Expand Down Expand Up @@ -60,43 +59,45 @@ public class ToStructFunction implements Function<KeyValue<String, Object>, Stru
.field(FIELD_SET, SET_SCHEMA).field(FIELD_ZSET, ZSET_SCHEMA).name(VALUE_SCHEMA_NAME).build();

@Override
public Struct apply(KeyValue<String, Object> input) {
public Struct apply(KeyValue<String> input) {
Struct struct = new Struct(VALUE_SCHEMA);
struct.put(FIELD_KEY, input.getKey());
struct.put(FIELD_TTL, input.getTtl());
struct.put(FIELD_TYPE, input.getType());
switch (DataType.of(input.getType())) {
case HASH:
struct.put(FIELD_HASH, input.getValue());
break;
case JSON:
struct.put(FIELD_JSON, input.getValue());
break;
case LIST:
struct.put(FIELD_LIST, input.getValue());
break;
case SET:
struct.put(FIELD_SET, list(input));
break;
case STRING:
struct.put(FIELD_STRING, input.getValue());
break;
case ZSET:
struct.put(FIELD_ZSET, zsetMap(input));
break;
default:
break;
if (input.getType() != null) {
switch (input.getType()) {
case KeyValue.TYPE_HASH:
struct.put(FIELD_HASH, input.getValue());
break;
case KeyValue.TYPE_JSON:
struct.put(FIELD_JSON, input.getValue());
break;
case KeyValue.TYPE_LIST:
struct.put(FIELD_LIST, input.getValue());
break;
case KeyValue.TYPE_SET:
struct.put(FIELD_SET, list(input));
break;
case KeyValue.TYPE_STRING:
struct.put(FIELD_STRING, input.getValue());
break;
case KeyValue.TYPE_ZSET:
struct.put(FIELD_ZSET, zsetMap(input));
break;
default:
break;
}
}
return struct;
}

@SuppressWarnings("unchecked")
private Object list(KeyValue<String, Object> input) {
private Object list(KeyValue<String> input) {
return new ArrayList<>((Collection<String>) input.getValue());
}

@SuppressWarnings("unchecked")
public static Map<Double, String> zsetMap(KeyValue<String, Object> input) {
public static Map<Double, String> zsetMap(KeyValue<String> input) {
Collection<ScoredValue<String>> value = (Collection<ScoredValue<String>>) input.getValue();
return zsetMap(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.redis.kafka.connect;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -37,7 +36,6 @@
import com.redis.kafka.connect.source.ToStructFunction;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.spring.batch.item.redis.RedisItemWriter;
import com.redis.spring.batch.item.redis.common.DataType;
import com.redis.spring.batch.item.redis.common.KeyValue;
import com.redis.spring.batch.item.redis.gen.GeneratorItemReader;
import com.redis.spring.batch.test.AbstractTestBase;
Expand Down Expand Up @@ -417,7 +415,7 @@ void pollKeys(TestInfo info) throws Exception {
final List<SourceRecord> sourceRecords = new ArrayList<>();
Executors.newSingleThreadScheduledExecutor().execute(() -> {
GeneratorItemReader generator = generator(count);
RedisItemWriter<String, String, KeyValue<String, Object>> writer = RedisItemWriter.struct();
RedisItemWriter<String, String, KeyValue<String>> writer = RedisItemWriter.struct();
writer.setClient(redisClient);
try {
run(info, step(info, 1, generator, null, writer));
Expand Down Expand Up @@ -455,21 +453,21 @@ public Compare(Object expected, Object actual) {

private Compare values(Struct struct) {
String key = struct.getString(ToStructFunction.FIELD_KEY);
DataType type = DataType.of(struct.getString(ToStructFunction.FIELD_TYPE));
Assertions.assertEquals(redisConnection.sync().type(key), type.getString());
String type = struct.getString(ToStructFunction.FIELD_TYPE);
Assertions.assertEquals(redisConnection.sync().type(key), type);
RedisModulesCommands<String, String> commands = redisConnection.sync();
switch (type) {
case HASH:
case KeyValue.TYPE_HASH:
return compare(commands.hgetall(key), struct.getMap(ToStructFunction.FIELD_HASH));
case JSON:
case KeyValue.TYPE_JSON:
return compare(commands.jsonGet(key, "."), struct.getString(ToStructFunction.FIELD_JSON));
case LIST:
case KeyValue.TYPE_LIST:
return compare(commands.lrange(key, 0, -1), struct.getArray(ToStructFunction.FIELD_LIST));
case SET:
case KeyValue.TYPE_SET:
return compare(commands.smembers(key), new HashSet<>(struct.getArray(ToStructFunction.FIELD_SET)));
case STRING:
case KeyValue.TYPE_STRING:
return compare(commands.get(key), struct.getString(ToStructFunction.FIELD_STRING));
case ZSET:
case KeyValue.TYPE_ZSET:
return compare(ToStructFunction.zsetMap(commands.zrangeWithScores(key, 0, -1)),
struct.getMap(ToStructFunction.FIELD_ZSET));
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,14 @@

import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.springframework.util.unit.DataSize;

import com.redis.enterprise.Database;
import com.redis.enterprise.RedisModule;
import com.redis.enterprise.testcontainers.RedisEnterpriseContainer;
import com.redis.testcontainers.RedisServer;

@EnabledOnOs(value = OS.LINUX)
class EnterpriseSinkIntegrationTests extends AbstractSinkIntegrationTests {

private static final RedisEnterpriseContainer container = new RedisEnterpriseContainer(
RedisEnterpriseContainer.DEFAULT_IMAGE_NAME.withTag(RedisEnterpriseContainer.DEFAULT_TAG))
.withDatabase(Database.builder().name("BatchTests").memory(DataSize.ofMegabytes(50).toBytes())
.ossCluster(true).modules(RedisModule.JSON, RedisModule.TIMESERIES, RedisModule.SEARCH).build());
private static final RedisEnterpriseContainer container = RedisContainerFactory.enterprise();

@Override
protected RedisServer getRedisServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,14 @@

import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.springframework.util.unit.DataSize;

import com.redis.enterprise.Database;
import com.redis.enterprise.RedisModule;
import com.redis.enterprise.testcontainers.RedisEnterpriseContainer;
import com.redis.testcontainers.RedisServer;

@EnabledOnOs(value = OS.LINUX)
class EnterpriseSourceIntegrationTests extends AbstractSourceIntegrationTests {

private static final RedisEnterpriseContainer container = new RedisEnterpriseContainer(
RedisEnterpriseContainer.DEFAULT_IMAGE_NAME.withTag(RedisEnterpriseContainer.DEFAULT_TAG))
.withDatabase(Database.builder().name("BatchTests").memory(DataSize.ofMegabytes(50).toBytes())
.ossCluster(true).modules(RedisModule.JSON, RedisModule.TIMESERIES, RedisModule.SEARCH).build());
private static final RedisEnterpriseContainer container = RedisContainerFactory.enterprise();

@Override
protected RedisServer getRedisServer() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.redis.kafka.connect;

import com.redis.enterprise.Database;
import com.redis.enterprise.RedisModule;
import com.redis.enterprise.testcontainers.RedisEnterpriseContainer;
import com.redis.enterprise.testcontainers.RedisEnterpriseServer;
import com.redis.testcontainers.RedisStackContainer;

/**
 * Central factory for the Redis test containers used by the integration tests.
 * Pins the Enterprise and Stack image tags in one place so every test class
 * runs against the same server versions.
 */
public interface RedisContainerFactory {

	String ENTERPRISE_TAG = "7.4.6-102";
	String STACK_TAG = "7.2.0-v13";

	/**
	 * @return a Redis Stack container pinned to {@link #STACK_TAG}
	 */
	static RedisStackContainer stack() {
		var image = RedisStackContainer.DEFAULT_IMAGE_NAME.withTag(STACK_TAG);
		return new RedisStackContainer(image);
	}

	/**
	 * @return a Redis Enterprise container pinned to {@link #ENTERPRISE_TAG},
	 *         pre-configured with an OSS-cluster database carrying the
	 *         TimeSeries, JSON and Search modules
	 */
	@SuppressWarnings("resource")
	static RedisEnterpriseContainer enterprise() {
		var database = Database.builder().name("ConnectorTests").memoryMB(50).ossCluster(true)
				.modules(RedisModule.TIMESERIES, RedisModule.JSON, RedisModule.SEARCH).build();
		var image = RedisEnterpriseContainer.DEFAULT_IMAGE_NAME.withTag(ENTERPRISE_TAG);
		return new RedisEnterpriseContainer(image).withDatabase(database);
	}

	/**
	 * @return a Redis Enterprise server (non-container) configured with a
	 *         two-shard OSS-cluster database on port 12001 carrying the JSON,
	 *         Search and TimeSeries modules
	 */
	static RedisEnterpriseServer enterpriseServer() {
		var database = Database.builder().shardCount(2).port(12001).ossCluster(true)
				.modules(RedisModule.JSON, RedisModule.SEARCH, RedisModule.TIMESERIES).build();
		var server = new RedisEnterpriseServer();
		server.withDatabase(database);
		return server;
	}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

class StackSinkIntegrationTests extends AbstractSinkIntegrationTests {

private static final RedisStackContainer container = new RedisStackContainer(
RedisStackContainer.DEFAULT_IMAGE_NAME.withTag(RedisStackContainer.DEFAULT_TAG));
private static final RedisStackContainer container = RedisContainerFactory.stack();

@Override
protected RedisServer getRedisServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

class StackSourceIntegrationTests extends AbstractSourceIntegrationTests {

private static final RedisStackContainer container = new RedisStackContainer(
RedisStackContainer.DEFAULT_IMAGE_NAME.withTag(RedisStackContainer.DEFAULT_TAG));
private static final RedisStackContainer container = RedisContainerFactory.stack();

@Override
protected RedisServer getRedisServer() {
Expand Down
2 changes: 0 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.0
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ targetCompatibility = 17
reproducibleBuild = true

asciidoctorVersion = 3.3.2
bootPluginVersion = 3.3.4
bootPluginVersion = 3.3.5
dependencyPluginVersion = 1.1.6
gitPluginVersion = 3.0.0
jacocoPluginVersion = 0.8.12
Expand All @@ -30,7 +30,7 @@ kordampPluginVersion = 0.54.0
shadowPluginVersion = 8.1.8

lettucemodVersion = 4.1.0
springBatchRedisVersion = 4.4.9-SNAPSHOT
springBatchRedisVersion = 4.5.1-SNAPSHOT
testcontainersRedisVersion = 2.2.2

org.gradle.daemon = false
Expand Down

0 comments on commit 2674632

Please sign in to comment.