Skip to content

Commit e9f7215

Browse files
authored
Allow configurable max stored qos > 0 messages (#359)
* Allow configurable max stored qos > 0 messages * Only rollback Inflight if QoS > 0 * Only rollback Inflight if QoS > 0 * Minor refactor
1 parent 69412dd commit e9f7215

File tree

4 files changed

+106
-14
lines changed

4 files changed

+106
-14
lines changed

clients.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
215215
cl.Properties.Clean = pk.Connect.Clean
216216
cl.Properties.Props = pk.Properties.Copy(false)
217217

218+
if cl.Properties.Props.ReceiveMaximum > cl.ops.options.Capabilities.MaximumInflight { // 3.3.4 Non-normative
219+
cl.Properties.Props.ReceiveMaximum = cl.ops.options.Capabilities.MaximumInflight
220+
}
221+
218222
if pk.Connect.Keepalive <= minimumKeepalive {
219223
cl.ops.log.Warn(
220224
ErrMinimumKeepalive.Error(),

clients_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func newTestClient() (cl *Client, r net.Conn, w net.Conn) {
3737
options: &Options{
3838
Capabilities: &Capabilities{
3939
ReceiveMaximum: 10,
40+
MaximumInflight: 5,
4041
TopicAliasMaximum: 10000,
4142
MaximumClientWritesPending: 3,
4243
maximumPacketID: 10,
@@ -183,6 +184,45 @@ func TestClientParseConnect(t *testing.T) {
183184
require.Equal(t, int32(pk.Properties.ReceiveMaximum), cl.State.Inflight.maximumSendQuota)
184185
}
185186

187+
func TestClientParseConnectReceiveMaxExceedMaxInflight(t *testing.T) {
188+
const MaxInflight uint16 = 1
189+
cl, _, _ := newTestClient()
190+
cl.ops.options.Capabilities.MaximumInflight = MaxInflight
191+
192+
pk := packets.Packet{
193+
ProtocolVersion: 4,
194+
Connect: packets.ConnectParams{
195+
ProtocolName: []byte{'M', 'Q', 'T', 'T'},
196+
Clean: true,
197+
Keepalive: 60,
198+
ClientIdentifier: "mochi",
199+
WillFlag: true,
200+
WillTopic: "lwt",
201+
WillPayload: []byte("lol gg"),
202+
WillQos: 1,
203+
WillRetain: false,
204+
},
205+
Properties: packets.Properties{
206+
ReceiveMaximum: uint16(5),
207+
},
208+
}
209+
210+
cl.ParseConnect("tcp1", pk)
211+
require.Equal(t, pk.Connect.ClientIdentifier, cl.ID)
212+
require.Equal(t, pk.Connect.Keepalive, cl.State.Keepalive)
213+
require.Equal(t, pk.Connect.Clean, cl.Properties.Clean)
214+
require.Equal(t, pk.Connect.ClientIdentifier, cl.ID)
215+
require.Equal(t, pk.Connect.WillTopic, cl.Properties.Will.TopicName)
216+
require.Equal(t, pk.Connect.WillPayload, cl.Properties.Will.Payload)
217+
require.Equal(t, pk.Connect.WillQos, cl.Properties.Will.Qos)
218+
require.Equal(t, pk.Connect.WillRetain, cl.Properties.Will.Retain)
219+
require.Equal(t, uint32(1), cl.Properties.Will.Flag)
220+
require.Equal(t, int32(cl.ops.options.Capabilities.ReceiveMaximum), cl.State.Inflight.receiveQuota)
221+
require.Equal(t, int32(cl.ops.options.Capabilities.ReceiveMaximum), cl.State.Inflight.maximumReceiveQuota)
222+
require.Equal(t, int32(MaxInflight), cl.State.Inflight.sendQuota)
223+
require.Equal(t, int32(MaxInflight), cl.State.Inflight.maximumSendQuota)
224+
}
225+
186226
func TestClientParseConnectOverrideWillDelay(t *testing.T) {
187227
cl, _, _ := newTestClient()
188228

server.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,21 @@ var (
4343

4444
// Capabilities indicates the capabilities and features provided by the server.
4545
type Capabilities struct {
46-
MaximumMessageExpiryInterval int64
47-
MaximumClientWritesPending int32
48-
MaximumSessionExpiryInterval uint32
49-
MaximumPacketSize uint32
46+
MaximumMessageExpiryInterval int64 // maximum message expiry if message expiry is 0 or over
47+
MaximumClientWritesPending int32 // maximum number of pending message writes for a client
48+
MaximumSessionExpiryInterval uint32 // maximum number of seconds to keep disconnected sessions
49+
MaximumPacketSize uint32 // maximum packet size, no limit if 0
5050
maximumPacketID uint32 // unexported, used for testing only
51-
ReceiveMaximum uint16
52-
TopicAliasMaximum uint16
53-
SharedSubAvailable byte
54-
MinimumProtocolVersion byte
51+
ReceiveMaximum uint16 // maximum number of concurrent qos messages per client
52+
MaximumInflight uint16 // maximum number of qos > 0 messages can be stored, 0(=8192)-65535
53+
TopicAliasMaximum uint16 // maximum topic alias value
54+
SharedSubAvailable byte // support of shared subscriptions
55+
MinimumProtocolVersion byte // minimum supported mqtt version
5556
Compatibilities Compatibilities
56-
MaximumQos byte
57-
RetainAvailable byte
58-
WildcardSubAvailable byte
59-
SubIDAvailable byte
57+
MaximumQos byte // maximum qos value available to clients
58+
RetainAvailable byte // support of retain messages
59+
WildcardSubAvailable byte // support of wildcard subscriptions
60+
SubIDAvailable byte // support of subscription identifiers
6061
}
6162

6263
// NewDefaultServerCapabilities defines the default features and capabilities provided by the server.
@@ -68,6 +69,7 @@ func NewDefaultServerCapabilities() *Capabilities {
6869
MaximumPacketSize: 0, // no maximum packet size
6970
maximumPacketID: math.MaxUint16,
7071
ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client
72+
MaximumInflight: 1024 * 8, // maximum number of qos > 0 messages can be stored
7173
TopicAliasMaximum: math.MaxUint16, // maximum topic alias value
7274
SharedSubAvailable: 1, // shared subscriptions are available
7375
MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0)
@@ -201,6 +203,10 @@ func (o *Options) ensureDefaults() {
201203

202204
o.Capabilities.maximumPacketID = math.MaxUint16 // spec maximum is 65535
203205

206+
if o.Capabilities.MaximumInflight == 0 {
207+
o.Capabilities.MaximumInflight = 1024 * 8
208+
}
209+
204210
if o.SysTopicResendInterval == 0 {
205211
o.SysTopicResendInterval = defaultSysTopicInterval
206212
}
@@ -975,9 +981,17 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
975981
}
976982

977983
if out.FixedHeader.Qos > 0 {
984+
if cl.State.Inflight.Len() >= int(s.Options.Capabilities.MaximumInflight) {
985+
// add hook?
986+
atomic.AddInt64(&s.Info.InflightDropped, 1)
987+
s.Log.Warn("client store quota reached", "client", cl.ID, "listener", cl.Net.Listener)
988+
return out, packets.ErrQuotaExceeded
989+
}
990+
978991
i, err := cl.NextPacketID() // [MQTT-4.3.2-1] [MQTT-4.3.3-1]
979992
if err != nil {
980993
s.hooks.OnPacketIDExhausted(cl, pk)
994+
atomic.AddInt64(&s.Info.InflightDropped, 1)
981995
s.Log.Warn("packet ids exhausted", "error", err, "client", cl.ID, "listener", cl.Net.Listener)
982996
return out, packets.ErrQuotaExceeded
983997
}
@@ -1008,8 +1022,10 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
10081022
default:
10091023
atomic.AddInt64(&s.Info.MessagesDropped, 1)
10101024
cl.ops.hooks.OnPublishDropped(cl, pk)
1011-
cl.State.Inflight.Delete(out.PacketID) // packet was dropped due to irregular circumstances, so rollback inflight.
1012-
cl.State.Inflight.IncreaseSendQuota()
1025+
if out.FixedHeader.Qos > 0 {
1026+
cl.State.Inflight.Delete(out.PacketID) // packet was dropped due to irregular circumstances, so rollback inflight.
1027+
cl.State.Inflight.IncreaseSendQuota()
1028+
}
10131029
return out, packets.ErrPendingClientWritesExceeded
10141030
}
10151031

server_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1907,6 +1907,7 @@ func TestPublishToClientSubscriptionDowngradeQos(t *testing.T) {
19071907
}
19081908

19091909
func TestPublishToClientExceedClientWritesPending(t *testing.T) {
1910+
var sendQuota uint16 = 5
19101911
s := newServer()
19111912

19121913
_, w := net.Pipe()
@@ -1917,9 +1918,12 @@ func TestPublishToClientExceedClientWritesPending(t *testing.T) {
19171918
options: &Options{
19181919
Capabilities: &Capabilities{
19191920
MaximumClientWritesPending: 3,
1921+
maximumPacketID: 10,
19201922
},
19211923
},
19221924
})
1925+
cl.Properties.Props.ReceiveMaximum = sendQuota
1926+
cl.State.Inflight.ResetSendQuota(int32(cl.Properties.Props.ReceiveMaximum))
19231927

19241928
s.Clients.Add(cl)
19251929

@@ -1928,9 +1932,20 @@ func TestPublishToClientExceedClientWritesPending(t *testing.T) {
19281932
atomic.AddInt32(&cl.State.outboundQty, 1)
19291933
}
19301934

1935+
id, _ := cl.NextPacketID()
1936+
cl.State.Inflight.Set(packets.Packet{PacketID: uint16(id)})
1937+
cl.State.Inflight.DecreaseSendQuota()
1938+
sendQuota--
1939+
19311940
_, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 2}, packets.Packet{})
19321941
require.Error(t, err)
19331942
require.ErrorIs(t, packets.ErrPendingClientWritesExceeded, err)
1943+
require.Equal(t, int32(sendQuota), atomic.LoadInt32(&cl.State.Inflight.sendQuota))
1944+
1945+
_, err = s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 2}, packets.Packet{FixedHeader: packets.FixedHeader{Qos: 1}})
1946+
require.Error(t, err)
1947+
require.ErrorIs(t, packets.ErrPendingClientWritesExceeded, err)
1948+
require.Equal(t, int32(sendQuota), atomic.LoadInt32(&cl.State.Inflight.sendQuota))
19341949
}
19351950

19361951
func TestPublishToClientServerTopicAlias(t *testing.T) {
@@ -1986,6 +2001,22 @@ func TestPublishToClientMqtt5RetainAsPublishedTrueLeverageNoConn(t *testing.T) {
19862001
require.ErrorIs(t, err, packets.CodeDisconnect)
19872002
}
19882003

2004+
func TestPublishToClientExceedMaximumInflight(t *testing.T) {
2005+
const MaxInflight uint16 = 5
2006+
s := newServer()
2007+
cl, _, _ := newTestClient()
2008+
s.Options.Capabilities.MaximumInflight = MaxInflight
2009+
cl.ops.options.Capabilities.MaximumInflight = MaxInflight
2010+
for i := uint16(0); i < MaxInflight; i++ {
2011+
cl.State.Inflight.Set(packets.Packet{PacketID: i})
2012+
}
2013+
2014+
_, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 1}, *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet)
2015+
require.Error(t, err)
2016+
require.ErrorIs(t, err, packets.ErrQuotaExceeded)
2017+
require.Equal(t, int64(1), atomic.LoadInt64(&s.Info.InflightDropped))
2018+
}
2019+
19892020
func TestPublishToClientExhaustedPacketID(t *testing.T) {
19902021
s := newServer()
19912022
cl, _, _ := newTestClient()
@@ -1996,6 +2027,7 @@ func TestPublishToClientExhaustedPacketID(t *testing.T) {
19962027
_, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 1}, *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet)
19972028
require.Error(t, err)
19982029
require.ErrorIs(t, err, packets.ErrQuotaExceeded)
2030+
require.Equal(t, int64(1), atomic.LoadInt64(&s.Info.InflightDropped))
19992031
}
20002032

20012033
func TestPublishToClientACLNotAuthorized(t *testing.T) {

0 commit comments

Comments
 (0)