Skip to content

Commit ae075fe

Browse files
authoredOct 10, 2022
Merge pull request #2 from IoT-Technology/lesson13/ssl-encrypted
Lesson13/ssl encrypted merge to master
2 parents c54b1f7 + b231053 commit ae075fe

32 files changed

+1288
-44
lines changed
 

‎pom.xml

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<!--
2-
Copyright © 2021 IOT Technical Guide - The MQTTv5 brokerAuthors
2+
Copyright © 2021 IOT Technical Guide - The MQTT broker Authors
33
44
Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
3434
<maven.compiler.source>8</maven.compiler.source>
3535
<maven.compiler.target>8</maven.compiler.target>
3636
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
37+
<docker.image.prefix>iot-technology</docker.image.prefix>
3738
<lombok.version>1.18.18</lombok.version>
3839
<mapstruct.version>1.3.1.Final</mapstruct.version>
3940
<spring-boot.version>2.3.12.RELEASE</spring-boot.version>
@@ -42,6 +43,8 @@
4243
<slf4j.version>1.7.32</slf4j.version>
4344
<logback.version>1.2.6</logback.version>
4445
<apache.commons-lang3.version>3.12.0</apache.commons-lang3.version>
46+
<guava.version>30.0-jre</guava.version>
47+
<bouncycastle.version>1.67</bouncycastle.version>
4548
<redisson.version>3.13.6</redisson.version>
4649
</properties>
4750

@@ -100,6 +103,21 @@
100103
<artifactId>commons-lang3</artifactId>
101104
<version>${apache.commons-lang3.version}</version>
102105
</dependency>
106+
<dependency>
107+
<groupId>com.google.guava</groupId>
108+
<artifactId>guava</artifactId>
109+
<version>${guava.version}</version>
110+
</dependency>
111+
<dependency>
112+
<groupId>org.bouncycastle</groupId>
113+
<artifactId>bcprov-jdk15on</artifactId>
114+
<version>${bouncycastle.version}</version>
115+
</dependency>
116+
<dependency>
117+
<groupId>org.bouncycastle</groupId>
118+
<artifactId>bcpkix-jdk15on</artifactId>
119+
<version>${bouncycastle.version}</version>
120+
</dependency>
103121
<dependency>
104122
<groupId>org.springframework</groupId>
105123
<artifactId>spring-context</artifactId>

‎server/pom.xml

+64-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,16 @@
1-
<?xml version="1.0" encoding="UTF-8"?>
1+
<!--
2+
Copyright © 2021 IOT Technical Guide - The MQTT broker Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
Unless required by applicable law or agreed to in writing, software
9+
distributed under the License is distributed on an "AS IS" BASIS,
10+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
See the License for the specific language governing permissions and
12+
limitations under the License.
13+
-->
214
<project xmlns="http://maven.apache.org/POM/4.0.0"
315
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
416
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -11,6 +23,7 @@
1123

1224
<name>IoT Technology MQTT Broker :: Server</name>
1325
<artifactId>server</artifactId>
26+
<packaging>jar</packaging>
1427

1528
<properties>
1629
<maven.compiler.source>8</maven.compiler.source>
@@ -31,22 +44,67 @@
3144
<artifactId>spring-boot-starter-test</artifactId>
3245
<scope>test</scope>
3346
</dependency>
34-
<dependency>
35-
<groupId>io.netty</groupId>
36-
<artifactId>netty-all</artifactId>
37-
</dependency>
3847
<dependency>
3948
<groupId>org.apache.commons</groupId>
4049
<artifactId>commons-lang3</artifactId>
4150
</dependency>
51+
<dependency>
52+
<groupId>com.google.guava</groupId>
53+
<artifactId>guava</artifactId>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.bouncycastle</groupId>
57+
<artifactId>bcprov-jdk15on</artifactId>
58+
</dependency>
59+
<dependency>
60+
<groupId>org.bouncycastle</groupId>
61+
<artifactId>bcpkix-jdk15on</artifactId>
62+
</dependency>
4263
</dependencies>
4364

4465
<build>
4566
<plugins>
4667
<plugin>
4768
<groupId>org.springframework.boot</groupId>
4869
<artifactId>spring-boot-maven-plugin</artifactId>
49-
<version>2.3.12.RELEASE</version>
70+
<version>${spring-boot.version}</version>
71+
<configuration>
72+
<mainClass>iot.technology.mqtt.server.MQTTServerApplication</mainClass>
73+
</configuration>
74+
<executions>
75+
<execution>
76+
<id>repackage</id>
77+
<goals>
78+
<goal>repackage</goal>
79+
</goals>
80+
</execution>
81+
</executions>
82+
</plugin>
83+
<plugin>
84+
<groupId>com.spotify</groupId>
85+
<artifactId>docker-maven-plugin</artifactId>
86+
<version>1.0.0</version>
87+
<dependencies>
88+
<dependency>
89+
<groupId>javax.activation</groupId>
90+
<artifactId>activation</artifactId>
91+
<version>1.1.1</version>
92+
</dependency>
93+
</dependencies>
94+
<configuration>
95+
<!--镜像名称-->
96+
<imageName>${docker.image.prefix}/mqtt-${project.artifactId}</imageName>
97+
<dockerDirectory>src/main/docker</dockerDirectory>
98+
<resources>
99+
<resource>
100+
<targetPath>/</targetPath>
101+
<!--jar 包所在目录,缺省为target-->
102+
<directory>${project.build.directory}</directory>
103+
<!--jar 包名,缺省为 $project.artifactId}-${project.version}-->
104+
<include>${project.build.finalName}.jar</include>
105+
</resource>
106+
</resources>
107+
</configuration>
50108
</plugin>
51109
</plugins>
52110
</build>

‎server/src/main/docker/Dockerfile

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
FROM alpine:3.10
2+
# 使用 JDK 8 环境为基础环境,如果镜像不是本地的将会从 DockerHub 进行下载
3+
FROM openjdk:8-jdk-alpine
4+
MAINTAINER mushuwei "lovewsic@gmail.com"
5+
# 在宿主机的 /var/lib/docker 目录下创建一个临时文件并把它链接到 tomcat 容器的工作目录 /tmp目录
6+
VOLUME /tmp
7+
# 复制文件并重命名 spring-boot-docker-1.0.jar 表示打包后的 jar 包名称
8+
ADD server-1.0-SNAPSHOT.jar iot-technology-mqtt-broker-1.0.jar
9+
# 为了缩短Tomcat启动时间,添加 java.security.egd 的系统属性指向 /dev/urandom 作为ENTRYPOINT
10+
ENTRYPOINT ["java","-Djava.security.egd=file:/dev/./urandom","-jar","/iot-technology-mqtt-broker-1.0.jar"]

‎server/src/main/java/iot/technology/mqtt/server/MqttServer.java

+33-24
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,13 @@
11
package iot.technology.mqtt.server;
22

33
import io.netty.bootstrap.ServerBootstrap;
4-
import io.netty.channel.Channel;
5-
import io.netty.channel.ChannelInitializer;
6-
import io.netty.channel.ChannelPipeline;
7-
import io.netty.channel.EventLoopGroup;
4+
import io.netty.channel.*;
85
import io.netty.channel.nio.NioEventLoopGroup;
9-
import io.netty.channel.socket.SocketChannel;
106
import io.netty.channel.socket.nio.NioServerSocketChannel;
11-
import io.netty.handler.codec.mqtt.MqttDecoder;
12-
import io.netty.handler.codec.mqtt.MqttEncoder;
137
import io.netty.util.ResourceLeakDetector;
14-
import iot.technology.mqtt.server.protocol.MqttIdleStateHandler;
15-
import iot.technology.mqtt.server.protocol.ProtocolProcess;
168
import lombok.extern.slf4j.Slf4j;
179
import org.springframework.beans.factory.annotation.Value;
18-
import org.springframework.boot.context.properties.ConfigurationProperties;
10+
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
1911
import org.springframework.stereotype.Component;
2012

