Skip to content

Commit 48d1963

Browse files
joshlongphilwebb
authored andcommitted
Lesson 6: Dealing with Integration and Batch Processing
1 parent bfb4535 commit 48d1963

File tree

22 files changed

+659
-0
lines changed

22 files changed

+659
-0
lines changed

livelessons-integration/README.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
:compat-mode:
2+
= Lesson 6: Dealing With Integration and Batch Processing
3+
4+
_Connect Microservices using Spring Integration and Spring Batch._
5+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<parent>
6+
<groupId>livelessons</groupId>
7+
<artifactId>livelessons-integration</artifactId>
8+
<version>1.0.0-SNAPSHOT</version>
9+
</parent>
10+
<artifactId>livelessons-integration-basic-messaging</artifactId>
11+
<properties>
12+
<main.basedir>../..</main.basedir>
13+
</properties>
14+
<dependencies>
15+
<dependency>
16+
<groupId>org.springframework.boot</groupId>
17+
<artifactId>spring-boot-starter-amqp</artifactId>
18+
</dependency>
19+
<dependency>
20+
<groupId>org.springframework</groupId>
21+
<artifactId>spring-messaging</artifactId>
22+
</dependency>
23+
<dependency>
24+
<groupId>org.springframework.boot</groupId>
25+
<artifactId>spring-boot-starter-test</artifactId>
26+
<scope>test</scope>
27+
</dependency>
28+
</dependencies>
29+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package demo;
2+
3+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
4+
import org.springframework.messaging.Message;
5+
import org.springframework.stereotype.Component;
6+
7+
@Component
8+
public class Consumer {
9+
10+
@RabbitListener(queues = MessagingApplication.NOTIFICATIONS)
11+
public void onNotification(Message<Notification> notification) {
12+
System.out.println("received " + notification.toString());
13+
System.out.println("received payload " + notification.getPayload());
14+
System.out.println("received headers " + notification.getHeaders().toString());
15+
}
16+
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package demo;
2+
3+
import org.springframework.amqp.core.AmqpAdmin;
4+
import org.springframework.amqp.core.Binding;
5+
import org.springframework.amqp.core.BindingBuilder;
6+
import org.springframework.amqp.core.DirectExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.beans.factory.InitializingBean;
9+
import org.springframework.boot.SpringApplication;
10+
import org.springframework.boot.autoconfigure.SpringBootApplication;
11+
import org.springframework.context.annotation.Bean;
12+
13+
@SpringBootApplication
14+
public class MessagingApplication {
15+
16+
public static final String NOTIFICATIONS = "notifications";
17+
18+
@Bean
19+
public InitializingBean prepareQueues(AmqpAdmin amqpAdmin) {
20+
return () -> {
21+
Queue queue = new Queue(NOTIFICATIONS, true);
22+
DirectExchange exchange = new DirectExchange(NOTIFICATIONS);
23+
Binding binding = BindingBuilder.bind(queue).to(exchange).with(NOTIFICATIONS);
24+
amqpAdmin.declareQueue(queue);
25+
amqpAdmin.declareExchange(exchange);
26+
amqpAdmin.declareBinding(binding);
27+
28+
};
29+
}
30+
31+
public static void main(String[] args) {
32+
SpringApplication.run(MessagingApplication.class, args);
33+
}
34+
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package demo;
2+
3+
import java.io.Serializable;
4+
import java.util.Date;
5+
6+
public class Notification implements Serializable {
7+
8+
private String id;
9+
10+
private final String message;
11+
12+
private final Date date;
13+
14+
public Notification(String id, String message, Date date) {
15+
this.message = message;
16+
this.id = id;
17+
this.date = date;
18+
}
19+
20+
public String getId() {
21+
return id;
22+
}
23+
24+
public String getMessage() {
25+
return message;
26+
}
27+
28+
public Date getDate() {
29+
return date;
30+
}
31+
32+
@Override
33+
public String toString() {
34+
return "Notification{" + "message='" + message + '\'' + ", date=" + date + '}';
35+
}
36+
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package demo;
2+
3+
import java.util.Date;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import java.util.UUID;
7+
8+
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.boot.CommandLineRunner;
11+
import org.springframework.stereotype.Component;
12+
13+
@Component
14+
public class Producer implements CommandLineRunner {
15+
16+
@Autowired
17+
private RabbitMessagingTemplate messagingTemplate;
18+
19+
@Override
20+
public void run(String... args) throws Exception {
21+
Notification notification = new Notification(UUID.randomUUID().toString(),
22+
"Hello, world!", new Date());
23+
24+
Map<String, Object> headers = new HashMap<>();
25+
headers.put("notification-id", notification.getId());
26+
27+
this.messagingTemplate.convertAndSend(MessagingApplication.NOTIFICATIONS,
28+
notification, headers, message -> {
29+
System.out.println("sending " + message.getPayload().toString());
30+
return message;
31+
});
32+
}
33+
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<parent>
6+
<groupId>livelessons</groupId>
7+
<artifactId>livelessons-integration</artifactId>
8+
<version>1.0.0-SNAPSHOT</version>
9+
</parent>
10+
<artifactId>livelessons-integration-batch</artifactId>
11+
<properties>
12+
<main.basedir>../..</main.basedir>
13+
</properties>
14+
<dependencies>
15+
<dependency>
16+
<groupId>org.springframework.boot</groupId>
17+
<artifactId>spring-boot-starter-batch</artifactId>
18+
</dependency>
19+
<dependency>
20+
<groupId>com.h2database</groupId>
21+
<artifactId>h2</artifactId>
22+
</dependency>
23+
</dependencies>
24+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package demo;
2+
3+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
4+
import org.springframework.boot.SpringApplication;
5+
import org.springframework.boot.autoconfigure.SpringBootApplication;
6+
7+
/**
8+
* This demonstrates the Spring Batch Java configuration DSL. Spring Batch automatically
9+
* runs jobs that are in the context for you (the poster example for the
10+
* {@link org.springframework.boot.CommandLineRunner}. Spring Batch emits events (via the
11+
* usual Spring {@link org.springframework.context.ApplicationEventPublisher} mechanism)
12+
* when a Spring Batch {@link org.springframework.batch.core.Job} is finished executing.
13+
*/
14+
@SpringBootApplication
15+
@EnableBatchProcessing
16+
public class BatchApplication {
17+
18+
public static void main(String[] args) {
19+
SpringApplication.run(BatchApplication.class, args);
20+
}
21+
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package demo;
2+
3+
public class Contact {
4+
5+
private String firstName;
6+
7+
private String lastName;
8+
9+
private String email;
10+
11+
public Contact() {
12+
}
13+
14+
public Contact(String firstName, String lastName, String email) {
15+
this.firstName = firstName;
16+
this.lastName = lastName;
17+
this.email = email;
18+
}
19+
20+
public String getFirstName() {
21+
return firstName;
22+
}
23+
24+
public void setFirstName(String firstName) {
25+
this.firstName = firstName;
26+
}
27+
28+
public String getLastName() {
29+
return lastName;
30+
}
31+
32+
public void setLastName(String lastName) {
33+
this.lastName = lastName;
34+
}
35+
36+
public String getEmail() {
37+
return email;
38+
}
39+
40+
public void setEmail(String email) {
41+
this.email = email;
42+
}
43+
44+
@Override
45+
public String toString() {
46+
return "Contact{" + "firstName='" + firstName + '\'' + ", lastName='" + lastName
47+
+ '\'' + ", email='" + email + '\'' + '}';
48+
}
49+
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package demo;
2+
3+
import javax.sql.DataSource;
4+
5+
import org.springframework.batch.core.Job;
6+
import org.springframework.batch.core.Step;
7+
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
8+
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
9+
import org.springframework.batch.core.launch.support.RunIdIncrementer;
10+
import org.springframework.batch.item.ItemProcessor;
11+
import org.springframework.batch.item.ItemReader;
12+
import org.springframework.batch.item.ItemWriter;
13+
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
14+
import org.springframework.batch.item.database.JdbcBatchItemWriter;
15+
import org.springframework.batch.item.file.FlatFileItemReader;
16+
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
17+
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
18+
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
19+
import org.springframework.beans.factory.annotation.Autowired;
20+
import org.springframework.boot.autoconfigure.batch.JobExecutionEvent;
21+
import org.springframework.context.ApplicationListener;
22+
import org.springframework.context.annotation.Bean;
23+
import org.springframework.context.annotation.Configuration;
24+
import org.springframework.core.io.ClassPathResource;
25+
import org.springframework.jdbc.core.JdbcTemplate;
26+
import org.springframework.stereotype.Component;
27+
28+
@Configuration
29+
public class ContactBatchJobConfiguration {
30+
31+
@Bean
32+
public ItemProcessor<Contact, Contact> processor() {
33+
return (person) -> new Contact(person.getFirstName(), person.getLastName(),
34+
person.getEmail().toLowerCase());
35+
}
36+
37+
@Bean
38+
public ItemReader<Contact> reader() {
39+
FlatFileItemReader<Contact> reader = new FlatFileItemReader<>();
40+
reader.setResource(new ClassPathResource("data.csv"));
41+
reader.setLineMapper(new DefaultLineMapper<Contact>() {
42+
{
43+
setLineTokenizer(new DelimitedLineTokenizer() {
44+
{
45+
setNames(new String[] { "firstName", "lastName", "email" });
46+
}
47+
});
48+
setFieldSetMapper(new BeanWrapperFieldSetMapper<Contact>() {
49+
{
50+
setTargetType(Contact.class);
51+
}
52+
});
53+
}
54+
});
55+
return reader;
56+
}
57+
58+
@Bean
59+
public ItemWriter<Contact> writer(DataSource dataSource) {
60+
JdbcBatchItemWriter<Contact> writer = new JdbcBatchItemWriter<>();
61+
writer.setItemSqlParameterSourceProvider(
62+
new BeanPropertyItemSqlParameterSourceProvider<>());
63+
writer.setSql("INSERT INTO contact (first_name, last_name, email) "
64+
+ "VALUES (:firstName, :lastName, :email)");
65+
writer.setDataSource(dataSource);
66+
return writer;
67+
}
68+
69+
@Bean
70+
public Job importUserJob(JobBuilderFactory jobs, Step s1) {
71+
return jobs.get("importUserJob").incrementer(new RunIdIncrementer()).flow(s1)
72+
.end().build();
73+
}
74+
75+
@Bean
76+
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Contact> reader,
77+
ItemWriter<Contact> writer, ItemProcessor<Contact, Contact> processor) {
78+
return stepBuilderFactory.get("step1").<Contact, Contact>chunk(10).reader(reader)
79+
.processor(processor).writer(writer).build();
80+
}
81+
82+
@Bean
83+
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
84+
return new JdbcTemplate(dataSource);
85+
}
86+
87+
@Component
88+
public static class BatchJobFinishedListener
89+
implements ApplicationListener<JobExecutionEvent> {
90+
91+
@Autowired
92+
private JdbcTemplate jdbcTemplate;
93+
94+
@Override
95+
public void onApplicationEvent(JobExecutionEvent event) {
96+
System.out.println("finished " + event.getJobExecution().toString());
97+
jdbcTemplate
98+
.query("SELECT first_name, last_name, email FROM contact",
99+
(rs, i) -> new Contact(rs.getString("first_name"),
100+
rs.getString("last_name"), rs.getString("email")))
101+
.forEach(System.out::println);
102+
}
103+
104+
}
105+
106+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
2+
3+
4+
Spencer,Gibb,[email protected]
5+
Juergen,Hoeller,[email protected]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
DROP TABLE contact IF EXISTS;
2+
3+
CREATE TABLE contact(
4+
id IDENTITY primary key,
5+
email VARCHAR (20),
6+
first_name VARCHAR(20),
7+
last_name VARCHAR(20)
8+
);

0 commit comments

Comments
 (0)