Skip to content

Commit 83db7ff

Browse files
thedevopmochi-co
andauthored
Buffer optimizations (#355)
* Avoid creating buffer if pkt larger than ClientNetWriteBufferSize * Use mempool for Properties Encode * Use the more efficient Write instead of Write for Buffer to Buffer write --------- Co-authored-by: JB <[email protected]>
1 parent 10a02ab commit 83db7ff

File tree

3 files changed

+28
-20
lines changed

3 files changed

+28
-20
lines changed

clients.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
582582
return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25]
583583
}
584584

585-
n, err := func() (n int64, err error) {
585+
n, err := func() (int64, error) {
586586
cl.Lock()
587587
defer cl.Unlock()
588588
if len(cl.State.outbound) == 0 {
@@ -591,23 +591,26 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
591591
}
592592

593593
// first write to buffer, then flush buffer
594-
n, _ = buf.WriteTo(cl.Net.outbuf) // will always be successful
594+
n, _ := cl.Net.outbuf.Write(buf.Bytes()) // will always be successful
595595
err = cl.flushOutbuf()
596-
return
596+
return int64(n), err
597597
}
598598

599599
// there are more writes in the queue
600600
if cl.Net.outbuf == nil {
601+
if buf.Len() >= cl.ops.options.ClientNetWriteBufferSize {
602+
return buf.WriteTo(cl.Net.Conn)
603+
}
601604
cl.Net.outbuf = new(bytes.Buffer)
602605
}
603606

604-
n, _ = buf.WriteTo(cl.Net.outbuf) // will always be successful
607+
n, _ := cl.Net.outbuf.Write(buf.Bytes()) // will always be successful
605608
if cl.Net.outbuf.Len() < cl.ops.options.ClientNetWriteBufferSize {
606-
return
609+
return int64(n), nil
607610
}
608611

609612
err = cl.flushOutbuf()
610-
return
613+
return int64(n), err
611614
}()
612615
if err != nil {
613616
return err

packets/packets.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func (pk *Packet) ConnectEncode(buf *bytes.Buffer) error {
348348

349349
pk.FixedHeader.Remaining = nb.Len()
350350
pk.FixedHeader.Encode(buf)
351-
_, _ = nb.WriteTo(buf)
351+
buf.Write(nb.Bytes())
352352

353353
return nil
354354
}
@@ -512,7 +512,8 @@ func (pk *Packet) ConnackEncode(buf *bytes.Buffer) error {
512512

513513
pk.FixedHeader.Remaining = nb.Len()
514514
pk.FixedHeader.Encode(buf)
515-
_, _ = nb.WriteTo(buf)
515+
buf.Write(nb.Bytes())
516+
516517
return nil
517518
}
518519

@@ -557,7 +558,7 @@ func (pk *Packet) DisconnectEncode(buf *bytes.Buffer) error {
557558

558559
pk.FixedHeader.Remaining = nb.Len()
559560
pk.FixedHeader.Encode(buf)
560-
_, _ = nb.WriteTo(buf)
561+
buf.Write(nb.Bytes())
561562

562563
return nil
563564
}
@@ -628,7 +629,7 @@ func (pk *Packet) PublishEncode(buf *bytes.Buffer) error {
628629

629630
pk.FixedHeader.Remaining = nb.Len() + len(pk.Payload)
630631
pk.FixedHeader.Encode(buf)
631-
_, _ = nb.WriteTo(buf)
632+
buf.Write(nb.Bytes())
632633
buf.Write(pk.Payload)
633634

634635
return nil
@@ -719,7 +720,7 @@ func (pk *Packet) encodePubAckRelRecComp(buf *bytes.Buffer) error {
719720

720721
pk.FixedHeader.Remaining = nb.Len()
721722
pk.FixedHeader.Encode(buf)
722-
_, _ = nb.WriteTo(buf)
723+
buf.Write(nb.Bytes())
723724
return nil
724725
}
725726

@@ -858,7 +859,7 @@ func (pk *Packet) SubackEncode(buf *bytes.Buffer) error {
858859

859860
pk.FixedHeader.Remaining = nb.Len()
860861
pk.FixedHeader.Encode(buf)
861-
_, _ = nb.WriteTo(buf)
862+
buf.Write(nb.Bytes())
862863

863864
return nil
864865
}
@@ -918,7 +919,7 @@ func (pk *Packet) SubscribeEncode(buf *bytes.Buffer) error {
918919

919920
pk.FixedHeader.Remaining = nb.Len()
920921
pk.FixedHeader.Encode(buf)
921-
_, _ = nb.WriteTo(buf)
922+
buf.Write(nb.Bytes())
922923

923924
return nil
924925
}
@@ -1014,7 +1015,7 @@ func (pk *Packet) UnsubackEncode(buf *bytes.Buffer) error {
10141015

10151016
pk.FixedHeader.Remaining = nb.Len()
10161017
pk.FixedHeader.Encode(buf)
1017-
_, _ = nb.WriteTo(buf)
1018+
buf.Write(nb.Bytes())
10181019

10191020
return nil
10201021
}
@@ -1070,7 +1071,7 @@ func (pk *Packet) UnsubscribeEncode(buf *bytes.Buffer) error {
10701071

10711072
pk.FixedHeader.Remaining = nb.Len()
10721073
pk.FixedHeader.Encode(buf)
1073-
_, _ = nb.WriteTo(buf)
1074+
buf.Write(nb.Bytes())
10741075

10751076
return nil
10761077
}
@@ -1132,7 +1133,7 @@ func (pk *Packet) AuthEncode(buf *bytes.Buffer) error {
11321133

11331134
pk.FixedHeader.Remaining = nb.Len()
11341135
pk.FixedHeader.Encode(buf)
1135-
_, _ = nb.WriteTo(buf)
1136+
buf.Write(nb.Bytes())
11361137
return nil
11371138
}
11381139

packets/properties.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"bytes"
99
"fmt"
1010
"strings"
11+
12+
"github.com/mochi-mqtt/server/v2/mempool"
1113
)
1214

1315
const (
@@ -199,7 +201,8 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
199201
return
200202
}
201203

202-
var buf bytes.Buffer
204+
buf := mempool.GetBuffer()
205+
defer mempool.PutBuffer(buf)
203206
if p.canEncode(pkt, PropPayloadFormat) && p.PayloadFormatFlag {
204207
buf.WriteByte(PropPayloadFormat)
205208
buf.WriteByte(p.PayloadFormat)
@@ -230,7 +233,7 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
230233
for _, v := range p.SubscriptionIdentifier {
231234
if v > 0 {
232235
buf.WriteByte(PropSubscriptionIdentifier)
233-
encodeLength(&buf, int64(v))
236+
encodeLength(buf, int64(v))
234237
}
235238
}
236239
}
@@ -321,7 +324,8 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
321324
}
322325

323326
if !mods.DisallowProblemInfo && p.canEncode(pkt, PropUser) {
324-
pb := bytes.NewBuffer([]byte{})
327+
pb := mempool.GetBuffer()
328+
defer mempool.PutBuffer(pb)
325329
for _, v := range p.User {
326330
pb.WriteByte(PropUser)
327331
pb.Write(encodeString(v.Key))
@@ -355,7 +359,7 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
355359
}
356360

357361
encodeLength(b, int64(buf.Len()))
358-
_, _ = buf.WriteTo(b) // [MQTT-3.1.3-10]
362+
b.Write(buf.Bytes()) // [MQTT-3.1.3-10]
359363
}
360364

361365
// Decode decodes property bytes into a properties struct.

0 commit comments

Comments
 (0)