diff --git a/mqtt/stream.go b/mqtt/stream.go index 27aea25a..738fd819 100644 --- a/mqtt/stream.go +++ b/mqtt/stream.go @@ -47,17 +47,6 @@ func (c *Client) connect(obs Observer) (s *stream, err error) { return nil, errors.Trace(err) } - if len(c.ops.Subscriptions) != 0 { - subscribe := NewSubscribe() - subscribe.ID = subscribeId - subscribe.Subscriptions = c.ops.Subscriptions - err = conn.Send(subscribe, false) - if err != nil { - conn.Close() - return nil, errors.Trace(err) - } - } - s = &stream{ cli: c, observer: obs, @@ -76,6 +65,14 @@ func (c *Client) connect(obs Observer) (s *stream, err error) { return nil, errors.Trace(err) } if len(c.ops.Subscriptions) != 0 { + subscribe := NewSubscribe() + subscribe.ID = subscribeId + subscribe.Subscriptions = c.ops.Subscriptions + err = conn.Send(subscribe, false) + if err != nil { + conn.Close() + return nil, errors.Trace(err) + } err = s.subscribeFuture.Wait(c.ops.Timeout) if err != nil { s.die("subscribe timeout", err)