2113
import javax.annotation.PostConstruct;
@@ -26,14 +18,23 @@
2618
* @author mushuwei
2719
*/
2820
@Component
29-
@ConfigurationProperties(prefix = "mqtt")
21+
@ConditionalOnExpression("'${mqtt.enabled}'=='true'")
3022
@Slf4j
3123
public class MqttServer {
3224
@Value("${mqtt.bind_address}")
3325
private String host;
3426
@Value("${mqtt.bind_port}")
3527
private Integer port;
3628

29+
@Value("${mqtt.ssl.enabled}")
30+
private boolean sslEnabled;
31+
32+
@Value("${mqtt.ssl.bind_address}")
33+
private String sslHost;
34+
35+
@Value("${mqtt.ssl.bind_port}")
36+
private Integer sslPort;
37+
3738
@Value("${mqtt.netty.leak_detector_level}")
3839
private String leakDetectorLevel;
3940
@Value("${mqtt.netty.boss_group_thread_count}")
@@ -43,10 +44,15 @@ public class MqttServer {
4344
@Value("${mqtt.netty.max_payload_size}")
4445
private Integer maxPayloadSize;
4546

47+
@Value("${mqtt.netty.so_keep_alive}")
48+
private boolean keepAlive;
49+
4650
@Resource
47-
private ProtocolProcess protocolProcess;
51+
private MqttTransportContext context;
4852

4953
private Channel serverChannel;
54+
55+
private Channel sslServerChannel;
5056
private EventLoopGroup bossGroup;
5157
private EventLoopGroup workerGroup;
5258

@@ -57,25 +63,25 @@ public void init() throws Exception {
5763
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
5864
log.info("Starting MQTT transport...");
5965

60-
log.info("Starting MQTT transport server");
6166
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
6267
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
6368
ServerBootstrap b = new ServerBootstrap();
6469
b.group(bossGroup, workerGroup)
6570
.channel(NioServerSocketChannel.class)
66-
.childHandler(new ChannelInitializer<SocketChannel>() {
67-
@Override
68-
protected void initChannel(SocketChannel socketChannel) throws Exception {
69-
ChannelPipeline pipeline = socketChannel.pipeline();
70-
pipeline.addLast("idle", new MqttIdleStateHandler());
71-
pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
72-
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
73-
MqttTransportHandler handler = new MqttTransportHandler(protocolProcess);
74-
pipeline.addLast(handler);
75-
}
76-
});
71+
.childHandler(new MqttTransportServerInitializer(context, false))
72+
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
7773

7874
serverChannel = b.bind(host, port).sync().channel();
75+
log.info("Mqtt started on {}", port);
76+
if (sslEnabled) {
77+
b = new ServerBootstrap();
78+
b.group(bossGroup, workerGroup)
79+
.channel(NioServerSocketChannel.class)
80+
.childHandler(new MqttTransportServerInitializer(context, true))
81+
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
82+
sslServerChannel = b.bind(sslHost, sslPort).sync().channel();
83+
log.info("Mqtt TLS/SSL started on {}", sslPort);
84+
}
7985
log.info("Mqtt transport started!");
8086
}
8187

@@ -84,6 +90,9 @@ public void shutdown() throws InterruptedException {
8490
log.info("Stopping MQTT transport!");
8591
try {
8692
serverChannel.close().sync();
93+
if (sslEnabled) {
94+
sslServerChannel.close().sync();
95+
}
8796
} finally {
8897
workerGroup.shutdownGracefully();
8998
bossGroup.shutdownGracefully();

‎server/src/main/java/iot/technology/mqtt/server/MqttSystemContext.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package iot.technology.mqtt.server;
22

33
import iot.technology.mqtt.storage.session.service.SessionStoreService;
4+
import iot.technology.mqtt.storage.subscribe.service.SubscribeStoreService;
45
import lombok.Getter;
56
import lombok.extern.slf4j.Slf4j;
67
import org.springframework.stereotype.Component;
@@ -11,10 +12,14 @@
1112
* @author mushuwei
1213
*/
1314
@Slf4j
14-
@Component
15+
@Component("mqttSystemContext")
1516
public class MqttSystemContext {
1617

1718
@Getter
1819
@Resource
1920
private SessionStoreService sessionStoreService;
21+
22+
@Getter
23+
@Resource
24+
private SubscribeStoreService subscribeStoreService;
2025
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package iot.technology.mqtt.server;
2+
3+
import io.netty.handler.ssl.SslHandler;
4+
import iot.technology.mqtt.server.protocol.ProtocolProcess;
5+
import iot.technology.mqtt.server.ssl.MqttSslHandlerProvider;
6+
import lombok.Getter;
7+
import lombok.Setter;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.beans.factory.annotation.Value;
11+
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
12+
import org.springframework.stereotype.Component;
13+
14+
@Slf4j
15+
@ConditionalOnExpression("'${mqtt.enabled}'=='true'")
16+
@Component
17+
public class MqttTransportContext {
18+
19+
@Getter
20+
@Autowired(required = false)
21+
private MqttSslHandlerProvider sslHandlerProvider;
22+
23+
@Getter
24+
@Value("${mqtt.netty.max_payload_size}")
25+
private Integer maxPayloadSize;
26+
27+
@Getter
28+
@Setter
29+
private SslHandler sslHandler;
30+
31+
@Getter
32+
@Value("${transport.mqtt.timeout:10000}")
33+
private long timeout;
34+
35+
@Getter
36+
@Autowired(required = false)
37+
private ProtocolProcess protocolProcess;
38+
}

‎server/src/main/java/iot/technology/mqtt/server/MqttTransportHandler.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import io.netty.channel.ChannelHandlerContext;
55
import io.netty.channel.SimpleChannelInboundHandler;
66
import io.netty.handler.codec.mqtt.MqttMessage;
7-
import iot.technology.mqtt.server.protocol.ProtocolProcess;
8-
import lombok.AllArgsConstructor;
97
import lombok.extern.slf4j.Slf4j;
108

119
import java.io.IOException;
@@ -15,14 +13,17 @@
1513
*/
1614
@Slf4j
1715
@ChannelHandler.Sharable
18-
@AllArgsConstructor
1916
public class MqttTransportHandler extends SimpleChannelInboundHandler<MqttMessage> {
2017

21-
private ProtocolProcess protocolProcess;
18+
private final MqttTransportContext transportContext;
19+
20+
public MqttTransportHandler(MqttTransportContext transportContext) {
21+
this.transportContext = transportContext;
22+
}
2223

2324
@Override
2425
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
25-
protocolProcess.process(ctx, msg);
26+
transportContext.getProtocolProcess().process(ctx, msg);
2627
}
2728

2829
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package iot.technology.mqtt.server;
2+
3+
import io.netty.channel.ChannelInitializer;
4+
import io.netty.channel.ChannelPipeline;
5+
import io.netty.channel.socket.SocketChannel;
6+
import io.netty.handler.codec.mqtt.MqttDecoder;
7+
import io.netty.handler.codec.mqtt.MqttEncoder;
8+
import io.netty.handler.ssl.SslHandler;
9+
10+
public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> {
11+
12+
private final MqttTransportContext context;
13+
private final boolean sslEnabled;
14+
15+
public MqttTransportServerInitializer(MqttTransportContext context, boolean sslEnabled) {
16+
this.context = context;
17+
this.sslEnabled = sslEnabled;
18+
}
19+
20+
@Override
21+
protected void initChannel(SocketChannel ch) throws Exception {
22+
ChannelPipeline pipeline = ch.pipeline();
23+
SslHandler sslHandler = null;
24+
if (sslEnabled && context.getSslHandlerProvider() != null) {
25+
sslHandler = context.getSslHandlerProvider().getSslHandler();
26+
pipeline.addLast(sslHandler);
27+
}
28+
pipeline.addLast("decoder", new MqttDecoder(context.getMaxPayloadSize()));
29+
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
30+
31+
MqttTransportHandler handler = new MqttTransportHandler(context);
32+
pipeline.addLast(handler);
33+
}
34+
}

‎server/src/main/java/iot/technology/mqtt/server/protocol/PublishProcessor.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
package iot.technology.mqtt.server.protocol;
22

3+
import io.netty.buffer.Unpooled;
34
import io.netty.channel.Channel;
45
import io.netty.handler.codec.mqtt.*;
56
import io.netty.util.AttributeKey;
7+
import iot.technology.mqtt.server.MqttSystemContext;
8+
import iot.technology.mqtt.storage.subscribe.domain.SubscribeStore;
69
import lombok.extern.slf4j.Slf4j;
710
import org.springframework.stereotype.Component;
811

12+
import javax.annotation.Resource;
13+
import java.util.Map;
14+
915
/**
1016
* <pre>
1117
* **********************************************************************
@@ -30,6 +36,9 @@ public class PublishProcessor implements AbstractProtocolProcessor {
3036
//剩余长度
3137
private static final Integer REPLY_REMAINING_LENGTH = 2;
3238

39+
@Resource(name = "mqttSystemContext")
40+
private MqttSystemContext mqttSystemContext;
41+
3342
@Override
3443
public void processMqttProtocol(Channel channel, MqttMessage msg) {
3544
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
@@ -41,18 +50,37 @@ public void processMqttProtocol(Channel channel, MqttMessage msg) {
4150
* qosLevel 消息级别
4251
*/
4352
MqttPublishVariableHeader mqttPubVariableHeader = mqttPubMessage.variableHeader();
53+
String topicName = mqttPubVariableHeader.topicName();
4454
byte[] messageBytes = new byte[mqttPubMessage.payload().readableBytes()];
4555
mqttPubMessage.payload().getBytes(mqttPubMessage.payload().readerIndex(), messageBytes);
4656
String messageStr = new String(messageBytes);
4757
MqttQoS qosLevel = msg.fixedHeader().qosLevel();
48-
log.info("clientId:{}, qos:{}, topicName:{}, message:{}", clientId, qosLevel, mqttPubVariableHeader.topicName(), messageStr);
58+
log.info("clientId:{}, qos:{}, topicName:{}, message:{}", clientId, qosLevel, topicName, messageStr);
4959

5060
if (qosLevel == MqttQoS.AT_MOST_ONCE) {
5161
} else if (qosLevel == MqttQoS.AT_LEAST_ONCE) {
5262
this.sendPubAckMessage(channel, mqttPubVariableHeader.packetId());
5363
} else if (qosLevel == MqttQoS.EXACTLY_ONCE) {
5464
this.sendPubRecMessage(channel, mqttPubVariableHeader.packetId());
5565
}
66+
this.deliveryMessage(topicName, qosLevel, messageBytes, false, false);
67+
}
68+
69+
private void deliveryMessage(String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) {
70+
Map<String, SubscribeStore> subscribeStoreMap = mqttSystemContext.getSubscribeStoreService().search(topic);
71+
for (Map.Entry<String, SubscribeStore> entry : subscribeStoreMap.entrySet()) {
72+
SubscribeStore subscribeStore = entry.getValue();
73+
//看某个设备是否存活,如果存活进行投递
74+
if (mqttSystemContext.getSessionStoreService().containsKey(entry.getKey())) {
75+
MqttQoS respQoS = mqttQoS.value() > subscribeStore.getMqttQoS() ? MqttQoS.valueOf(subscribeStore.getMqttQoS()) : mqttQoS;
76+
MqttPublishMessage deliveryMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
77+
new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0),
78+
new MqttPublishVariableHeader(topic, 0),
79+
Unpooled.buffer().writeBytes(messageBytes));
80+
log.info("PUBLISH - clientId: {}, topic: {}, Qos: {}", subscribeStore.getClientId(), topic, respQoS.value());
81+
mqttSystemContext.getSessionStoreService().get(entry.getKey()).getChannel().writeAndFlush(deliveryMessage);
82+
}
83+
}
5684
}
5785

5886
private void sendPubAckMessage(Channel channel, int messageId) {

‎server/src/main/java/iot/technology/mqtt/server/protocol/SubScribeProcessor.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
import io.netty.channel.Channel;
44
import io.netty.handler.codec.mqtt.*;
55
import io.netty.util.AttributeKey;
6+
import iot.technology.mqtt.server.MqttSystemContext;
7+
import iot.technology.mqtt.storage.subscribe.domain.SubscribeStore;
68
import lombok.extern.slf4j.Slf4j;
79
import org.apache.commons.lang3.StringUtils;
810
import org.springframework.stereotype.Service;
911

12+
import javax.annotation.Resource;
1013
import java.util.ArrayList;
1114
import java.util.List;
1215

@@ -17,6 +20,9 @@
1720
@Service("subscribeProcessor")
1821
public class SubScribeProcessor implements AbstractProtocolProcessor {
1922

23+
@Resource(name = "mqttSystemContext")
24+
private MqttSystemContext mqttSystemContext;
25+
2026
@Override
2127
public void processMqttProtocol(Channel channel, MqttMessage msg) {
2228
MqttSubscribeMessage mqttSubMessage = (MqttSubscribeMessage) msg;
@@ -25,17 +31,22 @@ public void processMqttProtocol(Channel channel, MqttMessage msg) {
2531
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
2632
List<Integer> mqttQoSList = new ArrayList<Integer>();
2733
for (MqttTopicSubscription mqttTopicSubscription : topicSubscriptions) {
28-
String topicFilter = mqttTopicSubscription.topicName();
34+
String topicName = mqttTopicSubscription.topicName();
2935
MqttQoS mqttQoS = mqttTopicSubscription.qualityOfService();
3036
mqttQoSList.add(mqttQoS.value());
3137

38+
SubscribeStore subscribeStore =
39+
new SubscribeStore().setTopicName(topicName).setClientId(clientId).setMqttQoS(mqttQoS.value());
40+
mqttSystemContext.getSubscribeStoreService().put(topicName, subscribeStore);
3241
}
3342
MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
3443
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
3544
MqttMessageIdVariableHeader.from(mqttSubMessage.variableHeader().messageId()),
3645
new MqttSubAckPayload(mqttQoSList));
3746
channel.writeAndFlush(subAckMessage);
3847
log.info("SUBSCRIBE - clientId: {}, topicSubscriptions: {}", clientId, topicSubscriptions);
48+
} else {
49+
channel.close();
3950
}
4051
}
4152

‎server/src/main/java/iot/technology/mqtt/server/protocol/UnSubScribeProcessor.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import io.netty.channel.Channel;
44
import io.netty.handler.codec.mqtt.*;
55
import io.netty.util.AttributeKey;
6+
import iot.technology.mqtt.server.MqttSystemContext;
67
import lombok.extern.slf4j.Slf4j;
78
import org.springframework.stereotype.Service;
89

10+
import javax.annotation.Resource;
911
import java.util.List;
1012

1113
/**
@@ -17,12 +19,19 @@
1719
@Service("unsubscribeProcessor")
1820
public class UnSubScribeProcessor implements AbstractProtocolProcessor {
1921

22+
@Resource(name = "mqttSystemContext")
23+
private MqttSystemContext mqttSystemContext;
24+
2025
@Override
2126
public void processMqttProtocol(Channel channel, MqttMessage msg) {
2227
MqttUnsubscribeMessage mqttUnsubMessage = (MqttUnsubscribeMessage) msg;
2328
String clinetId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
2429
List<String> unSubTopics = mqttUnsubMessage.payload().topics();
25-
30+
if (!unSubTopics.isEmpty()) {
31+
unSubTopics.forEach(topic -> {
32+
mqttSystemContext.getSubscribeStoreService().remove(topic, clinetId);
33+
});
34+
}
2635
MqttUnsubAckMessage unsubAckMessage = (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
2736
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2),
2837
MqttMessageIdVariableHeader.from(mqttUnsubMessage.variableHeader().messageId()),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package iot.technology.mqtt.server.ssl;
2+
3+
import org.springframework.util.StringUtils;
4+
5+
import javax.net.ssl.KeyManagerFactory;
6+
import java.security.KeyStore.PrivateKeyEntry;
7+
import javax.net.ssl.TrustManagerFactory;
8+
import java.io.IOException;
9+
import java.security.*;
10+
import java.security.cert.Certificate;
11+
import java.security.cert.X509Certificate;
12+
import java.util.*;
13+
14+
public abstract class AbstractSslCredentials implements SslCredentials {
15+
16+
private char[] keyPasswordArray;
17+
18+
private KeyStore keyStore;
19+
20+
private PrivateKey privateKey;
21+
22+
private PublicKey publicKey;
23+
24+
private X509Certificate[] chain;
25+
26+
private X509Certificate[] trusts;
27+
28+
@Override
29+
public void init(boolean trustsOnly) throws IOException, GeneralSecurityException {
30+
String keyPassword = getKeyPassword();
31+
if (StringUtils.isEmpty(keyPassword)) {
32+
this.keyPasswordArray = new char[0];
33+
} else {
34+
this.keyPasswordArray = keyPassword.toCharArray();
35+
}
36+
this.keyStore = this.loadKeyStore(trustsOnly, this.keyPasswordArray);
37+
Set<X509Certificate> trustedCerts = getTrustedCerts(this.keyStore, trustsOnly);
38+
this.trusts = trustedCerts.toArray(new X509Certificate[0]);
39+
if (!trustsOnly) {
40+
PrivateKeyEntry privateKeyEntry = null;
41+
String keyAlias = this.getKeyAlias();
42+
if (!StringUtils.isEmpty(keyAlias)) {
43+
privateKeyEntry = tryGetPrivateKeyEntry(this.keyStore, keyAlias, this.keyPasswordArray);
44+
} else {
45+
for (Enumeration<String> e = this.keyStore.aliases(); e.hasMoreElements(); ) {
46+
String alias = e.nextElement();
47+
privateKeyEntry = tryGetPrivateKeyEntry(this.keyStore, alias, this.keyPasswordArray);
48+
if (privateKeyEntry != null) {
49+
this.updateKeyAlias(alias);
50+
break;
51+
}
52+
}
53+
}
54+
if (privateKeyEntry == null) {
55+
throw new IllegalArgumentException("Failed to get private key from the keystore or pem files. " +
56+
"Please check if the private key exists in the keystore or pem files and if the provided private key password is valid.");
57+
}
58+
this.chain = asX509Certificates(privateKeyEntry.getCertificateChain());
59+
this.privateKey = privateKeyEntry.getPrivateKey();
60+
if (this.chain.length > 0) {
61+
this.publicKey = this.chain[0].getPublicKey();
62+
}
63+
}
64+
65+
}
66+
67+
@Override
68+
public KeyStore getKeyStore() {
69+
return this.keyStore;
70+
}
71+
72+
@Override
73+
public PrivateKey getPrivateKey() {
74+
return this.privateKey;
75+
}
76+
77+
@Override
78+
public PublicKey getPublicKey() {
79+
return this.publicKey;
80+
}
81+
82+
@Override
83+
public X509Certificate[] getCertificateChain() {
84+
return this.chain;
85+
}
86+
87+
@Override
88+
public X509Certificate[] getTrustedCertificates() {
89+
return this.trusts;
90+
}
91+
92+
@Override
93+
public TrustManagerFactory createTrustManagerFactory() throws NoSuchAlgorithmException, KeyStoreException {
94+
TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
95+
tmFactory.init(this.keyStore);
96+
return tmFactory;
97+
}
98+
99+
@Override
100+
public KeyManagerFactory createKeyManagerFactory() throws NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException {
101+
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
102+
kmf.init(this.keyStore, this.keyPasswordArray);
103+
return kmf;
104+
}
105+
106+
@Override
107+
public String getValueFromSubjectNameByKey(String subjectName, String key) {
108+
String[] dns = subjectName.split(",");
109+
Optional<String> cn = (Arrays.stream(dns).filter(dn -> dn.contains(key + "="))).findFirst();
110+
String value = cn.isPresent() ? cn.get().replace(key + "=", "") : null;
111+
return StringUtils.isEmpty(value) ? null : value;
112+
}
113+
114+
protected abstract boolean canUse();
115+
116+
protected abstract KeyStore loadKeyStore(boolean isPrivateKeyRequired, char[] keyPasswordArray) throws IOException, GeneralSecurityException;
117+
118+
protected abstract void updateKeyAlias(String keyAlias);
119+
120+
private static X509Certificate[] asX509Certificates(Certificate[] certificates) {
121+
if (null == certificates || 0 == certificates.length) {
122+
throw new IllegalArgumentException("certificates missing!");
123+
}
124+
X509Certificate[] x509Certificates = new X509Certificate[certificates.length];
125+
for (int index = 0; certificates.length > index; ++index) {
126+
if (null == certificates[index]) {
127+
throw new IllegalArgumentException("[" + index + "] is null!");
128+
}
129+
try {
130+
x509Certificates[index] = (X509Certificate) certificates[index];
131+
} catch (ClassCastException e) {
132+
throw new IllegalArgumentException("[" + index + "] is not a x509 certificate! Instead it's a "
133+
+ certificates[index].getClass().getName());
134+
}
135+
}
136+
return x509Certificates;
137+
}
138+
139+
private static PrivateKeyEntry tryGetPrivateKeyEntry(KeyStore keyStore, String alias, char[] pwd) {
140+
PrivateKeyEntry entry = null;
141+
try {
142+
if (keyStore.entryInstanceOf(alias, PrivateKeyEntry.class)) {
143+
try {
144+
entry = (PrivateKeyEntry) keyStore
145+
.getEntry(alias, new KeyStore.PasswordProtection(pwd));
146+
} catch (UnsupportedOperationException e) {
147+
PrivateKey key = (PrivateKey) keyStore.getKey(alias, pwd);
148+
Certificate[] certs = keyStore.getCertificateChain(alias);
149+
entry = new KeyStore.PrivateKeyEntry(key, certs);
150+
}
151+
}
152+
} catch (KeyStoreException | NoSuchAlgorithmException | UnrecoverableEntryException e) {
153+
}
154+
return entry;
155+
}
156+
157+
private static Set<X509Certificate> getTrustedCerts(KeyStore ks, boolean trustsOnly) {
158+
Set<X509Certificate> set = new HashSet<>();
159+
try {
160+
for (Enumeration<String> e = ks.aliases(); e.hasMoreElements(); ) {
161+
String alias = e.nextElement();
162+
if (ks.isCertificateEntry(alias)) {
163+
Certificate cert = ks.getCertificate(alias);
164+
if (cert instanceof X509Certificate) {
165+
if (trustsOnly) {
166+
// is CA certificate
167+
if (((X509Certificate) cert).getBasicConstraints() >= 0) {
168+
set.add((X509Certificate) cert);
169+
}
170+
} else {
171+
set.add((X509Certificate) cert);
172+
}
173+
}
174+
} else if (ks.isKeyEntry(alias)) {
175+
Certificate[] certs = ks.getCertificateChain(alias);
176+
if ((certs != null) && (certs.length > 0) && (certs[0] instanceof X509Certificate)) {
177+
if (trustsOnly) {
178+
for (Certificate cert : certs) {
179+
// is CA certificate
180+
if (((X509Certificate) cert).getBasicConstraints() >= 0) {
181+
set.add((X509Certificate) cert);
182+
}
183+
}
184+
} else {
185+
set.add((X509Certificate) certs[0]);
186+
}
187+
}
188+
189+
}
190+
}
191+
} catch (KeyStoreException ignored) {}
192+
return Collections.unmodifiableSet(set);
193+
}
194+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package iot.technology.mqtt.server.ssl;
2+
3+
import iot.technology.mqtt.server.utils.ResourceUtils;
4+
import lombok.Data;
5+
import lombok.EqualsAndHashCode;
6+
import org.springframework.util.StringUtils;
7+
8+
import java.io.IOException;
9+
import java.io.InputStream;
10+
import java.security.GeneralSecurityException;
11+
import java.security.KeyStore;
12+
13+
@Data
14+
@EqualsAndHashCode(callSuper = false)
15+
public class KeystoreSslCredentials extends AbstractSslCredentials {
16+
17+
private String type;
18+
19+
private String storeFile;
20+
21+
private String storePassword;
22+
23+
private String keyPassword;
24+
25+
private String keyAlias;
26+
27+
@Override
28+
protected boolean canUse() {
29+
return ResourceUtils.resourceExists(this, this.storeFile);
30+
}
31+
32+
@Override
33+
protected KeyStore loadKeyStore(boolean isPrivateKeyRequired, char[] keyPasswordArray) throws IOException, GeneralSecurityException {
34+
String keyStoreType = StringUtils.isEmpty(this.type) ? KeyStore.getDefaultType() : this.type;
35+
KeyStore keyStore = KeyStore.getInstance(keyStoreType);
36+
try (InputStream tsFileInputStream = ResourceUtils.getInputStream(this, this.storeFile)) {
37+
keyStore.load(tsFileInputStream, StringUtils.isEmpty(this.storePassword) ? new char[0] : this.storePassword.toCharArray());
38+
}
39+
return keyStore;
40+
}
41+
42+
@Override
43+
protected void updateKeyAlias(String keyAlias) {
44+
this.keyAlias = keyAlias;
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package iot.technology.mqtt.server.ssl;
2+
3+
import io.netty.handler.ssl.SslHandler;
4+
import iot.technology.mqtt.server.utils.EncryptionUtil;
5+
import iot.technology.mqtt.server.utils.SslUtil;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.beans.factory.annotation.Qualifier;
9+
import org.springframework.beans.factory.annotation.Value;
10+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
11+
import org.springframework.boot.context.properties.ConfigurationProperties;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.stereotype.Component;
14+
import org.springframework.util.StringUtils;
15+
16+
import javax.net.ssl.*;
17+
import java.security.cert.CertificateException;
18+
import java.security.cert.X509Certificate;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
22+
/**
23+
* @author mushuwei
24+
*/
25+
@Slf4j
26+
@Component("mqttSslHandlerProvider")
27+
@ConditionalOnProperty(prefix = "mqtt.ssl", value = "enabled", havingValue = "true", matchIfMissing = false)
28+
public class MqttSslHandlerProvider {
29+
30+
@Value("${mqtt.ssl.protocol}")
31+
private String sslProtocol;
32+
33+
@Bean
34+
@ConfigurationProperties(prefix = "mqtt.ssl.credentials")
35+
public SslCredentialsConfig mqttSslCredentials() {
36+
return new SslCredentialsConfig("MQTT SSL Credentials", false);
37+
}
38+
39+
@Autowired
40+
@Qualifier("mqttSslCredentials")
41+
private SslCredentialsConfig mqttSslCredentialsConfig;
42+
43+
private SSLContext sslContext;
44+
45+
public SslHandler getSslHandler() {
46+
if (sslContext == null) {
47+
sslContext = createSslContext();
48+
}
49+
SSLEngine sslEngine = sslContext.createSSLEngine();
50+
sslEngine.setUseClientMode(false);
51+
sslEngine.setNeedClientAuth(false);
52+
sslEngine.setWantClientAuth(true);
53+
sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
54+
sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());
55+
sslEngine.setEnableSessionCreation(true);
56+
return new SslHandler(sslEngine);
57+
}
58+
59+
private SSLContext createSslContext() {
60+
try {
61+
SslCredentials sslCredentials = this.mqttSslCredentialsConfig.getCredentials();
62+
TrustManagerFactory tmFactory = sslCredentials.createTrustManagerFactory();
63+
KeyManagerFactory kmf = sslCredentials.createKeyManagerFactory();
64+
65+
KeyManager[] km = kmf.getKeyManagers();
66+
TrustManager x509wrapped = getX509TrustManager(tmFactory);
67+
TrustManager[] tm = {x509wrapped};
68+
if (StringUtils.isEmpty(sslProtocol)) {
69+
sslProtocol = "TLS";
70+
}
71+
SSLContext sslContext = SSLContext.getInstance(sslProtocol);
72+
sslContext.init(km, tm, null);
73+
return sslContext;
74+
} catch (Exception e) {
75+
log.error("Unable to set up SSL context. Reason: " + e.getMessage(), e);
76+
throw new RuntimeException("Failed to get SSL context", e);
77+
}
78+
}
79+
80+
private TrustManager getX509TrustManager(TrustManagerFactory tmf) {
81+
X509TrustManager x509Tm = null;
82+
for (TrustManager tm : tmf.getTrustManagers()) {
83+
if (tm instanceof X509TrustManager) {
84+
x509Tm = (X509TrustManager) tm;
85+
break;
86+
}
87+
}
88+
return new MqttX509TrustManager(x509Tm);
89+
}
90+
91+
static class MqttX509TrustManager implements X509TrustManager {
92+
93+
private final X509TrustManager trustManager;
94+
95+
MqttX509TrustManager(X509TrustManager trustManager) {
96+
this.trustManager = trustManager;
97+
}
98+
99+
@Override
100+
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
101+
String credentialsBody = null;
102+
for (X509Certificate cert : chain) {
103+
try {
104+
String strCert = SslUtil.getCertificateString(cert);
105+
String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
106+
final String[] credentialsBodyHolder = new String[1];
107+
CountDownLatch latch = new CountDownLatch(1);
108+
//TODO 处理业务逻辑
109+
latch.await(10, TimeUnit.SECONDS);
110+
if (strCert.equals(credentialsBodyHolder[0])) {
111+
credentialsBody = credentialsBodyHolder[0];
112+
break;
113+
}
114+
} catch (InterruptedException e) {
115+
throw new RuntimeException(e);
116+
}
117+
}
118+
if (credentialsBody == null) {
119+
throw new CertificateException("Invalid Device Certificate");
120+
}
121+
}
122+
123+
@Override
124+
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
125+
trustManager.checkServerTrusted(chain, authType);
126+
}
127+
128+
@Override
129+
public X509Certificate[] getAcceptedIssuers() {
130+
return trustManager.getAcceptedIssuers();
131+
}
132+
}
133+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package iot.technology.mqtt.server.ssl;
2+
3+
import iot.technology.mqtt.server.utils.ResourceUtils;
4+
import lombok.Data;
5+
import lombok.EqualsAndHashCode;
6+
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
7+
import org.bouncycastle.cert.X509CertificateHolder;
8+
import org.bouncycastle.jce.provider.BouncyCastleProvider;
9+
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
10+
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
11+
import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder;
12+
import org.bouncycastle.openssl.PEMParser;
13+
import org.bouncycastle.openssl.PEMKeyPair;
14+
import org.bouncycastle.openssl.PEMDecryptorProvider;
15+
import org.bouncycastle.openssl.PEMEncryptedKeyPair;
16+
import org.springframework.util.StringUtils;
17+
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import java.io.InputStreamReader;
21+
import java.security.GeneralSecurityException;
22+
import java.security.KeyStore;
23+
import java.security.PrivateKey;
24+
import java.security.Security;
25+
import java.security.cert.CertPath;
26+
import java.security.cert.Certificate;
27+
import java.security.cert.CertificateFactory;
28+
import java.security.cert.X509Certificate;
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import java.util.stream.Collectors;
32+
33+
@Data
34+
@EqualsAndHashCode(callSuper = false)
35+
public class PemSslCredentials extends AbstractSslCredentials {
36+
37+
public static final String DEFAULT_KEY_ALIAS = "server";
38+
39+
private String certFile;
40+
private String keyFile;
41+
private String keyPassword;
42+
43+
@Override
44+
protected boolean canUse() {
45+
return ResourceUtils.resourceExists(this, this.certFile);
46+
}
47+
48+
@Override
49+
protected KeyStore loadKeyStore(boolean trustsOnly, char[] keyPasswordArray) throws IOException, GeneralSecurityException {
50+
if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) {
51+
Security.addProvider(new BouncyCastleProvider());
52+
}
53+
List<X509Certificate> certificates = new ArrayList<>();
54+
PrivateKey privateKey = null;
55+
JcaX509CertificateConverter certConverter = new JcaX509CertificateConverter();
56+
JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter();
57+
try (InputStream inStream = ResourceUtils.getInputStream(this, this.certFile)) {
58+
try (PEMParser pemParser = new PEMParser(new InputStreamReader(inStream))) {
59+
Object object;
60+
while ((object = pemParser.readObject()) != null) {
61+
if (object instanceof X509CertificateHolder) {
62+
X509Certificate x509Cert = certConverter.getCertificate((X509CertificateHolder) object);
63+
certificates.add(x509Cert);
64+
} else if (object instanceof PEMEncryptedKeyPair) {
65+
PEMDecryptorProvider decProv = new JcePEMDecryptorProviderBuilder().build(keyPasswordArray);
66+
privateKey = keyConverter.getKeyPair(((PEMEncryptedKeyPair) object).decryptKeyPair(decProv)).getPrivate();
67+
} else if (object instanceof PEMKeyPair) {
68+
privateKey = keyConverter.getKeyPair((PEMKeyPair) object).getPrivate();
69+
} else if (object instanceof PrivateKeyInfo) {
70+
privateKey = keyConverter.getPrivateKey((PrivateKeyInfo) object);
71+
}
72+
}
73+
}
74+
}
75+
if (privateKey == null && !StringUtils.isEmpty(this.keyFile)) {
76+
if (ResourceUtils.resourceExists(this, this.keyFile)) {
77+
try (InputStream inStream = ResourceUtils.getInputStream(this, this.keyFile)) {
78+
try (PEMParser pemParser = new PEMParser(new InputStreamReader(inStream))) {
79+
Object object;
80+
while ((object = pemParser.readObject()) != null) {
81+
if (object instanceof PEMEncryptedKeyPair) {
82+
PEMDecryptorProvider decProv = new JcePEMDecryptorProviderBuilder().build(keyPasswordArray);
83+
privateKey = keyConverter.getKeyPair(((PEMEncryptedKeyPair) object).decryptKeyPair(decProv)).getPrivate();
84+
break;
85+
} else if (object instanceof PEMKeyPair) {
86+
privateKey = keyConverter.getKeyPair((PEMKeyPair) object).getPrivate();
87+
break;
88+
} else if (object instanceof PrivateKeyInfo) {
89+
privateKey = keyConverter.getPrivateKey((PrivateKeyInfo) object);
90+
}
91+
}
92+
}
93+
}
94+
}
95+
}
96+
if (certificates.isEmpty()) {
97+
throw new IllegalArgumentException("No certificates found in certFile: " + this.certFile);
98+
}
99+
if (privateKey == null && !trustsOnly) {
100+
throw new IllegalArgumentException("Unable to load private key neither from certFile: " + this.certFile
101+
+ " nor form keyFile: " + this.keyFile);
102+
}
103+
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
104+
keyStore.load(null);
105+
if (trustsOnly) {
106+
List<Certificate> unique = certificates.stream().distinct().collect(Collectors.toList());
107+
for (int i = 0; i < unique.size(); i++) {
108+
keyStore.setCertificateEntry("root-" + i, unique.get(i));
109+
}
110+
}
111+
if (privateKey != null) {
112+
CertificateFactory factory = CertificateFactory.getInstance("X.509");
113+
CertPath certPath = factory.generateCertPath(certificates);
114+
List<? extends Certificate> path = certPath.getCertificates();
115+
Certificate[] x509Certificates = path.toArray(new Certificate[0]);
116+
keyStore.setKeyEntry(DEFAULT_KEY_ALIAS, privateKey, keyPasswordArray, x509Certificates);
117+
}
118+
return keyStore;
119+
}
120+
121+
@Override
122+
protected void updateKeyAlias(String keyAlias) {
123+
}
124+
125+
@Override
126+
public String getKeyAlias() {
127+
return DEFAULT_KEY_ALIAS;
128+
}
129+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package iot.technology.mqtt.server.ssl;
2+
3+
import javax.net.ssl.KeyManagerFactory;
4+
import javax.net.ssl.TrustManagerFactory;
5+
import java.io.IOException;
6+
import java.security.*;
7+
import java.security.cert.X509Certificate;
8+
9+
public interface SslCredentials {
10+
11+
void init(boolean trustsOnly) throws IOException, GeneralSecurityException;
12+
13+
KeyStore getKeyStore();
14+
15+
String getKeyPassword();
16+
17+
String getKeyAlias();
18+
19+
PrivateKey getPrivateKey();
20+
21+
PublicKey getPublicKey();
22+
23+
X509Certificate[] getCertificateChain();
24+
25+
X509Certificate[] getTrustedCertificates();
26+
27+
TrustManagerFactory createTrustManagerFactory() throws NoSuchAlgorithmException, KeyStoreException;
28+
29+
KeyManagerFactory createKeyManagerFactory() throws NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException;
30+
31+
String getValueFromSubjectNameByKey(String subjectName, String key);
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package iot.technology.mqtt.server.ssl;
2+
3+
import lombok.Data;
4+
import lombok.extern.slf4j.Slf4j;
5+
6+
import javax.annotation.PostConstruct;
7+
8+
@Slf4j
9+
@Data
10+
public class SslCredentialsConfig {
11+
12+
private boolean enabled = true;
13+
14+
private SslCredentialsType type;
15+
16+
private PemSslCredentials pem;
17+
18+
private KeystoreSslCredentials keystore;
19+
20+
private SslCredentials credentials;
21+
22+
private final String name;
23+
24+
private final boolean trustsOnly;
25+
26+
public SslCredentialsConfig(String name, boolean trustsOnly) {
27+
this.name = name;
28+
this.trustsOnly = trustsOnly;
29+
}
30+
31+
@PostConstruct
32+
public void init() {
33+
if (this.enabled) {
34+
log.info("{}: Initializing SSL credentials.", name);
35+
if (SslCredentialsType.PEM.equals(type) && pem.canUse()) {
36+
this.credentials = this.pem;
37+
} else if (keystore.canUse()) {
38+
if (SslCredentialsType.PEM.equals(type)) {
39+
log.warn("{}: Specified PEM configuration is not valid. Using SSL keystore configuration as fallback.", name);
40+
}
41+
this.credentials = this.keystore;
42+
} else {
43+
throw new RuntimeException(name + ": Invalid SSL credentials configuration. None of the PEM or KEYSTORE configurations can be used!");
44+
}
45+
try {
46+
this.credentials.init(this.trustsOnly);
47+
} catch (Exception e) {
48+
throw new RuntimeException(name + ": Failed to init SSL credentials configuration.", e);
49+
}
50+
} else {
51+
log.info("{}: Skipping initialization of disabled SSL credentials.", name);
52+
}
53+
}
54+
55+
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package iot.technology.mqtt.server.ssl;
2+
3+
public enum SslCredentialsType {
4+
5+
PEM,
6+
7+
KEYSTORE
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package iot.technology.mqtt.server.utils;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.bouncycastle.crypto.digests.SHA3Digest;
5+
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
6+
7+
@Slf4j
8+
public class EncryptionUtil {
9+
10+
private EncryptionUtil() {
11+
12+
}
13+
14+
public static String certTrimNewLines(String input) {
15+
return input.replaceAll("-----BEGIN CERTIFICATE-----", "")
16+
.replaceAll("\n", "")
17+
.replaceAll("\r", "")
18+
.replaceAll("-----END CERTIFICATE-----", "");
19+
}
20+
21+
public static String pubkTrimNewLines(String input) {
22+
return input.replaceAll("-----BEGIN PUBLIC KEY-----", "")
23+
.replaceAll("\n", "")
24+
.replaceAll("\r", "")
25+
.replaceAll("-----END PUBLIC KEY-----", "");
26+
}
27+
28+
public static String prikTrimNewLines(String input) {
29+
return input.replaceAll("-----BEGIN EC PRIVATE KEY-----", "")
30+
.replaceAll("\n", "")
31+
.replaceAll("\r", "")
32+
.replaceAll("-----END EC PRIVATE KEY-----", "");
33+
}
34+
35+
public static String getSha3Hash(String data) {
36+
String trimmedData = certTrimNewLines(data);
37+
byte[] dataBytes = trimmedData.getBytes();
38+
SHA3Digest md = new SHA3Digest(256);
39+
md.reset();
40+
md.update(dataBytes, 0 , dataBytes.length);
41+
byte[] hashedBytes = new byte[256 / 8];
42+
md.doFinal(hashedBytes, 0);
43+
String sha3Hash = ByteUtils.toHexString(hashedBytes);
44+
return sha3Hash;
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package iot.technology.mqtt.server.utils;
2+
3+
4+
import com.google.common.io.Resources;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
import java.io.File;
8+
import java.io.FileInputStream;
9+
import java.io.InputStream;
10+
import java.net.URI;
11+
import java.net.URL;
12+
13+
@Slf4j
14+
public class ResourceUtils {
15+
16+
public static final String CLASSPATH_URL_PREFIX = "classpath:";
17+
18+
public static boolean resourceExists(Object classLoaderSource, String filePath) {
19+
return resourceExists(classLoaderSource.getClass().getClassLoader(), filePath);
20+
}
21+
22+
public static boolean resourceExists(ClassLoader classLoader, String filePath) {
23+
boolean classPathResource = false;
24+
String path = filePath;
25+
if (path.startsWith(CLASSPATH_URL_PREFIX)) {
26+
path = path.substring(CLASSPATH_URL_PREFIX.length());
27+
classPathResource = true;
28+
}
29+
if (!classPathResource) {
30+
File resourceFile = new File(path);
31+
if (resourceFile.exists()) {
32+
return true;
33+
}
34+
}
35+
InputStream classPathStream = classLoader.getResourceAsStream(path);
36+
if (classPathStream != null) {
37+
return true;
38+
} else {
39+
try {
40+
URL url = Resources.getResource(path);
41+
if (url != null) {
42+
return true;
43+
}
44+
} catch (IllegalArgumentException e) {}
45+
}
46+
return false;
47+
}
48+
49+
public static InputStream getInputStream(Object classLoaderSource, String filePath) {
50+
return getInputStream(classLoaderSource.getClass().getClassLoader(), filePath);
51+
}
52+
53+
public static InputStream getInputStream(ClassLoader classLoader, String filePath) {
54+
boolean classPathResource = false;
55+
String path = filePath;
56+
if (path.startsWith(CLASSPATH_URL_PREFIX)) {
57+
path = path.substring(CLASSPATH_URL_PREFIX.length());
58+
classPathResource = true;
59+
}
60+
try {
61+
if (!classPathResource) {
62+
File resourceFile = new File(path);
63+
if (resourceFile.exists()) {
64+
log.info("Reading resource data from file {}", filePath);
65+
return new FileInputStream(resourceFile);
66+
}
67+
}
68+
InputStream classPathStream = classLoader.getResourceAsStream(path);
69+
if (classPathStream != null) {
70+
log.info("Reading resource data from class path {}", filePath);
71+
return classPathStream;
72+
} else {
73+
URL url = Resources.getResource(path);
74+
if (url != null) {
75+
URI uri = url.toURI();
76+
log.info("Reading resource data from URI {}", filePath);
77+
return new FileInputStream(new File(uri));
78+
}
79+
}
80+
} catch (Exception e) {
81+
if (e instanceof NullPointerException) {
82+
log.warn("Unable to find resource: " + filePath);
83+
} else {
84+
log.warn("Unable to find resource: " + filePath, e);
85+
}
86+
}
87+
throw new RuntimeException("Unable to find resource: " + filePath);
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package iot.technology.mqtt.server.utils;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.springframework.util.Base64Utils;
5+
6+
import java.security.cert.Certificate;
7+
import java.security.cert.CertificateEncodingException;
8+
9+
@Slf4j
10+
public class SslUtil {
11+
12+
private SslUtil() {
13+
14+
}
15+
16+
public static String getCertificateString(Certificate cert)
17+
throws CertificateEncodingException {
18+
return EncryptionUtil.certTrimNewLines(Base64Utils.encodeToString(cert.getEncoded()));
19+
20+
}
21+
}

‎server/src/main/resources/application.yml

+37
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,51 @@ server:
22
port: 8088
33

44
mqtt:
5+
# Enable/disable mqtt transport protocol.
56
enabled: true
67
bind_address: 0.0.0.0
78
bind_port: 1883
9+
timeout: 10000
810
netty:
911
leak_detector_level: DISABLED
1012
boss_group_thread_count: 1
1113
worker_group_thread_count: 8
1214
max_payload_size: 65536
15+
so_keep_alive: false
16+
# MQTT SSL configuration
17+
ssl:
18+
# Enable/disable SSL support
19+
enabled: false
20+
# MQTT SSL bind address
21+
bind_address: 0.0.0.0
22+
# MQTT SSL bind port
23+
bind_port: 8883
24+
# SSL protocol: See https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#sslcontext-algorithms
25+
protocol: TLSv1.2
26+
# Server SSL credentials
27+
credentials:
28+
# Server credentials type (PEM - pem certificate file; KEYSTORE - java keystore)
29+
type: PEM
30+
# PEM server credentials
31+
pem:
32+
# Path to the server certificate file (holds server certificate or certificate chain, may include server private key)
33+
cert_file: mqttsrver.pem
34+
# Path to the server certificate private key file. Optional by default. Required if the private key is not present in server certificate file;
35+
key_file: mqttserver_key.pem
36+
# Server certificate private key password (optional)
37+
key_password: server_key_password
38+
# Keystore server credentials
39+
keystore:
40+
# Type of the key store (JKS or PKCS12)
41+
type: JKS
42+
# Path to the key store that holds the SSL certificate
43+
store_file: mqttserver.jks
44+
# Password used to access the key store
45+
store_password: server_ks_password
46+
# Optional alias of the private key; If not set, the platform will load the first private key from the keystore;
47+
key_alias:
48+
# Optional password to access the private key, If not set, the platform will attempt to load the private keys that are not protected with the password;
49+
key_password: server_key_password
1350

1451

1552
spring:

‎server/src/test/java/iot/technology/mqtt/server/MQTTRedissonApplicationTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package iot.technology.mqtt.server;
22

33
import iot.technology.mqtt.server.domain.Book;
4-
import iot.technology.mqtt.storage.session.cache.CacheManager;
4+
import iot.technology.mqtt.storage.cache.CacheManager;
55
import lombok.extern.slf4j.Slf4j;
66
import org.junit.Test;
77
import org.junit.runner.RunWith;
Binary file not shown.

‎storage/pom.xml

+13-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,16 @@
1-
<?xml version="1.0" encoding="UTF-8"?>
1+
<!--
2+
Copyright © 2021 IOT Technical Guide - The MQTT broker Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
Unless required by applicable law or agreed to in writing, software
9+
distributed under the License is distributed on an "AS IS" BASIS,
10+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
See the License for the specific language governing permissions and
12+
limitations under the License.
13+
-->
214
<project xmlns="http://maven.apache.org/POM/4.0.0"
315
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
416
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package iot.technology.mqtt.storage.cache;
2+
3+
/**
4+
* @author mushuwei
5+
*/
6+
public interface CacheConst {
7+
8+
public static final String SUBSRIBE_PRE = "itmb_subscribe_";
9+
public static final String CLIENT_PRE = "itmb_client_";
10+
11+
}

‎storage/src/main/java/iot/technology/mqtt/storage/session/cache/CacheManager.java ‎storage/src/main/java/iot/technology/mqtt/storage/cache/CacheManager.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package iot.technology.mqtt.storage.session.cache;
1+
package iot.technology.mqtt.storage.cache;
22

33
import java.util.List;
44
import java.util.Map;
@@ -85,6 +85,25 @@ public interface CacheManager {
8585
* @return 字典键值集合
8686
*/
8787
Map<String, Object> getAllHashCache(String key);
88+
89+
/**
90+
* 检查某字典是否包含该字典键
91+
*
92+
* @param key 键
93+
* @param mapKey 字典键
94+
* @return
95+
*/
96+
Boolean containHashKey(String key, String mapKey);
97+
98+
99+
/**
100+
* 删除某字典某字典键
101+
*
102+
* @param key 键
103+
* @param mapKey 字典键
104+
* @return
105+
*/
106+
Boolean removeHashKey(String key, String mapKey);
88107
//******hash(字典) 操作 end
89108

90109

@@ -117,6 +136,9 @@ public interface CacheManager {
117136
*/
118137
Boolean existsSetCache(String key, Object value);
119138

139+
140+
Boolean removeSetCache(String key, Object value);
141+
120142
/**
121143
* 批量获取set列表
122144
*

‎storage/src/main/java/iot/technology/mqtt/storage/session/cache/RedissonClientService.java ‎storage/src/main/java/iot/technology/mqtt/storage/cache/RedissonClientService.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package iot.technology.mqtt.storage.session.cache;
1+
package iot.technology.mqtt.storage.cache;
22

33
import org.redisson.api.*;
44
import org.springframework.stereotype.Service;
@@ -76,6 +76,19 @@ public Map<String, Object> getAllHashCache(String key) {
7676
return hashMap;
7777
}
7878

79+
@Override
80+
public Boolean containHashKey(String key, String mapKey) {
81+
RMap<String, Object> cache = redissonClient.getMap(key);
82+
return cache.containsKey(mapKey);
83+
}
84+
85+
@Override
86+
public Boolean removeHashKey(String key, String mapKey) {
87+
RMap<String, Object> cache = redissonClient.getMap(key);
88+
cache.remove(mapKey);
89+
return Boolean.TRUE;
90+
}
91+
7992
@Override
8093
public Boolean addSetCache(String key, Object value) {
8194
RSet<Object> cache = redissonClient.getSet(key);
@@ -95,6 +108,12 @@ public Boolean existsSetCache(String key, Object value) {
95108
return cache.contains(value);
96109
}
97110

111+
@Override
112+
public Boolean removeSetCache(String key, Object value) {
113+
RSet<Object> cache = redissonClient.getSet(key);
114+
return cache.remove(value);
115+
}
116+
98117
@Override
99118
public Set<Object> getAllSetCache(String key) {
100119
RSet<Object> cache = redissonClient.getSet(key);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package iot.technology.mqtt.storage.manager;
2+
3+
import iot.technology.mqtt.storage.cache.CacheManager;
4+
import iot.technology.mqtt.storage.subscribe.domain.SubscribeStore;
5+
import org.springframework.stereotype.Service;
6+
7+
import javax.annotation.Resource;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.Set;
11+
12+
import static iot.technology.mqtt.storage.cache.CacheConst.CLIENT_PRE;
13+
import static iot.technology.mqtt.storage.cache.CacheConst.SUBSRIBE_PRE;
14+
15+
/**
16+
* @author mushuwei
17+
*/
18+
@Service("subscribeManager")
19+
public class SubscribeCacheManager {
20+
21+
@Resource(name = "redissonClientService")
22+
private CacheManager cacheManager;
23+
24+
public SubscribeStore put(String topic, String clientId, SubscribeStore subscribeStore) {
25+
cacheManager.putHashCache(SUBSRIBE_PRE + topic, clientId, subscribeStore);
26+
cacheManager.addSetCache(CLIENT_PRE + clientId, topic);
27+
return subscribeStore;
28+
}
29+
30+
public SubscribeStore get(String topic, String clientId) {
31+
return (SubscribeStore) cacheManager.getHashCache(SUBSRIBE_PRE + topic, clientId);
32+
}
33+
34+
public Boolean topicContainClient(String topic, String clientId) {
35+
return cacheManager.containHashKey(SUBSRIBE_PRE + topic, clientId);
36+
}
37+
38+
public Boolean removeClientOfTopic(String topic, String clientId) {
39+
cacheManager.removeSetCache(CLIENT_PRE + clientId, topic);
40+
return cacheManager.removeHashKey(SUBSRIBE_PRE + topic, clientId);
41+
}
42+
43+
public Boolean removeClient(String clientId) {
44+
Set<Object> topicLists = cacheManager.getAllSetCache(CLIENT_PRE + clientId);
45+
topicLists.forEach(topic -> {
46+
cacheManager.removeHashKey(SUBSRIBE_PRE + topic, CLIENT_PRE + clientId);
47+
});
48+
cacheManager.deleteStringCache(clientId);
49+
return Boolean.TRUE;
50+
}
51+
52+
public Map<String, SubscribeStore> getSubscribeStoreMapByTopic(String topic) {
53+
Map<String, SubscribeStore> resultMap = new HashMap<>();
54+
Map<String, Object> cacheMap = cacheManager.getAllHashCache(SUBSRIBE_PRE + topic);
55+
for (Map.Entry<String, Object> entry : cacheMap.entrySet()) {
56+
resultMap.put(entry.getKey(), (SubscribeStore) entry.getValue());
57+
}
58+
return resultMap;
59+
}
60+
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package iot.technology.mqtt.storage.subscribe.domain;
2+
3+
import lombok.Getter;
4+
import lombok.Setter;
5+
import lombok.experimental.Accessors;
6+
7+
import java.io.Serializable;
8+
9+
/**
10+
* @author mushuwei
11+
*/
12+
@Getter
13+
@Setter
14+
@Accessors(chain = true)
15+
public class SubscribeStore implements Serializable {
16+
17+
private static final long serialVersionUID = 1276156087085594264L;
18+
19+
private String clientId;
20+
21+
private String topicName;
22+
23+
private int mqttQoS;
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package iot.technology.mqtt.storage.subscribe.service;
2+
3+
import iot.technology.mqtt.storage.subscribe.domain.SubscribeStore;
4+
5+
import java.util.Map;
6+
7+
/**
8+
* @author mushuwei
9+
* @description 订阅存储服务接口
10+
*/
11+
public interface SubscribeStoreService {
12+
13+
/**
14+
* 存储订阅
15+
*
16+
* @param topicName 主题名
17+
* @param subscribeStore 订阅元数据
18+
*/
19+
void put(String topicName, SubscribeStore subscribeStore);
20+
21+
22+
/**
23+
* 删除某主题下对应的客户端订阅
24+
*
25+
* @param topicName 主题名
26+
* @param clientId 客户端编号
27+
*/
28+
void remove(String topicName, String clientId);
29+
30+
/**
31+
* 删除某客户端编号对应的所有topic订阅
32+
*
33+
* @param clientId
34+
*/
35+
void removeTopicByClientId(String clientId);
36+
37+
/**
38+
* 获取某topic下的订阅集合
39+
*
40+
* @param topicName 主题名
41+
* @return 订阅集合
42+
*/
43+
Map<String, SubscribeStore> search(String topicName);
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package iot.technology.mqtt.storage.subscribe.service.impl;
2+
3+
import iot.technology.mqtt.storage.manager.SubscribeCacheManager;
4+
import iot.technology.mqtt.storage.subscribe.domain.SubscribeStore;
5+
import iot.technology.mqtt.storage.subscribe.service.SubscribeStoreService;
6+
import org.springframework.stereotype.Service;
7+
8+
import javax.annotation.Resource;
9+
import java.util.Map;
10+
11+
/**
12+
* @author mushuwei
13+
*/
14+
@Service("subscribeStoreService")
15+
public class SubscribeStoreServiceImpl implements SubscribeStoreService {
16+
17+
@Resource
18+
private SubscribeCacheManager cacheManager;
19+
20+
@Override
21+
public void put(String topicName, SubscribeStore subscribeStore) {
22+
cacheManager.put(topicName, subscribeStore.getClientId(), subscribeStore);
23+
}
24+
25+
@Override
26+
public void remove(String topicName, String clientId) {
27+
cacheManager.removeClientOfTopic(topicName, clientId);
28+
}
29+
30+
@Override
31+
public void removeTopicByClientId(String clientId) {
32+
cacheManager.removeClient(clientId);
33+
}
34+
35+
@Override
36+
public Map<String, SubscribeStore> search(String topicName) {
37+
return cacheManager.getSubscribeStoreMapByTopic(topicName);
38+
}
39+
}

0 commit comments

Comments
 (0)
Please sign in to comment.