22
22
import com .oracle .bedrock .runtime .coherence .options .OperationalOverride ;
23
23
import com .oracle .bedrock .runtime .coherence .options .RoleName ;
24
24
import com .oracle .bedrock .runtime .coherence .options .WellKnownAddress ;
25
+ import com .oracle .bedrock .runtime .java .features .JmxFeature ;
25
26
import com .oracle .bedrock .runtime .java .options .SystemProperty ;
26
27
import com .oracle .bedrock .runtime .options .DisplayName ;
27
28
import com .oracle .bedrock .runtime .options .StabilityPredicate ;
51
52
import org .junit .Test ;
52
53
import org .junit .rules .TestName ;
53
54
import topics .callables .GetTopicServiceName ;
54
- import topics .callables .ResumeService ;
55
- import topics .callables .SuspendService ;
56
55
56
+ import javax .management .MBeanInfo ;
57
+ import javax .management .MBeanServerConnection ;
58
+ import javax .management .ObjectName ;
57
59
import java .io .DataInput ;
58
60
import java .io .DataOutput ;
59
61
import java .io .File ;
84
86
* enabled and is removed via a clean shutdown (as would happen in k8s
85
87
* using the operator).
86
88
*/
87
- @ SuppressWarnings ("CallToPrintStackTrace" )
89
+ @ SuppressWarnings ({ "CallToPrintStackTrace" , "resource" } )
88
90
public abstract class AbstractTopicsStorageRecoveryTests
89
91
{
90
92
@ Before
@@ -350,8 +352,7 @@ optComplete, withIdentifyingName(sName)))
350
352
351
353
// Suspend the services - we do this via the storage member like the Operator would
352
354
Logger .info (">>>> Suspending service " + sServiceName + " published=" + cPublished .get ());
353
- Boolean fSuspended = member .invoke (new SuspendService (sServiceName ));
354
- assertThat (fSuspended , is (true ));
355
+ suspendService (sServiceName );
355
356
Logger .info (">>>> Suspended service " + sServiceName + " published=" + cPublished .get ());
356
357
357
358
// shutdown the storage members
@@ -368,10 +369,7 @@ optComplete, withIdentifyingName(sName)))
368
369
369
370
// The cache service should still be suspended so resume it via a storage member like the operator would
370
371
Logger .info (">>>> Resuming service " + sServiceName + " published=" + cPublished .get ());
371
- member = s_storageCluster .stream ().findAny ().orElse (null );
372
- assertThat (member , is (notNullValue ()));
373
- Boolean fResumed = member .invoke (new ResumeService (sServiceName ));
374
- assertThat (fResumed , is (true ));
372
+ resumeService (sServiceName );
375
373
Logger .info (">>>> Resumed service " + sServiceName + " published=" + cPublished .get ());
376
374
Logger .info (">>>> Awake. published=" + cPublished .get ());
377
375
@@ -396,7 +394,7 @@ optComplete, withIdentifyingName(sName)))
396
394
{
397
395
CoherenceClusterMember member = s_storageCluster .stream ().findAny ().orElse (null );
398
396
assertThat (member , is (notNullValue ()));
399
- member . submit ( new ResumeService ( sServiceName )). get ( 1 , TimeUnit . MINUTES );
397
+ resumeService ( sServiceName );
400
398
CompletableFuture .runAsync (publisher ::close ).get (1 , TimeUnit .MINUTES );
401
399
}
402
400
}
@@ -429,8 +427,7 @@ public void shouldRecoverWaitingSubscriberAfterCleanStorageRestart() throws Exce
429
427
430
428
// Suspend the services - we do this via the storage member like the Operator would
431
429
Logger .info (">>>> Suspending service " + sServiceName );
432
- Boolean fSuspended = member .invoke (new SuspendService (sServiceName ));
433
- assertThat (fSuspended , is (true ));
430
+ suspendService (sServiceName );
434
431
Logger .info (">>>> Suspended service " + sServiceName );
435
432
436
433
// futures should not be completed
@@ -455,10 +452,7 @@ public void shouldRecoverWaitingSubscriberAfterCleanStorageRestart() throws Exce
455
452
456
453
// The topics cache service should still be suspended so resume it via a storage member like the operator would
457
454
Logger .info (">>>> Resuming service " + sServiceName );
458
- member = s_storageCluster .stream ().findAny ().orElse (null );
459
- assertThat (member , is (notNullValue ()));
460
- Boolean fResumed = member .invoke (new ResumeService (sServiceName ));
461
- assertThat (fResumed , is (true ));
455
+ resumeService (sServiceName );
462
456
Logger .info (">>>> Resumed service " + sServiceName );
463
457
464
458
// futures should not be completed
@@ -501,6 +495,37 @@ public void shouldRecoverWaitingSubscriberAfterCleanStorageRestart() throws Exce
501
495
502
496
// ----- helper methods -------------------------------------------------
503
497
498
+ protected void suspendService (String sServiceName ) throws Exception
499
+ {
500
+ CoherenceCluster cluster = s_storageCluster ;
501
+ CoherenceClusterMember member = cluster .getAny ();
502
+ JmxFeature jmxFeature = member .get (JmxFeature .class );
503
+ assertThat (jmxFeature , is (notNullValue ()));
504
+
505
+ MBeanServerConnection connection = jmxFeature .getDeferredJMXConnector ().get ().getMBeanServerConnection ();
506
+ ObjectName objectName = new ObjectName ("Coherence:type=Cluster" );
507
+ MBeanInfo info = connection .getMBeanInfo (objectName );
508
+ assertThat (info , is (notNullValue ()));
509
+
510
+ connection .invoke (objectName , "suspendService" , new Object []{sServiceName }, new String []{"java.lang.String" });
511
+ }
512
+
513
+ protected void resumeService (String sServiceName ) throws Exception
514
+ {
515
+ CoherenceCluster cluster = s_storageCluster ;
516
+ CoherenceClusterMember member = cluster .getAny ();
517
+ JmxFeature jmxFeature = member .get (JmxFeature .class );
518
+ assertThat (jmxFeature , is (notNullValue ()));
519
+
520
+ MBeanServerConnection connection = jmxFeature .getDeferredJMXConnector ().get ().getMBeanServerConnection ();
521
+ ObjectName objectName = new ObjectName ("Coherence:type=Cluster" );
522
+ MBeanInfo info = connection .getMBeanInfo (objectName );
523
+ assertThat (info , is (notNullValue ()));
524
+
525
+ connection .invoke (objectName , "resumeService" , new Object []{sServiceName }, new String []{"java.lang.String" });
526
+ }
527
+
528
+
504
529
protected void restartCluster ()
505
530
{
506
531
Logger .info (">>>> Stopping storage." );
@@ -549,6 +574,7 @@ private CoherenceCluster startCluster(String suffix)
549
574
Logging .atMax (),
550
575
JMXManagementMode .ALL ,
551
576
DisplayName .of (sMethodName + '-' + suffix ),
577
+ JmxFeature .enabled (),
552
578
s_testLogs .builder ());
553
579
554
580
builder .with (ClusterName .of (sMethodName ),
@@ -661,7 +687,7 @@ public int hashCode()
661
687
@ ClassRule
662
688
public static TestLogs s_testLogs = new TestLogs (TopicsRecoveryTests .class );
663
689
664
- protected static CoherenceCluster s_storageCluster ;
690
+ protected static volatile CoherenceCluster s_storageCluster ;
665
691
666
692
/**
667
693
* A JUnit rule that will cause the test to fail if it runs too long.
0 commit comments