@@ -667,77 +667,6 @@ func TestTenantStreamingDeleteRange(t *testing.T) {
667
667
checkDelRangeOnTable ("t2" , false /* embeddedInSST */ )
668
668
}
669
669
670
- func TestTenantStreamingMultipleNodes (t * testing.T ) {
671
- defer leaktest .AfterTest (t )()
672
- defer log .Scope (t ).Close (t )
673
-
674
- skip .UnderDeadlock (t , "multi-node may time out under deadlock" )
675
- skip .UnderRace (t , "multi-node test may time out under race" )
676
-
677
- ctx := context .Background ()
678
-
679
- testutils .RunTrueAndFalse (t , "fromSystem" , func (t * testing.T , sys bool ) {
680
- args := replicationtestutils .DefaultTenantStreamingClustersArgs
681
- args .MultitenantSingleClusterNumNodes = 3
682
- args .RoutingMode = streamclient .RoutingModeNode
683
-
684
- // Track the number of unique addresses that were connected to
685
- clientAddresses := make (map [string ]struct {})
686
- var addressesMu syncutil.Mutex
687
- args .TestingKnobs = & sql.StreamingTestingKnobs {
688
- BeforeClientSubscribe : func (addr string , token string , _ span.Frontier , _ bool ) {
689
- addressesMu .Lock ()
690
- defer addressesMu .Unlock ()
691
- clientAddresses [addr ] = struct {}{}
692
- },
693
- }
694
-
695
- if sys {
696
- args .SrcTenantID = roachpb .SystemTenantID
697
- args .SrcTenantName = "system"
698
- }
699
- telemetry .GetFeatureCounts (telemetry .Raw , telemetry .ResetCounts )
700
- c , cleanup := replicationtestutils .CreateMultiTenantStreamingCluster (ctx , t , args )
701
- defer cleanup ()
702
-
703
- // Make sure we have data on all nodes, so that we will have multiple
704
- // connections and client addresses (and actually test multi-node).
705
- replicationtestutils .CreateScatteredTable (t , c , 3 )
706
-
707
- producerJobID , ingestionJobID := c .StartStreamReplication (ctx )
708
- jobutils .WaitForJobToRun (c .T , c .SrcSysSQL , jobspb .JobID (producerJobID ))
709
- jobutils .WaitForJobToRun (c .T , c .DestSysSQL , jobspb .JobID (ingestionJobID ))
710
-
711
- c .SrcExec (func (t * testing.T , sysSQL * sqlutils.SQLRunner , tenantSQL * sqlutils.SQLRunner ) {
712
- tenantSQL .Exec (t , "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)" )
713
- tenantSQL .Exec (t , "INSERT INTO d.x VALUES (1, 1)" )
714
- })
715
-
716
- c .DestSysSQL .Exec (t , `PAUSE JOB $1` , ingestionJobID )
717
- jobutils .WaitForJobToPause (t , c .DestSysSQL , jobspb .JobID (ingestionJobID ))
718
- c .SrcExec (func (t * testing.T , sysSQL * sqlutils.SQLRunner , tenantSQL * sqlutils.SQLRunner ) {
719
- tenantSQL .Exec (t , "INSERT INTO d.x VALUES (2, 2)" )
720
- })
721
- c .DestSysSQL .Exec (t , `RESUME JOB $1` , ingestionJobID )
722
- jobutils .WaitForJobToRun (t , c .DestSysSQL , jobspb .JobID (ingestionJobID ))
723
-
724
- c .SrcExec (func (t * testing.T , sysSQL * sqlutils.SQLRunner , tenantSQL * sqlutils.SQLRunner ) {
725
- tenantSQL .Exec (t , "INSERT INTO d.x VALUES (3, 3)" )
726
- })
727
-
728
- c .WaitUntilStartTimeReached (jobspb .JobID (ingestionJobID ))
729
-
730
- cutoverTime := c .DestSysServer .Clock ().Now ()
731
- c .Cutover (ctx , producerJobID , ingestionJobID , cutoverTime .GoTime (), false )
732
- counts := telemetry .GetFeatureCounts (telemetry .Raw , telemetry .ResetCounts )
733
- require .GreaterOrEqual (t , counts ["physical_replication.cutover" ], int32 (1 ))
734
- c .RequireFingerprintMatchAtTimestamp (cutoverTime .AsOfSystemTime ())
735
-
736
- // Since the data was distributed across multiple nodes, multiple nodes should've been connected to
737
- require .Greater (t , len (clientAddresses ), 1 )
738
- })
739
- }
740
-
741
670
func TestSpecsPersistedOnlyAfterInitialPlan (t * testing.T ) {
742
671
defer leaktest .AfterTest (t )()
743
672
defer log .Scope (t ).Close (t )
0 commit comments