@@ -245,89 +245,6 @@ func mqttMakeTestServer(tb testing.TB) *mqttTarget {
245
245
}
246
246
}
247
247
248
- func mqttMakeTestServerWithLeafnode (hubd , leafd string , connectSystemAccount bool ) func (tb testing.TB ) * mqttTarget {
249
- return func (tb testing.TB ) * mqttTarget {
250
- tb .Helper ()
251
-
252
- if hubd != "" {
253
- hubd = "domain: " + hubd + ", "
254
- }
255
- sconf := `
256
- listen: 127.0.0.1:-1
257
-
258
- server_name: HUB
259
- jetstream: {max_mem_store: 256MB, max_file_store: 2GB, ` + hubd + `store_dir: '` + tb .TempDir () + `'}
260
-
261
- leafnodes {
262
- listen: 127.0.0.1:-1
263
- }
264
-
265
- accounts {
266
- ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
267
- $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
268
- }
269
-
270
- mqtt {
271
- listen: 127.0.0.1:-1
272
- }
273
- `
274
- hubConf := createConfFile (tb , []byte (sconf ))
275
- hubServer , o := RunServerWithConfig (hubConf )
276
- leafRemoteAddr := fmt .Sprintf ("%s:%d" , o .LeafNode .Host , o .LeafNode .Port )
277
- tb .Cleanup (func () {
278
- os .Remove (hubConf )
279
- })
280
-
281
- sysRemote := ""
282
- if connectSystemAccount {
283
- sysRemote = `{ url: "nats://admin:s3cr3t!@` + leafRemoteAddr + `", account: "$SYS" },` + "\n \t \t "
284
- }
285
- if leafd != "" {
286
- leafd = "domain: " + leafd + ", "
287
- }
288
- leafconf := `
289
- listen: 127.0.0.1:-1
290
-
291
- server_name: SPOKE
292
- jetstream: {max_mem_store: 256MB, max_file_store: 2GB, ` + leafd + `store_dir: '` + tb .TempDir () + `'}
293
-
294
- leafnodes {
295
- remotes = [
296
- ` + sysRemote + `{ url: "nats://one:p@` + leafRemoteAddr + `", account: "ONE" },
297
- ]
298
- }
299
-
300
- accounts {
301
- ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
302
- $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
303
- }
304
-
305
- mqtt {
306
- listen: 127.0.0.1:-1
307
- }
308
- `
309
- leafConf := createConfFile (tb , []byte (leafconf ))
310
- leafServer , _ := RunServerWithConfig (leafConf )
311
- tb .Cleanup (func () {
312
- os .Remove (leafConf )
313
- })
314
-
315
- both := []mqttDial {
316
- mqttNewDialForServer (hubServer , "one" , "p" ),
317
- mqttNewDialForServer (leafServer , "one" , "p" ),
318
- }
319
- return & mqttTarget {
320
- singleServers : []* Server {hubServer , leafServer },
321
- all : both ,
322
- configs : []mqttTestConfig {
323
- {name : "pub to all" , pub : both , sub : both },
324
- {name : "pub to SPOKE" , pub : both [1 :], sub : both },
325
- {name : "pub to HUB" , pub : both [:1 ], sub : both },
326
- },
327
- }
328
- }
329
- }
330
-
331
248
func mqttMakeTestCluster (size int , domain string ) func (tb testing.TB ) * mqttTarget {
332
249
return func (tb testing.TB ) * mqttTarget {
333
250
tb .Helper ()
@@ -393,119 +310,6 @@ func mqttMakeTestCluster(size int, domain string) func(tb testing.TB) *mqttTarge
393
310
}
394
311
}
395
312
396
- func mqttMakeTestClusterWithLeafnodeCluster (hubd , leafd string , connectSystemAccount bool ) func (tb testing.TB ) * mqttTarget {
397
- return func (tb testing.TB ) * mqttTarget {
398
- tb .Helper ()
399
-
400
- // Create HUB cluster.
401
- if hubd != "" {
402
- hubd = "domain: " + hubd + ", "
403
- }
404
- hubConf := `
405
- listen: 127.0.0.1:-1
406
-
407
- server_name: %s
408
- jetstream: {max_mem_store: 256MB, max_file_store: 2GB, ` + hubd + `store_dir: '%s'}
409
-
410
- leafnodes {
411
- listen: 127.0.0.1:-1
412
- }
413
-
414
- cluster {
415
- name: %s
416
- listen: 127.0.0.1:%d
417
- routes = [%s]
418
- }
419
-
420
- mqtt {
421
- listen: 127.0.0.1:-1
422
- stream_replicas: 3
423
- }
424
-
425
- accounts {
426
- ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
427
- $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
428
- }
429
- `
430
- hub := createJetStreamClusterWithTemplate (tb , hubConf , "HUB" , 3 )
431
- hub .waitOnLeader ()
432
-
433
- // Pick a host to connect leafnodes to
434
- lno := hub .randomNonLeader ().getOpts ().LeafNode
435
- leafRemoteAddr := fmt .Sprintf ("%s:%d" , lno .Host , lno .Port )
436
- hubRandom := mqttNewDialForServer (hub .randomNonLeader (), "one" , "p" )
437
- hubAll := []mqttDial {}
438
- for _ , s := range hub .servers {
439
- hubAll = append (hubAll , mqttNewDialForServer (s , "one" , "p" ))
440
- }
441
-
442
- // Create SPOKE (leafnode) cluster.
443
- sysRemote := ""
444
- if connectSystemAccount {
445
- sysRemote = `{ url: "nats://admin:s3cr3t!@` + leafRemoteAddr + `", account: "$SYS" },` + "\n \t \t \t "
446
- }
447
- if leafd != "" {
448
- leafd = "domain: " + leafd + ", "
449
- }
450
- leafConf := `
451
- listen: 127.0.0.1:-1
452
-
453
- server_name: %s
454
- jetstream: {max_mem_store: 256MB, max_file_store: 2GB, ` + leafd + `store_dir: '%s'}
455
-
456
- leafnodes {
457
- remotes = [
458
- ` + sysRemote + `{ url: "nats://one:p@` + leafRemoteAddr + `", account: "ONE" },
459
- ]
460
- }
461
-
462
- cluster {
463
- name: %s
464
- listen: 127.0.0.1:%d
465
- routes = [%s]
466
- }
467
-
468
- mqtt {
469
- listen: 127.0.0.1:-1
470
- stream_replicas: 3
471
- }
472
-
473
- accounts {
474
- ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
475
- $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
476
- }
477
- `
478
- spoke := createJetStreamCluster (tb , leafConf , "SPOKE" , "SPOKE-" , 3 , 22111 , false )
479
- expectedConnections := 2
480
- if ! connectSystemAccount {
481
- expectedConnections = 1
482
- }
483
- for _ , s := range spoke .servers {
484
- checkLeafNodeConnectedCount (tb , s , expectedConnections )
485
- }
486
- spoke .waitOnPeerCount (3 )
487
- spokeRandom := mqttNewDialForServer (spoke .randomNonLeader (), "one" , "p" )
488
- spokeAll := []mqttDial {}
489
- for _ , s := range spoke .servers {
490
- spokeAll = append (spokeAll , mqttNewDialForServer (s , "one" , "p" ))
491
- }
492
-
493
- all := append (hubAll , spokeAll ... )
494
-
495
- return & mqttTarget {
496
- clusters : []* cluster {hub , spoke },
497
- all : all ,
498
- configs : []mqttTestConfig {
499
- {name : "publish to all" , pub : all , sub : all },
500
- {name : "publish to all hub" , pub : hubAll , sub : all },
501
- {name : "publish to random in hub" , pub : []mqttDial {hubRandom }, sub : all },
502
- {name : "publish to all spoke" , pub : spokeAll , sub : all },
503
- {name : "publish to random in spoke" , pub : []mqttDial {spokeRandom }, sub : all },
504
- },
505
- }
506
- }
507
- }
508
-
509
313
var mqttCLICommandPath = func () string {
510
314
p := os .Getenv ("MQTT_CLI" )
511
315
if p == "" {
0 commit comments