Skip to content

Commit 3b8dbb9

Browse files
committed
added NetMQPollerTest.RemoveAndDisposeSocket() as a repro for issue zeromq#834
1 parent e4dfcf9 commit 3b8dbb9

File tree

1 file changed

+76
-0
lines changed

1 file changed

+76
-0
lines changed

src/NetMQ.Tests/NetMQPollerTest.cs

+76
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,82 @@ public void RemoveSocket()
400400
}
401401
}
402402

403+
[Fact]
404+
public async void RemoveAndDisposeSocket()
405+
{
406+
//set up poller, start it
407+
var patient = new NetMQPoller();
408+
patient.RunAsync();
409+
410+
Assert.True(patient.IsRunning);
411+
412+
//create a pub-sub pair
413+
var port = 55667;
414+
var conn = $"tcp://127.0.0.1:{port}";
415+
416+
var pub = new PublisherSocket();
417+
pub.Bind(conn);
418+
419+
var sub = new SubscriberSocket();
420+
sub.Connect(conn);
421+
sub.SubscribeToAnyTopic();
422+
423+
//handle callbacks from poller thread
424+
sub.ReceiveReady += (s, e) =>
425+
{
426+
var msg = e.Socket.ReceiveFrameString();
427+
428+
Debug.WriteLine($"sub has data: {msg}");
429+
};
430+
431+
//add the subscriber socket to poller
432+
patient.Add(sub);
433+
434+
//set up pub on separate thread
435+
var canceller = new CancellationTokenSource();
436+
437+
var pubAction = new Action(async () =>
438+
{
439+
var token = canceller.Token;
440+
441+
uint i = 0;
442+
443+
while(!token.IsCancellationRequested)
444+
{
445+
pub.SendFrame($"Hello-{++i}");
446+
447+
// send ~ 5Hz
448+
await Task.Delay(200);
449+
}
450+
});
451+
452+
//var pubThread = new Task(pubAction, canceller.Token, TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent);
453+
//pubThread.Start();
454+
455+
var pubThread = Task.Run(pubAction);
456+
457+
//allow a little time to run
458+
await Task.Delay(5000);
459+
460+
//now try to remove the sub from poller
461+
patient.Remove(sub);
462+
463+
// dispose the sub (this will cause exception on poller's worker-thread) and it can't be caught!
464+
sub.Dispose();
465+
466+
//allow for poller to continue running
467+
await Task.Delay(3000);
468+
469+
patient.Stop();
470+
Assert.False(patient.IsRunning);
471+
472+
canceller.Cancel();
473+
474+
//patient?.Dispose();
475+
pub?.Dispose();
476+
sub?.Dispose();
477+
}
478+
403479
[Fact]
404480
public void AddThrowsIfSocketAlreadyDisposed()
405481
{

0 commit comments

Comments
 (0)