Skip to content

Commit b1555ec

Browse files
authored
Merge pull request hollie#861 from DaveNeudoerffer/MQTT_fixes
Mqtt fixes
2 parents 18a51d6 + 6ef06f3 commit b1555ec

File tree

4 files changed

+76
-79
lines changed

4 files changed

+76
-79
lines changed

lib/mqtt.pm

Lines changed: 73 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ eval "use bytes"; # Not on all installs, so eval to avoid errors
190190

191191
# Need to share this with the outside world
192192
my $msg_id = 1;
193+
my $blocking_read_timeout = .5;
193194

194195
my %MQTT_Data;
195196

@@ -201,15 +202,23 @@ sub dump() {
201202
}
202203

203204
sub log {
204-
my ($self, $str) = @_;
205-
&main::print_log( 'MQTT: '. $str );
205+
my ($self, $str, $prefix) = @_;
206+
my $maxlength = 300;
207+
208+
$prefix = $prefix || 'MQTT: ';
209+
while( length( $str ) > $maxlength ) {
210+
&main::print_log( $prefix . substr($str,0,$maxlength) );
211+
$str = substr( $str, $maxlength );
212+
$prefix = '.... ';
213+
}
214+
&main::print_log( $prefix . $str );
206215
}
207216

208217
sub debug {
209218
my( $self, $level, $str ) = @_;
210219
if( $main::Debug{mqtt} >= $level ) {
211220
$level = 'D' if $level == 0;
212-
&main::print_log( "MQTT D$level: $str" );
221+
$self->log( $str, "MQTT D$level: " );
213222
}
214223
}
215224

