Skip to content

Commit fa5c83f

Browse files
nvollmarmdedetrich
authored andcommitted
Add handling for tcp register timeout leaving connection dead
1 parent 46e60a6 commit fa5c83f

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/TcpDnsClientSpec.scala

+21-2
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ import java.net.InetSocketAddress
1818
import scala.collection.immutable.Seq
1919

2020
import org.apache.pekko
21-
import pekko.actor.Props
21+
import pekko.actor.{ ActorKilledException, Kill, Props }
2222
import pekko.io.Tcp
2323
import pekko.io.Tcp.{ Connected, PeerClosed, Register }
2424
import pekko.io.dns.{ RecordClass, RecordType }
2525
import pekko.io.dns.internal.DnsClient.Answer
26-
import pekko.testkit.{ ImplicitSender, PekkoSpec, TestProbe }
26+
import pekko.testkit.{ EventFilter, ImplicitSender, PekkoSpec, TestProbe }
2727

2828
class TcpDnsClientSpec extends PekkoSpec with ImplicitSender {
2929
import TcpDnsClient._
@@ -107,5 +107,24 @@ class TcpDnsClientSpec extends PekkoSpec with ImplicitSender {
107107
answerProbe.expectMsg(Answer(42, Nil))
108108
answerProbe.expectMsg(Answer(43, Nil))
109109
}
110+
111+
"fail when the connection just terminates" in {
112+
val tcpExtensionProbe = TestProbe()
113+
val answerProbe = TestProbe()
114+
val connectionProbe = TestProbe()
115+
116+
val client = system.actorOf(Props(new TcpDnsClient(tcpExtensionProbe.ref, dnsServerAddress, answerProbe.ref)))
117+
118+
client ! exampleRequestMessage
119+
120+
tcpExtensionProbe.expectMsg(Tcp.Connect(dnsServerAddress))
121+
connectionProbe.send(tcpExtensionProbe.lastSender, Connected(dnsServerAddress, localAddress))
122+
connectionProbe.expectMsgType[Register]
123+
124+
EventFilter[ActorKilledException](occurrences = 1).intercept {
125+
// simulate connection stopping due to register timeout => client must fail
126+
connectionProbe.ref ! Kill
127+
}
128+
}
110129
}
111130
}

actor/src/main/scala/org/apache/pekko/io/dns/internal/TcpDnsClient.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414
package org.apache.pekko.io.dns.internal
1515

1616
import java.net.InetSocketAddress
17-
1817
import org.apache.pekko
1918
import pekko.PekkoException
20-
import pekko.actor.{ Actor, ActorLogging, ActorRef, Stash }
19+
import pekko.actor.{ Actor, ActorLogging, ActorRef, Stash, Terminated }
2120
import pekko.annotation.InternalApi
2221
import pekko.io.Tcp
2322
import pekko.io.dns.internal.DnsClient.Answer
@@ -49,6 +48,7 @@ import pekko.util.ByteString
4948
log.debug("Connected to TCP address [{}]", ns)
5049
val connection = sender()
5150
context.become(ready(connection))
51+
context.watch(connection)
5252
connection ! Tcp.Register(self)
5353
unstashAll()
5454
case _: Message =>
@@ -80,7 +80,10 @@ import pekko.util.ByteString
8080
}
8181
}
8282
case Tcp.PeerClosed =>
83+
context.unwatch(connection)
8384
context.become(idle)
85+
case Terminated(`connection`) =>
86+
throwFailure("TCP connection terminated without closing (register timeout?)", None)
8487
}
8588

8689
private def parseResponse(data: ByteString) = {

0 commit comments

Comments
 (0)