Skip to content

Commit f45d9a5

Browse files
authored
Merge pull request bcgov#150 from peggy-ntt/sftpFilter
add filter for sftp only process file once.
2 parents 983f014 + 3058c97 commit f45d9a5

File tree

12 files changed

+55
-27
lines changed

12 files changed

+55
-27
lines changed

README.md

+7-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,9 @@ Sets the maximum attempt to reprocess a message in the queue.
187187

188188
#### Description
189189

190-
Using this plugin you can receive messages from a specified Sftp server.
190+
Using this plugin you can receive messages from a specified Sftp server when there is a new file.
191+
This Sftp plugin will only receive a file once, it uses the server's file timestamp to detect if we've already 'processed' this file.
192+
It needs redis data structure store as Metadata Store.
191193

192194
#### Setup
193195

@@ -489,6 +491,10 @@ To view the message in a queue, login to [rabbitmq management console](http://lo
489491

490492
#### If you want to run the sample app using sftp do the following:
491493

494+
step 0. Create a redis container
495+
```bash
496+
docker run --name some-redis -p 6379:6379 -d redis
497+
```
492498
step 1. Create a sftp server container (from WindowsPowerShell or GitBash)
493499
```bash
494500
docker run -p 22:22 -d atmoz/sftp myname:pass:::upload

jrcc-access-spring-boot-autoconfigure/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
<groupId>org.springframework.integration</groupId>
4343
<artifactId>spring-integration-sftp</artifactId>
4444
</dependency>
45+
<dependency>
46+
<groupId>org.springframework.integration</groupId>
47+
<artifactId>spring-integration-redis</artifactId>
48+
</dependency>
4549
<dependency>
4650
<groupId>org.hibernate.validator</groupId>
4751
<artifactId>hibernate-validator</artifactId>

jrcc-access-spring-boot-autoconfigure/src/main/java/ca/bc/gov/open/jrccaccess/autoconfigure/plugins/rabbitmq/RabbitMqDocumentInput.java

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ca.bc.gov.open.jrccaccess.autoconfigure.plugins.rabbitmq;
22

33
import ca.bc.gov.open.jrccaccess.autoconfigure.common.Constants;
4+
import ca.bc.gov.open.jrccaccess.autoconfigure.redis.RedisStorageService;
45
import ca.bc.gov.open.jrccaccess.autoconfigure.services.DocumentReadyHandler;
56
import ca.bc.gov.open.jrccaccess.libs.DocumentReadyMessage;
67
import ca.bc.gov.open.jrccaccess.libs.DocumentStorageProperties;

jrcc-access-spring-boot-autoconfigure/src/main/java/ca/bc/gov/open/jrccaccess/autoconfigure/plugins/rabbitmq/RabbitMqDocumentOutput.java

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ca.bc.gov.open.jrccaccess.autoconfigure.plugins.rabbitmq;
22

33
import ca.bc.gov.open.jrccaccess.autoconfigure.AccessProperties;
4+
import ca.bc.gov.open.jrccaccess.autoconfigure.redis.RedisStorageService;
45
import ca.bc.gov.open.jrccaccess.libs.*;
56
import ca.bc.gov.open.jrccaccess.libs.services.exceptions.DocumentMessageException;
67
import org.slf4j.Logger;

jrcc-access-spring-boot-autoconfigure/src/main/java/ca/bc/gov/open/jrccaccess/autoconfigure/plugins/sftp/AutoConfiguration.java

+16-12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import ca.bc.gov.open.jrccaccess.autoconfigure.config.exceptions.KnownHostFileNotDefinedException;
55
import ca.bc.gov.open.jrccaccess.autoconfigure.config.exceptions.KnownHostFileNotFoundException;
66
import com.jcraft.jsch.ChannelSftp;
7+
import org.apache.commons.lang3.StringUtils;
78
import org.slf4j.Logger;
89
import org.slf4j.LoggerFactory;
910
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -15,8 +16,11 @@
1516
import org.springframework.integration.annotation.Poller;
1617
import org.springframework.integration.annotation.ServiceActivator;
1718
import org.springframework.integration.core.MessageSource;
19+
import org.springframework.integration.file.filters.ChainFileListFilter;
1820
import org.springframework.integration.file.remote.session.CachingSessionFactory;
1921
import org.springframework.integration.file.remote.session.SessionFactory;
22+
import org.springframework.integration.metadata.ConcurrentMetadataStore;
23+
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
2024
import org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter;
2125
import org.springframework.integration.sftp.inbound.SftpStreamingMessageSource;
2226
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
@@ -30,7 +34,7 @@
3034
@ComponentScan
3135
@EnableConfigurationProperties(SftpInputProperties.class)
3236
@ConditionalOnProperty(
33-
value="bcgov.access.input.plugin",
37+
value = "bcgov.access.input.plugin",
3438
havingValue = "sftp"
3539
)
3640
public class AutoConfiguration {
@@ -41,7 +45,6 @@ public class AutoConfiguration {
4145

4246
public AutoConfiguration(SftpInputProperties sftpInputProperties) {
4347
this.properties = sftpInputProperties;
44-
4548
logger.debug("SFTP Configuration: Host => [{}]", this.properties.getHost());
4649
logger.debug("SFTP Configuration: Port => [{}]", this.properties.getPort());
4750
logger.debug("SFTP Configuration: Username => [{}]", this.properties.getUsername());
@@ -50,10 +53,8 @@ public AutoConfiguration(SftpInputProperties sftpInputProperties) {
5053
logger.debug("SFTP Configuration: Cron => [{}]", this.properties.getCron());
5154
logger.debug("SFTP Configuration: Max Message Per Poll => [{}]", this.properties.getMaxMessagePerPoll());
5255
logger.debug("SFTP Configuration: Known Host File => [{}]", this.properties.getKnownHostFile());
53-
5456
}
5557

56-
5758
@Bean
5859
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() throws InvalidConfigException {
5960
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
@@ -68,13 +69,13 @@ public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() throws InvalidCo
6869
}
6970
boolean isAllowUnknownKeys = properties.isAllowUnknownKeys();
7071
factory.setAllowUnknownKeys(isAllowUnknownKeys);
71-
if(!isAllowUnknownKeys){
72+
if (!isAllowUnknownKeys) {
7273
String knownHostFileStr = properties.getKnownHostFile();
73-
if(knownHostFileStr == null || knownHostFileStr.equals("") )
74+
if (StringUtils.isBlank(knownHostFileStr))
7475
throw new KnownHostFileNotDefinedException("Must define known_hosts file when allow-unknown-keys is false. ");
7576

7677
File knownHostFile = new File(knownHostFileStr);
77-
if( ! knownHostFile.exists() )
78+
if (!knownHostFile.exists())
7879
throw new KnownHostFileNotFoundException("Cannot find known_hosts file when allow-unknown-keys is false.");
7980

8081
factory.setKnownHosts(properties.getKnownHostFile());
@@ -87,20 +88,23 @@ public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() throws InvalidCo
8788
public SftpRemoteFileTemplate template() {
8889
try {
8990
return new SftpRemoteFileTemplate(sftpSessionFactory());
90-
}catch(InvalidConfigException ex)
91-
{
91+
} catch (InvalidConfigException ex) {
9292
logger.error(ex.getMessage());
9393
}
9494
return null;
9595
}
9696

9797
@Bean
9898
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(cron = "${bcgov.access.input.sftp.cron}", maxMessagesPerPoll = "${bcgov.access.input.sftp.max-message-per-poll}"))
99-
public MessageSource<InputStream> sftpMessageSource() {
99+
public MessageSource<InputStream> sftpMessageSource(ConcurrentMetadataStore concurrentMetadataStore) {
100+
ChainFileListFilter<ChannelSftp.LsEntry> filterChain = new ChainFileListFilter<>();
101+
if (properties.getFilterPattern() != null && !"".equals(properties.getFilterPattern()))
102+
filterChain.addFilter(new SftpRegexPatternFileListFilter(properties.getFilterPattern()));
103+
filterChain.addFilter(new SftpPersistentAcceptOnceFileListFilter(concurrentMetadataStore, "sftpSource"));
100104
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
101105
messageSource.setRemoteDirectory(properties.getRemoteDirectory());
102-
if(properties.getFilterPattern() != null && !"".equals(properties.getFilterPattern()))
103-
messageSource.setFilter(new SftpRegexPatternFileListFilter(properties.getFilterPattern()));
106+
messageSource.setFilter(filterChain);
107+
104108
return messageSource;
105109
}
106110

Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
package ca.bc.gov.open.jrccaccess.autoconfigure.plugins.rabbitmq;
1+
package ca.bc.gov.open.jrccaccess.autoconfigure.redis;
22

3+
import ca.bc.gov.open.jrccaccess.autoconfigure.plugins.rabbitmq.RabbitMqOutputProperties;
34
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
45
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
56
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
67
import org.springframework.boot.autoconfigure.data.redis.RedisProperties.Sentinel;
78
import org.springframework.cache.CacheManager;
89
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.ComponentScan;
911
import org.springframework.context.annotation.Configuration;
1012
import org.springframework.data.redis.cache.RedisCacheConfiguration;
1113
import org.springframework.data.redis.cache.RedisCacheManager;
@@ -14,21 +16,22 @@
1416
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
1517
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
1618
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
19+
import org.springframework.integration.metadata.ConcurrentMetadataStore;
20+
import org.springframework.integration.redis.metadata.RedisMetadataStore;
1721

1822
import javax.naming.OperationNotSupportedException;
1923
import java.time.Duration;
2024
import java.util.ArrayList;
2125
import java.util.List;
2226

23-
2427
/**
2528
* Redis configuration properties
2629
* @author alexjoybc
2730
* @since 0.4.0
2831
*/
2932
@Configuration
30-
@ConditionalOnExpression("'${bcgov.access.input.plugin}' == 'rabbitmq' || '${bcgov.access.output.plugin}' == 'rabbitmq'")
31-
public class RedisConfiguration {
33+
@ComponentScan
34+
public class AutoConfiguration {
3235

3336
/**
3437
* Configure the JedisConnectionFactory
@@ -38,9 +41,9 @@ public class RedisConfiguration {
3841
*/
3942
@Bean
4043
@ConditionalOnMissingBean(JedisConnectionFactory.class)
44+
@ConditionalOnExpression("'${bcgov.access.input.plugin}' == 'rabbitmq' || '${bcgov.access.output.plugin}' == 'rabbitmq' || '${bcgov.access.input.plugin}' == 'sftp'")
4145
public JedisConnectionFactory jedisConnectionFactory(RedisProperties properties) {
42-
43-
46+
4447
if(properties.getCluster() != null) {
4548
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(properties.getCluster().getNodes());
4649
redisClusterConfiguration.setPassword(properties.getPassword());
@@ -65,8 +68,7 @@ public JedisConnectionFactory jedisConnectionFactory(RedisProperties properties)
6568
redisStandaloneConfiguration.setPassword(properties.getPassword());
6669
return new JedisConnectionFactory(redisStandaloneConfiguration);
6770
}
68-
69-
71+
7072
private List<RedisNode> createSentinels(Sentinel sentinel) {
7173
List<RedisNode> nodes = new ArrayList<>();
7274
for (String node : sentinel.getNodes()) {
@@ -90,6 +92,7 @@ private List<RedisNode> createSentinels(Sentinel sentinel) {
9092
*/
9193
@Bean(name = "Document")
9294
@ConditionalOnMissingBean(CacheManager.class)
95+
@ConditionalOnExpression("'${bcgov.access.input.plugin}' == 'rabbitmq' || '${bcgov.access.output.plugin}' == 'rabbitmq'")
9396
public CacheManager cacheManager(JedisConnectionFactory jedisConnectionFactory, RabbitMqOutputProperties rabbitMqOutputProperties) {
9497

9598
RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
@@ -100,5 +103,11 @@ public CacheManager cacheManager(JedisConnectionFactory jedisConnectionFactory,
100103
return RedisCacheManager.RedisCacheManagerBuilder.fromConnectionFactory(jedisConnectionFactory)
101104
.cacheDefaults(redisCacheConfiguration).build();
102105
}
103-
106+
107+
@Bean
108+
@ConditionalOnMissingBean(ConcurrentMetadataStore.class)
109+
@ConditionalOnExpression("'${bcgov.access.input.plugin}' == 'sftp'")
110+
public ConcurrentMetadataStore redisMetadataStore(JedisConnectionFactory jedisConnectionFactory){
111+
return new RedisMetadataStore(jedisConnectionFactory);
112+
}
104113
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package ca.bc.gov.open.jrccaccess.autoconfigure.plugins.rabbitmq;
1+
package ca.bc.gov.open.jrccaccess.autoconfigure.redis;
22

33
import ca.bc.gov.open.jrccaccess.autoconfigure.AccessProperties;
44
import ca.bc.gov.open.jrccaccess.libs.DocumentStorageProperties;
@@ -101,7 +101,7 @@ public String getString(String key, String digest) throws DocumentMessageExcepti
101101
public Boolean deleteString(String key) throws DocumentMessageException {
102102

103103
try {
104-
this.cacheManager.getCache(accessProperties.getInput().getDocumentType()).evict(key);
104+
this.cacheManager.getCache(accessProperties.getInput().getDocumentType()).evict(key);
105105
return true;
106106
} catch (RedisConnectionFailureException e) {
107107
throw new DocumentMessageException(serviceUnavailableMessage, e.getCause());

jrcc-access-spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
22
ca.bc.gov.open.jrccaccess.autoconfigure.AccessAutoConfiguration,\
3+
ca.bc.gov.open.jrccaccess.autoconfigure.redis.AutoConfiguration,\
34
ca.bc.gov.open.jrccaccess.autoconfigure.plugins.rabbitmq.AutoConfiguration,\
45
ca.bc.gov.open.jrccaccess.autoconfigure.plugins.console.AutoConfiguration,\
56
ca.bc.gov.open.jrccaccess.autoconfigure.plugins.http.AutoConfiguration, \

jrcc-access-spring-boot-autoconfigure/src/test/java/ca/bc/gov/open/jrccaccess/autoconfigure/plugins/rabbitmq/RabbitMqDocumentInputTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import ca.bc.gov.open.jrccaccess.autoconfigure.AccessProperties;
44
import ca.bc.gov.open.jrccaccess.autoconfigure.helpers.RandomHelper;
5+
import ca.bc.gov.open.jrccaccess.autoconfigure.redis.RedisStorageService;
56
import ca.bc.gov.open.jrccaccess.autoconfigure.services.DocumentReadyHandler;
67
import ca.bc.gov.open.jrccaccess.libs.DocumentReadyMessage;
78
import ca.bc.gov.open.jrccaccess.libs.DocumentStorageProperties;

jrcc-access-spring-boot-autoconfigure/src/test/java/ca/bc/gov/open/jrccaccess/autoconfigure/plugins/rabbitmq/RabbitMqDocumentOutputTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import ca.bc.gov.open.jrccaccess.autoconfigure.AccessProperties;
44
import ca.bc.gov.open.jrccaccess.autoconfigure.AccessProperties.PluginConfig;
5+
import ca.bc.gov.open.jrccaccess.autoconfigure.redis.RedisStorageService;
56
import ca.bc.gov.open.jrccaccess.libs.DocumentStorageProperties;
67
import ca.bc.gov.open.jrccaccess.libs.TransactionInfo;
78
import org.junit.Before;
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package ca.bc.gov.open.jrccaccess.autoconfigure.plugins.rabbitmq;
1+
package ca.bc.gov.open.jrccaccess.autoconfigure.redis;
22

33
import ca.bc.gov.open.jrccaccess.autoconfigure.AccessProperties;
44
import ca.bc.gov.open.jrccaccess.autoconfigure.AccessProperties.PluginConfig;

jrcc-access-spring-boot-sample-app/src/main/resources/application.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ logging:
77
bc: DEBUG
88
pattern:
99
console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} %X{transaction.filename} %X{transaction.id} - %msg%n"
10-
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} %X{transaction.filename} %X{transaction.id} - %msg%n"
10+
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} %X{transaction.filename} - %msg%n"
1111
bcgov:
1212
access:
1313
input:
@@ -16,4 +16,4 @@ bcgov:
1616
plugin: console
1717
output:
1818
document-type: test-doc
19-
plugin: console
19+
plugin: console

0 commit comments

Comments
 (0)