@@ -226,7 +235,7 @@ sub error {
226235
sub mqtt_connect() {
227236
my ($self) = @_;
228237

229-
$self->debug( 1, "mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) ");
238+
$self->log( "mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) Topic ($$self{topic}) ");
230239

231240
### 1) open a socket (host, port and keepalive
232241
my $socket = IO::Socket::INET->new(
@@ -249,7 +258,8 @@ sub mqtt_connect() {
249258

250259
$self->{socket} = $socket;
251260
$self->{got_ping_response} = 1;
252-
$self->{next_ping} = $self->{keep_alive_timer};
261+
$self->{ping_missed_count} = 0;
262+
$self->{next_ping} = Time::HiRes::time + $$self{keep_alive_timer};
253263
$self->{buf} = '';
254264

255265
# --------------------------------------------------------------------------
@@ -267,7 +277,7 @@ sub mqtt_connect() {
267277
### 3) Check for ACK or fail
268278
$self->debug( 1, "Socket check ($$self{keep_alive_timer}) [ $! ]: " . ( $self->isConnected() ? "Connected" : "Failed" ) );
269279

270-
my $msg = $self->read_mqtt_msg_timeout();
280+
my $msg = $self->read_mqtt_msg( $blocking_read_timeout );
271281
if ( !$msg ) {
272282
$self->error("mqtt $$self{instance} No ConnAck ");
273283

@@ -287,9 +297,9 @@ sub mqtt_connect() {
287297
);
288298

289299
### 5) Check for ACK or fail
290-
$msg = $self->read_mqtt_msg_timeout();
300+
$msg = $self->read_mqtt_msg( $blocking_read_timeout );
291301
if( !$msg ) {
292-
$self->log( "$$self{instance} Received: " . "No SubAck" );
302+
$self->log( "$$self{instance} Received: " . "No subscription Ack" );
293303
}
294304
if ( $main::Debug{mqtt} ) {
295305
my $s =
@@ -299,11 +309,13 @@ sub mqtt_connect() {
299309
###
300310
### IF we're not getting $$msg{string} then what are we getting ?
301311
###
302-
$self->log( "$$self{instance} Sub 1 Received: " . "$s" ); # @FIXME: Use of uninitialized value
312+
$self->log( "$$self{instance} Subscription 1 ($$self{topic}) acknowledged: " . "$s" ); # @FIXME: Use of uninitialized value
303313
}
304314

305315
### 6) check for data
306316
$self->debug( 1, "$$self{instance} Initializing MQTT connection ...");
317+
318+
$self->set( 'on', $self );
307319
}
308320

309321
# ------------------------------------------------------------------------------
@@ -378,11 +390,11 @@ sub new {
378390

379391
### 5) Check for ACK or fail
380392
$self->{buf} = '';
381-
my $msg = $self->read_mqtt_msg();
393+
my $msg = $self->read_mqtt_msg( $blocking_read_timeout );
382394
if( !$msg ) {
383-
$self->log( "$$self{instance} Received: " . "No SubAck" );
395+
$self->log( "$inst Received: " . "No Subscription Ack" );
384396
} else {
385-
$self->debug( 1, "$inst Sub 2 Received: " . $msg->string );
397+
$self->log( 1, "$inst Subscription 2 ($topic) acknowledged: " . $msg->string );
386398
}
387399
}
388400

@@ -420,11 +432,12 @@ sub new {
420432

421433
$$self{next_ping} = 0;
422434
$$self{got_ping_response} = 1;
435+
$$self{ping_missed_count} = 0;
423436

424437
bless $self, $class;
425438

426439
# This is the little messages that appear when MH starts
427-
$self->log("Creating $instance on $host:$port $topic");
440+
$self->log("Creating $instance on $host:$port topic:$topic");
428441

429442
$self->set_states( "off", "on" );
430443

@@ -458,7 +471,7 @@ sub new {
458471
#exit 1;
459472
}
460473

461-
$self->set( 'on', $self );
474+
# $self->set( 'on', $self );
462475
return $self;
463476
}
464477

@@ -492,6 +505,7 @@ sub check_for_data {
492505
###
493506
### @FIXME: failed connection
494507
if ( 'off' ne $self->{state} ) {
508+
my $inst = $self->{instance};
495509

496510
if ($$self{recon_timer}->inactive) {
497511
$self->log("$inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds");
@@ -507,12 +521,12 @@ sub check_for_data {
507521
next;
508522
}
509523

510-
# This one doesn't block
511-
my $msg = $self->read_mqtt_msg();
512-
513524
### -[ Input ]----------------------------------------------------------
514525

515-
if ($msg) {
526+
# This one doesn't block
527+
my $msg;
528+
529+
while( $msg = $self->read_mqtt_msg( 0 ) ) {
516530
###
517531
### Okay this is the hard part
518532
### For now I'm only worried about data that fits into 1 read
@@ -576,16 +590,21 @@ sub check_for_data {
576590
###
577591
# Ping check
578592
if ( Time::HiRes::time > $$self{next_ping} ) {
579-
###
580-
### We've exceeded the ping time
581-
###
582-
$self->log("$inst read_mqtt_msg Ping Response timeout.")
583-
unless ( $$self{got_ping_response} );
584-
###
585-
### This has confused me, I'm not certain if I should put it back in or not
586-
### I'll need to sit down a put together a state table and review this
587-
###
588-
# return unless ($$self{got_ping_response});
593+
if( $self->{got_ping_response} ) {
594+
$self->{ping_missed_count} = 0;
595+
} else {
596+
###
597+
### We've exceeded the ping time
598+
###
599+
$self->{ping_missed_count} += 1;
600+
$self->log("$inst check_for_data Ping Response timeout.");
601+
if( $self->{ping_missed_count} >= 4 ) {
602+
$self->log("$inst check_for_data Ping Response threshold exceeded.");
603+
shutdown( $self->{socket}, 2 );
604+
$self->{socket} = undef;
605+
# check_for_data will reconnect socket
606+
}
607+
}
589608

590609
$self->debug( 2, "$inst read_mqtt_msg Ping Request" );
591610
send_mqtt_msg( $self, message_type => MQTT_PINGREQ );
@@ -620,10 +639,12 @@ sub send_mqtt_msg {
620639
=cut
621640

622641
sub read_mqtt_msg {
623-
my $self = shift;
642+
my ($self, $timeout) = @_;
624643

625644
my $select = IO::Select->new( $$self{socket} );
626-
my $timeout = $$self{next_ping} - Time::HiRes::time;
645+
if( !defined $timeout ) {
646+
$timeout = 0;
647+
}
627648

628649
do {
629650
###
@@ -638,15 +659,11 @@ sub read_mqtt_msg {
638659
return $mqtt;
639660
}
640661

641-
### very short wait
642-
### Return if there is no data
643-
$select->can_read(0.1) || return;
644-
645-
#
646-
$timeout = $$self{next_ping} - Time::HiRes::time;
662+
### Return if there is no data within the alloted time
663+
$select->can_read($timeout) || return;
647664

648665
# can return undef (error) or 0 bytes (eof)
649-
my $bytes = sysread $$self{socket}, $self->{buf}, 2048, length $self->{buf};
666+
my $bytes = sysread $self->{socket}, $self->{buf}, 2048, length $self->{buf};
650667

651668
# We get no bytes if there is an error or the socket has closed
652669
unless ($bytes) {
@@ -664,50 +681,11 @@ sub read_mqtt_msg {
664681

665682
return;
666683
}
667-
} while ( $timeout > 0 );
684+
} while ( 1 );
668685
}
669686

670687
# ------------------------------------------------------------------------------
671688

672-
=item C<read_mqtt_msg_timeout()>
673-
=cut
674-
675-
sub read_mqtt_msg_timeout {
676-
my $self = shift;
677-
678-
my $select = IO::Select->new( $$self{socket} );
679-
my $timeout = $$self{next_ping} - Time::HiRes::time;
680-
681-
do {
682-
my $mqtt = Net::MQTT::Message->new_from_bytes( $self->{buf}, 1 );
683-
684-
return $mqtt if ( defined $mqtt );
685-
686-
###
687-
### This is where it waits (blocking)
688-
###
689-
$select->can_read($timeout) || return;
690-
691-
#
692-
$timeout = $$self{next_ping} - Time::HiRes::time;
693-
694-
# can return undef (error) or 0 bytes (eof)
695-
my $bytes = sysread $$self{socket}, $self->{buf}, 2048, length $self->{buf};
696-
697-
# We get no bytes if there is an error or the socket has closed
698-
unless ($bytes) {
699-
$self->log( "$$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) );
700-
701-
# Not a permanent solution just a way to keep debugging
702-
$self->debug( 1, "deleting $$self{instance}\n" . Dumper( \$self ) );
703-
delete( $MQTT_Data{ $$self{instance} } );
704-
705-
return;
706-
}
707-
} while ( $timeout > 0 );
708-
}
709-
710-
# ------------------------------------------------------------------------------
711689

712690
=item C<set()>
713691
=cut
@@ -772,9 +750,25 @@ sub pub_msg {
772750
# First say something
773751
$self->error("$$self{instance} is not connected -- publish failed to $p_objects{topic}");
774752

775-
# Then do something (reconnect)
753+
# Check_for_data should initiate reconnect
754+
755+
# ###
756+
# ### This needs a lot of work
757+
# ###
758+
# ### @FIXME: failed connection
759+
# if ( 'off' ne $self->{state} ) {
760+
# my $inst = $self->{instance};
761+
#
762+
# if ($$self{recon_timer}->inactive) {
763+
# $self->log("$inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds");
764+
# $$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() });
765+
# }
766+
#
767+
# # check the state to see if it's off already
768+
#
769+
# $self->set( 'off', $self );
770+
# }
776771

777-
# Skip if we're not connected
778772
return;
779773
}
780774
$self->debug( 1, "$$self{instance} Pub: R:$p_objects{retain} T:'$p_objects{topic}' M:'$p_objects{message}'" );

lib/mqtt_discovery.pm

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ sub new { #### mqtt_DiscoveredItem
104104
} elsif( $disc_type eq 'binary_sensor' ) {
105105
} elsif( $disc_type eq 'sensor' ) {
106106
} elsif( $disc_type eq 'switch' ) {
107+
} elsif( $disc_type eq 'multi_switch' ) {
107108
} else {
108109
$disc_obj->log( "UNRECOGNIZED DISCOVERY TYPE: $disc_type" );
109110
return;

lib/mqtt_items.pm

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,6 +1528,7 @@ sub new { ### mqtt_InstMqttItem
15281528
$self->{disc_info}->{command_topic} = "$topic_prefix/level";
15291529
$self->{disc_info}->{schema} = 'json';
15301530
$self->{disc_info}->{brightness} = "true";
1531+
$self->{disc_info}->{brightness_scale} = 100;
15311532
} elsif( $base_type eq 'binary_sensor' ) {
15321533
$self->{disc_info}->{device_class} = $device_class;
15331534
} elsif( $base_type eq 'sensor' ) {

lib/read_table_A.pl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1919,6 +1919,7 @@ sub read_table_A {
19191919
# e.g.MQTT_BROKER, mqtt_1
19201920
require 'mqtt.pm';
19211921
my ( $name, $topic, $host, $port, $username, $password, $keepalive ) = @item_info;
1922+
$topic =~ s/\*/#/g;
19221923
$code .= sprintf( "\n\$%-35s = new mqtt(\"%s\", '$host', '$port', '$topic', '$username', '$password', $keepalive );\n", $name, $name );
19231924
}
19241925
elsif ( $type eq "MQTT_DEVICE" ) {

0 commit comments

Comments
 (0)