Skip to content

Commit

Permalink
* Remove org.springframework.cloud dependencies from the project
Browse files Browse the repository at this point in the history
since we don't use `@PollableBean` anymore, which comes from the `spring-cloud-function-context`
* Simplify `JdbcSupplierConfiguration` and `MongodbSupplierConfiguration` code more: more injections to the respective bean method.
* Use `(__) ->` lambda syntax for unused argument
* Remove unused `ThreadLocalFluxSinkMessageChannel` internal class
* Update Copyrights of the classes in this change
  • Loading branch information
artembilan committed Jan 16, 2025
1 parent 5eea911 commit 2c65dfa
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 60 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ allprojects {
mavenBom "io.debezium:debezium-bom:$debeziumVersion"
mavenBom "io.awspring.cloud:spring-cloud-aws-dependencies:$springCloudAwsVersion"
mavenBom "org.springframework.boot:spring-boot-dependencies:$springBootVersion"
mavenBom "org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion"
mavenBom "ai.djl:bom:$djlVersion"
}
}
Expand Down
1 change: 0 additions & 1 deletion dependencies.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
ext {
springBootVersion = '3.4.1'
springCloudVersion = '2024.0.0'
springCloudAwsVersion = '3.2.1'

debeziumVersion = '3.0.6.Final'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2024 the original author or authors.
* Copyright 2011-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,11 +17,9 @@
package org.springframework.cloud.fn.splitter;

import java.nio.charset.Charset;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -33,13 +31,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.splitter.DefaultMessageSplitter;
import org.springframework.integration.splitter.ExpressionEvaluatingSplitter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

/**
* Auto-configuration for Splitter function.
Expand Down Expand Up @@ -67,7 +63,7 @@ public Function<Flux<Message<?>>, Flux<Message<?>>> splitterFunction(
inputChannel.subscribe(messageSplitter);
FluxMessageChannel outputChannel = new FluxMessageChannel();
messageSplitter.setOutputChannel(outputChannel);
return (messageFlux) -> Flux.from(outputChannel).doOnRequest((l) -> inputChannel.subscribeTo(messageFlux));
return (messageFlux) -> Flux.from(outputChannel).doOnRequest((__) -> inputChannel.subscribeTo(messageFlux));
}

@Bean
Expand Down Expand Up @@ -118,22 +114,4 @@ static class FileMarkers {

}

private static final class ThreadLocalFluxSinkMessageChannel
implements MessageChannel, ReactiveStreamsSubscribableChannel {

private final ThreadLocal<List<Message<?>>> publisherThreadLocal = new ThreadLocal<>();

@Override
@SuppressWarnings("unchecked")
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
this.publisherThreadLocal.set(Flux.from(publisher).collectList().cast(List.class).block());
}

@Override
public boolean send(Message<?> message, long l) {
throw new UnsupportedOperationException("This channel only supports a reactive 'subscribeTo()' ");
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2024 the original author or authors.
* Copyright 2011-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
1 change: 0 additions & 1 deletion supplier/spring-jdbc-supplier/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ dependencies {
api project(':spring-splitter-function')
api 'org.springframework.integration:spring-integration-jdbc'
api 'org.springframework.boot:spring-boot-starter-jdbc'
api 'org.springframework.cloud:spring-cloud-function-context'

runtimeOnly 'org.hsqldb:hsqldb'
runtimeOnly 'com.h2database:h2'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2024 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,23 +46,14 @@
@EnableConfigurationProperties(JdbcSupplierProperties.class)
public class JdbcSupplierConfiguration {

private final JdbcSupplierProperties properties;

private final DataSource dataSource;

public JdbcSupplierConfiguration(JdbcSupplierProperties properties, DataSource dataSource) {
this.properties = properties;
this.dataSource = dataSource;
}

@Bean
public JdbcPollingChannelAdapter jdbcMessageSource(
public JdbcPollingChannelAdapter jdbcMessageSource(JdbcSupplierProperties properties, DataSource dataSource,
@Nullable ComponentCustomizer<JdbcPollingChannelAdapter> jdbcPollingChannelAdapterCustomizer) {

JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(this.dataSource,
this.properties.getQuery());
jdbcPollingChannelAdapter.setMaxRows(this.properties.getMaxRows());
jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(dataSource,
properties.getQuery());
jdbcPollingChannelAdapter.setMaxRows(properties.getMaxRows());
jdbcPollingChannelAdapter.setUpdateSql(properties.getUpdate());
if (jdbcPollingChannelAdapterCustomizer != null) {
jdbcPollingChannelAdapterCustomizer.customize(jdbcPollingChannelAdapter);
}
Expand Down
1 change: 0 additions & 1 deletion supplier/spring-mongodb-supplier/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ dependencies {
api project(':spring-splitter-function')
api 'org.springframework.integration:spring-integration-mongodb'
api 'org.mongodb:mongodb-driver-sync'
api 'org.springframework.cloud:spring-cloud-function-context'

testImplementation 'org.testcontainers:mongodb'
testImplementation project(':spring-mongodb-consumer').sourceSets.test.output
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2024 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,15 +48,6 @@
@EnableConfigurationProperties({ MongodbSupplierProperties.class })
public class MongodbSupplierConfiguration {

private final MongodbSupplierProperties properties;

private final MongoTemplate mongoTemplate;

public MongodbSupplierConfiguration(MongodbSupplierProperties properties, MongoTemplate mongoTemplate) {
this.properties = properties;
this.mongoTemplate = mongoTemplate;
}

@Bean(name = "mongodbSupplier")
@ConditionalOnProperty(prefix = "mongodb", name = "split", matchIfMissing = true)
public Supplier<Flux<Message<?>>> splittedSupplier(MongoDbMessageSource mongoDbSource,
Expand All @@ -72,15 +63,18 @@ public Supplier<Message<?>> mongodbSupplier(MongoDbMessageSource mongoDbSource)
}

@Bean
public MongoDbMessageSource mongoDbSource(
public MongoDbMessageSource mongoDbSource(MongodbSupplierProperties properties, MongoTemplate mongoTemplate,
@Nullable ComponentCustomizer<MongoDbMessageSource> mongoDbMessageSourceCustomizer) {

Expression queryExpression = (this.properties.getQueryExpression() != null)
? this.properties.getQueryExpression() : new LiteralExpression(this.properties.getQuery());
MongoDbMessageSource mongoDbMessageSource = new MongoDbMessageSource(this.mongoTemplate, queryExpression);
mongoDbMessageSource.setCollectionNameExpression(new LiteralExpression(this.properties.getCollection()));
Expression queryExpression = properties.getQueryExpression();
if (queryExpression == null) {
queryExpression = new LiteralExpression(properties.getQuery());
}

MongoDbMessageSource mongoDbMessageSource = new MongoDbMessageSource(mongoTemplate, queryExpression);
mongoDbMessageSource.setCollectionNameExpression(new LiteralExpression(properties.getCollection()));
mongoDbMessageSource.setEntityClass(String.class);
mongoDbMessageSource.setUpdateExpression(this.properties.getUpdateExpression());
mongoDbMessageSource.setUpdateExpression(properties.getUpdateExpression());

if (mongoDbMessageSourceCustomizer != null) {
mongoDbMessageSourceCustomizer.customize(mongoDbMessageSource);
Expand Down

0 comments on commit 2c65dfa

Please sign in to comment.