diff --git a/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php b/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php index 22981359e..ed7fdd356 100644 --- a/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php +++ b/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php @@ -25,26 +25,30 @@ use OpenSearch\Common\Exceptions\NoNodesAvailableException; use OpenSearch\ConnectionPool\Selectors\SelectorInterface; use OpenSearch\Connections\Connection; -use OpenSearch\Connections\ConnectionInterface; use OpenSearch\Connections\ConnectionFactoryInterface; +use OpenSearch\Connections\ConnectionInterface; -class SniffingConnectionPool extends AbstractConnectionPool implements ConnectionPoolInterface +class SniffingConnectionPool extends AbstractConnectionPool { /** * @var int */ - private $sniffingInterval = 300; + private $sniffingInterval; /** * @var int */ - private $nextSniff = -1; + private $nextSniff; /** * {@inheritdoc} */ - public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) - { + public function __construct( + $connections, + SelectorInterface $selector, + ConnectionFactoryInterface $factory, + $connectionPoolParams + ) { parent::__construct($connections, $selector, $factory, $connectionPoolParams); $this->setConnectionPoolParams($connectionPoolParams); @@ -78,9 +82,9 @@ public function scheduleCheck(): void $this->nextSniff = -1; } - private function sniff(bool $force = false) + private function sniff(bool $force = false): void { - if ($force === false && $this->nextSniff >= time()) { + if ($force === false && $this->nextSniff > time()) { return; } @@ -123,19 +127,19 @@ private function sniffConnection(Connection $connection): bool return false; } - $nodes = $this->parseClusterState($connection->getTransportSchema(), $response); + $nodes = $this->parseClusterState($response); if (count($nodes) === 0) { return false; } - $this->connections = array(); + $this->connections = []; foreach ($nodes as $node) { - $nodeDetails = array( + $nodeDetails = [ 'host' => $node['host'], - 'port' => $node['port'] - ); + 'port' => $node['port'], + ]; $this->connections[] = $this->connectionFactory->create($nodeDetails); } @@ -144,18 +148,18 @@ private function sniffConnection(Connection $connection): bool return true; } - private function parseClusterState(string $transportSchema, $nodeInfo): array + private function parseClusterState($nodeInfo): array { - $pattern = '/([^:]*):([0-9]+)/'; - $hosts = []; + $pattern = '/([^:]*):(\d+)/'; + $hosts = []; foreach ($nodeInfo['nodes'] as $node) { if (isset($node['http']) === true && isset($node['http']['publish_address']) === true) { if (preg_match($pattern, $node['http']['publish_address'], $match) === 1) { - $hosts[] = array( + $hosts[] = [ 'host' => $match[1], - 'port' => (int) $match[2], - ); + 'port' => (int)$match[2], + ]; } } } @@ -163,10 +167,8 @@ private function parseClusterState(string $transportSchema, $nodeInfo): array return $hosts; } - private function setConnectionPoolParams(array $connectionPoolParams) + private function setConnectionPoolParams(array $connectionPoolParams): void { - if (isset($connectionPoolParams['sniffingInterval']) === true) { - $this->sniffingInterval = $connectionPoolParams['sniffingInterval']; - } + $this->sniffingInterval = (int)($connectionPoolParams['sniffingInterval'] ?? 300); } } diff --git a/tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php b/tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php index 997b29622..168cbe1db 100644 --- a/tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php +++ b/tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php @@ -23,7 +23,9 @@ use OpenSearch\ClientBuilder; use OpenSearch\ConnectionPool\SniffingConnectionPool; +use OpenSearch\ConnectionPool\StaticConnectionPool; use OpenSearch\Tests\Utility; +use PHPUnit\Framework\TestCase; /** * Class SniffingConnectionPoolIntegrationTest @@ -31,18 +33,14 @@ * @subpackage Tests/SniffingConnectionPoolTest * @group Integration */ -class SniffingConnectionPoolIntegrationTest extends \PHPUnit\Framework\TestCase +class SniffingConnectionPoolIntegrationTest extends TestCase { - protected function setUp(): void - { - static::markTestSkipped("All of Sniffing unit tests use outdated cluster state format, need to redo"); - } - - public function testSniff() + public function testSniff(): void { $client = ClientBuilder::create() ->setHosts([Utility::getHost()]) - ->setConnectionPool(SniffingConnectionPool::class, ['sniffingInterval' => -10]) + ->setConnectionPool(SniffingConnectionPool::class, ['sniffingInterval' => 10]) + ->setSSLVerification(false) ->build(); $pinged = $client->ping(); diff --git a/tests/ConnectionPool/SniffingConnectionPoolTest.php b/tests/ConnectionPool/SniffingConnectionPoolTest.php index 613f5d03a..5935b14eb 100644 --- a/tests/ConnectionPool/SniffingConnectionPoolTest.php +++ b/tests/ConnectionPool/SniffingConnectionPoolTest.php @@ -6,7 +6,7 @@ * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a + * this file to be licensed under the Apache-2.0 license or a * compatible open source license. * * Modifications Copyright OpenSearch Contributors. See @@ -15,385 +15,279 @@ namespace OpenSearch\Tests\ConnectionPool; +use OpenSearch\Common\Exceptions\Curl\OperationTimeoutException; use OpenSearch\Common\Exceptions\NoNodesAvailableException; -use OpenSearch\ConnectionPool\Selectors\SelectorInterface; +use OpenSearch\ConnectionPool\Selectors\RoundRobinSelector; use OpenSearch\ConnectionPool\SniffingConnectionPool; use OpenSearch\Connections\Connection; -use Mockery as m; -use OpenSearch\Common\Exceptions\Curl\OperationTimeoutException; use OpenSearch\Connections\ConnectionFactoryInterface; +use PHPUnit\Framework\TestCase; /** * Class SniffingConnectionPoolTest * * @subpackage Tests/SniffingConnectionPoolTest */ -class SniffingConnectionPoolTest extends \PHPUnit\Framework\TestCase +class SniffingConnectionPoolTest extends TestCase { - protected function setUp(): void + /** @test */ + public function itShouldReturnTheSingleLiveConnectionAvailable(): void { - static::markTestSkipped("All of Sniffing unit tests use outdated cluster state format, need to redo"); + $clusterState = $this->clusterState(1); + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(true); + $connection->method('sniff')->willReturn($clusterState); + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturn($connection); + + $connectionPool = new SniffingConnectionPool( + [$connection], + $selector, + $connectionFactory, + ['sniffingInterval' => 0] + ); + + $this->assertSame($connection, $connectionPool->nextConnection()); } - - public function tearDown(): void + /** @test */ + public function itShouldSniffNewConnectionsWhenPossible(): void { - m::close(); + $clusterState = $this->clusterState(2); + $originalConnection = $this->createMock(Connection::class); + $originalConnection->method('isAlive')->willReturn(false); + $originalConnection->method('sniff')->willReturn($clusterState); + $discoveredConnection = $this->createMock(Connection::class); + $discoveredConnection->method('isAlive')->willReturn(true); + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturnOnConsecutiveCalls($originalConnection, $discoveredConnection); + + $connectionPool = new SniffingConnectionPool( + [$originalConnection], + $selector, + $connectionFactory, + ['sniffingInterval' => 0] + ); + + $actualConnection = $connectionPool->nextConnection(); + + $this->assertSame($discoveredConnection, $actualConnection); } - public function testAddOneHostThenGetConnection() + /** @test */ + public function forceNextConnection(): void { - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(true); - $mockConnection->allows('isAlive')->andReturns(true); - - $connections = [$mockConnection]; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturns($connections[0]); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - - $connectionPoolParams = ['randomizeHosts' => false]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertSame($mockConnection, $retConnection); + $clusterState = $this->clusterState(2); + $firstConnection = $this->createMock(Connection::class); + $firstConnection->method('isAlive')->willReturn(true); + $firstConnection->method('sniff')->willReturn($clusterState); + $secondConnection = $this->createMock(Connection::class); + $secondConnection->method('isAlive')->willReturn(true); + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturnOnConsecutiveCalls($firstConnection, $secondConnection); + + $connectionPool = new SniffingConnectionPool( + [$firstConnection, $secondConnection], + $selector, + $connectionFactory, + ['sniffingInterval' => 0] + ); + + $this->assertSame($secondConnection, $connectionPool->nextConnection(true)); } - public function testAddOneHostAndTriggerSniff() - { - $clusterState = json_decode('{"ok":true,"cluster_name":"opensearch","nodes":{"Bl2ihSr7TcuUHxhu1GA_YQ":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}}}', true); - - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('getTransportSchema')->andReturns('http'); - $mockConnection->expects('sniff')->andReturns($clusterState); - - $connections = [$mockConnection]; - $mockNewConnection = m::mock(Connection::class); - $mockNewConnection->allows('isAlive')->andReturns(true); - $selector = m::mock(SelectorInterface::class); - $selector->expects('select')->twice()->andReturns($mockNewConnection); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns($mockNewConnection); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertSame($mockNewConnection, $retConnection); - } - - public function testAddOneHostAndForceNext() - { - $clusterState = json_decode('{"ok":true,"cluster_name":"opensearch","nodes":{"Bl2ihSr7TcuUHxhu1GA_YQ":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}}}', true); - - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('getTransportSchema')->andReturns('http'); - $mockConnection->expects('sniff')->andReturns($clusterState); - - $connections = [$mockConnection]; - $mockNewConnection = m::mock(Connection::class); - $mockNewConnection->allows('isAlive')->andReturns(true); - - $selector = m::mock(SelectorInterface::class); - $selector->expects('select')->andReturns($mockConnection); - $selector->expects('select')->andReturns($mockNewConnection); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns($mockNewConnection); - - $connectionPoolParams = [ - 'randomizeHosts' => false - ]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(true); - - $this->assertSame($mockNewConnection, $retConnection); - } - - public function testAddTenNodesThenGetConnection() + /** @test */ + public function itShouldReturnFirstSeededConnectionIfAlive(): void { + $clusterState = $this->clusterState(10); $connections = []; - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(true); - $mockConnection->allows('isAlive')->andReturns(true); - - $connections[] = $mockConnection; + for ($i = 1; $i <= 10; $i++) { + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(true); + $connection->method('sniff')->willReturn($clusterState); + $connections[] = $connection; } - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturns($connections[0]); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - - $connectionPoolParams = ['randomizeHosts' => false]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertSame($connections[0], $retConnection); + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturnOnConsecutiveCalls(...$connections); + + $connectionPool = new SniffingConnectionPool( + $connections, + $selector, + $connectionFactory, + ['sniffingInterval' => 0] + ); + + $this->assertSame($connections[0], $connectionPool->nextConnection()); } - public function testAddTenNodesTimeoutAllButLast() + /** @test */ + public function itShouldReturnTheFirstAvailableConnection(): void { + $clusterState = $this->clusterState(10); $connections = []; - - foreach (range(1, 9) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(false); - $mockConnection->allows('isAlive')->andReturns(false); - - $connections[] = $mockConnection; + for ($i = 1; $i <= 10; $i++) { + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(false); + $connection->method('sniff')->willReturn($clusterState); + $connections[] = $connection; } - - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(true); - $mockConnection->allows('isAlive')->andReturns(true); - - $connections[] = $mockConnection; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues($connections); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - - $connectionPoolParams = ['randomizeHosts' => false]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertSame($connections[9], $retConnection); + $randomLiveConnectionIndex = random_int(0, 9); + $connections[$randomLiveConnectionIndex] = $this->createMock(Connection::class); + $connections[$randomLiveConnectionIndex]->method('isAlive')->willReturn(true); + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturnOnConsecutiveCalls(...$connections); + + $connectionPool = new SniffingConnectionPool( + $connections, + $selector, + $connectionFactory, + ['sniffingInterval' => 0] + ); + + $this->assertSame($connections[$randomLiveConnectionIndex], $connectionPool->nextConnection()); } - public function testAddTenNodesAllTimeout() + /** @test */ + public function itShouldFailIfAllNodesAreDown(): void { + $clusterState = $this->clusterState(10); $connections = []; - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(false); - $mockConnection->allows('isAlive')->andReturns(false); - - $connections[] = $mockConnection; + for ($i = 1; $i <= 10; $i++) { + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(false); + $connection->method('sniff')->willReturn($clusterState); + $connections[] = $connection; } + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturnOnConsecutiveCalls(...$connections); - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues($connections); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - - $connectionPoolParams = ['randomizeHosts' => false]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); + $connectionPool = new SniffingConnectionPool( + $connections, + $selector, + $connectionFactory, + ['sniffingInterval' => 0] + ); $this->expectException(NoNodesAvailableException::class); - $this->expectExceptionMessage('No alive nodes found in your cluster'); $connectionPool->nextConnection(); } - public function testAddOneHostSniffTwo() - { - $clusterState = json_decode('{"ok":true,"cluster_name":"opensearch","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true); - - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('getTransportSchema')->twice()->andReturns('http'); - $mockConnection->expects('sniff')->twice()->andReturns($clusterState); - - $connections = [$mockConnection]; - - $newConnections = []; - $newConnection = m::mock(Connection::class); - $newConnection->allows('isAlive')->andReturns(true); - - $newConnections[] = $newConnection; - $newConnections[] = $newConnection; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues([ - //selects provided node first, then the new cluster list - $mockConnection, - $newConnections[0], - $newConnections[1], - ]); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns($newConnections[0]); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturns($newConnections[1]); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - $this->assertSame($newConnections[0], $retConnection); - - $retConnection = $connectionPool->nextConnection(); - $this->assertSame($newConnections[1], $retConnection); - } - - public function testAddSeedSniffTwoTimeoutTwo() + /** @test */ + public function sniffShouldStopIfAllSniffRequestsFail(): void { - $clusterState = json_decode('{"ok":true,"cluster_name":"opensearch","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true); - - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('getTransportSchema')->andReturns('http'); - $mockConnection->expects('sniff')->andReturns($clusterState); - - $connections = [$mockConnection]; - - $newConnections = []; - $newConnection = m::mock(Connection::class); - $newConnection->allows('isAlive')->andReturns(false); - $newConnection->allows('ping')->andReturns(false); - - $newConnections[] = $newConnection; - $newConnections[] = $newConnection; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues([ //selects provided node first, then the new cluster list - $mockConnection, - $newConnections[0], - $newConnections[1], - ]); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns($newConnections[0]); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturns($newConnections[1]); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(false); + $connection->method('sniff')->willThrowException(new OperationTimeoutException()); + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + + $connectionPool = new SniffingConnectionPool( + [$connection], + $selector, + $connectionFactory, + ['sniffingInterval' => 0] + ); $this->expectException(NoNodesAvailableException::class); - $this->expectExceptionMessage('No alive nodes found in your cluster'); + $connectionFactory->expects($this->never())->method('create'); - $retConnection = $connectionPool->nextConnection(); + $connectionPool->nextConnection(); } - public function testTenTimeoutNineSniffTenthAddTwoAlive() + /** @test */ + public function sniffShouldStopIfNodesAreEmpty(): void { - $clusterState = json_decode('{"ok":true,"cluster_name":"opensearch","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true); - - $connections = []; - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(false); - $mockConnection->allows('isAlive')->andReturns(true); - $mockConnection->allows('sniff')->andThrow(OperationTimeoutException::class); - - $connections[] = $mockConnection; - } - - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('sniff')->andReturns($clusterState); - $mockConnection->expects('getTransportSchema')->twice()->andReturns('http'); - - $connections[] = $mockConnection; - - $newConnections = $connections; - $newConnection = m::mock(Connection::class); - $newConnection->allows('isAlive')->andReturns(true); - $newConnection->allows('ping')->andReturns(true); - - $newConnections[] = $newConnection; - $newConnections[] = $newConnection; + $clusterState = $this->clusterState(0); + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(false); + $connection->method('sniff')->willReturn($clusterState); + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + + $connectionPool = new SniffingConnectionPool( + [$connection], + $selector, + $connectionFactory, + ['sniffingInterval' => 0] + ); - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues($newConnections); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns($newConnections[10]); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturns($newConnections[11]); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - $this->assertSame($newConnections[11], $retConnection); + $this->expectException(NoNodesAvailableException::class); + $connectionFactory->expects($this->never())->method('create'); - $retConnection = $connectionPool->nextConnection(); - $this->assertSame($newConnections[12], $retConnection); + $connectionPool->nextConnection(); } - public function testTenTimeoutNineSniffTenthAddTwoDeadTimeoutEveryone() + /** @test */ + public function itShouldNotSniffBeforeScheduledSniffTime(): void { - $clusterState = json_decode('{"ok":true,"cluster_name":"opensearch","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', true); + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(false); + $connection->method('sniff')->willReturn($this->clusterState(2)); + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + + $connectionPool = new SniffingConnectionPool( + [$connection], + $selector, + $connectionFactory, + ['sniffingInterval' => 300] + ); + + $connectionFactory->expects($this->never())->method('create'); + $this->expectException(NoNodesAvailableException::class); - $connections = []; + $connectionPool->nextConnection(); + } - foreach (range(1, 10) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(false); - $mockConnection->allows('isAlive')->andReturns(true); - $mockConnection->allows('sniff')->andThrow(OperationTimeoutException::class); + /** @test */ + public function scheduleCheck(): void + { + $clusterState = $this->clusterState(2); + $firstConnection = $this->createMock(Connection::class); + $firstConnection->method('isAlive')->willReturn(true); + $firstConnection->method('sniff')->willReturn($clusterState); + $secondConnection = $this->createMock(Connection::class); + $secondConnection->method('isAlive')->willReturn(true); + $selector = $this->createMock(RoundRobinSelector::class); + $selector->expects($this->exactly(2))->method('select')->willReturnOnConsecutiveCalls( + $firstConnection, + $secondConnection + ); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturnOnConsecutiveCalls($firstConnection, $secondConnection); + + $connectionPool = new SniffingConnectionPool( + [$firstConnection], + $selector, + $connectionFactory, + ['sniffingInterval' => 300] + ); + + $connectionPool->scheduleCheck(); + + $this->assertSame($secondConnection, $connectionPool->nextConnection()); + } - $connections[] = $mockConnection; + private function clusterState(int $numberOfNodes): array + { + $clusterState = ['nodes' => []]; + + for ($i = 1; $i <= $numberOfNodes; $i++) { + $clusterState['nodes']["node-$i"] = [ + 'http' => [ + 'publish_address' => "172.17.0.2:920$i", + ], + ]; } - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('sniff')->andReturns($clusterState); - $mockConnection->expects('getTransportSchema')->andReturns('http'); - $mockConnection->expects('sniff')->andThrow(OperationTimeoutException::class); - - $connections[] = $mockConnection; - - $newConnections = $connections; - - $newConnection = m::mock(Connection::class); - $newConnection->allows('isAlive')->andReturns(false); - $newConnection->allows('ping')->andReturns(false); - $newConnection->allows('sniff')->andThrow(OperationTimeoutException::class); - - $newConnections[] = $newConnection; - $newConnections[] = $newConnection; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues($newConnections); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns($newConnections[10]); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturns($newConnections[11]); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - 'sniffingInterval' => -1 - ]; - $connectionPool = new SniffingConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $this->expectException(NoNodesAvailableException::class); - $this->expectExceptionMessage('No alive nodes found in your cluster'); - - $retConnection = $connectionPool->nextConnection(); + return $clusterState; } }