Skip to content

Commit

Permalink
Freeze and unfreeze commands (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin255 authored Jan 15, 2021
1 parent 3e0b46a commit 8601154
Show file tree
Hide file tree
Showing 7 changed files with 568 additions and 21 deletions.
81 changes: 75 additions & 6 deletions src/IndexBuilder/BaseIndexBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class BaseIndexBuilder implements IndexBuilder

private ?Client $client = null;

private ?array $listOfIndicies = null;

public function __construct(ClientProvider $clientProvider)
{
$this->clientProvider = $clientProvider;
Expand Down Expand Up @@ -125,14 +127,77 @@ public function delete(IndexMapperInterface $indexMapper): void
}
}

/**
* {@inheritDoc}
*/
public function isFrozen(IndexMapperInterface $indexMapper): bool
{
$description = $this->getIndexDescription($indexMapper);

if ($description === null) {
$message = sprintf("Index with name '%s' not found.", $indexMapper->getName());
throw new IndexBuilderException($message);
}

return !empty($description['settings']['index']['frozen'])
&& $description['settings']['index']['frozen'] === 'true';
}

/**
* {@inheritDoc}
*/
public function freeze(IndexMapperInterface $indexMapper): void
{
try {
$this->getClient()->indices()->freeze(
[
'index' => $indexMapper->getName(),
]
);
$this->listOfIndicies = null;
} catch (Throwable $e) {
throw new IndexBuilderException($e->getMessage(), 0, $e);
}
}

/**
* {@inheritDoc}
*/
public function unfreeze(IndexMapperInterface $indexMapper): void
{
try {
$this->getClient()->indices()->unfreeze(
[
'index' => $indexMapper->getName(),
]
);
$this->listOfIndicies = null;
} catch (Throwable $e) {
throw new IndexBuilderException($e->getMessage(), 0, $e);
}
}

/**
* {@inheritDoc}
*/
public function hasIndex(IndexMapperInterface $indexMapper): bool
{
return $this->getIndexDescription($indexMapper) !== null;
}

/**
* Возвращает массив с описанием указанного индекса или null, если индекс
* не найден.
*
* @param IndexMapperInterface $indexMapper
*
* @return array|null
*/
private function getIndexDescription(IndexMapperInterface $indexMapper): ?array
{
$indices = $this->getListOfIndices();

return isset($indices[$indexMapper->getName()]);
return $indices[$indexMapper->getName()] ?? null;
}

/**
Expand All @@ -142,11 +207,15 @@ public function hasIndex(IndexMapperInterface $indexMapper): bool
*/
private function getListOfIndices(): array
{
return $this->getClient()->indices()->get(
[
'index' => '_all',
]
);
if ($this->listOfIndicies === null) {
$this->listOfIndicies = $this->getClient()->indices()->get(
[
'index' => '_all',
]
);
}

return $this->listOfIndicies;
}

/**
Expand Down
29 changes: 29 additions & 0 deletions src/IndexBuilder/IndexBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,35 @@ public function refresh(IndexMapperInterface $indexMapper): void;
*/
public function delete(IndexMapperInterface $indexMapper): void;

/**
* Возвращает правду, если индекс заморожен.
*
* @param IndexMapperInterface $indexMapper
*
* @return bool
*
* @throws IndexBuilderException
*/
public function isFrozen(IndexMapperInterface $indexMapper): bool;

/**
* Замораживает индекс.
*
* @param IndexMapperInterface $indexMapper
*
* @throws IndexBuilderException
*/
public function freeze(IndexMapperInterface $indexMapper): void;

/**
* Размораживает индекс.
*
* @param IndexMapperInterface $indexMapper
*
* @throws IndexBuilderException
*/
public function unfreeze(IndexMapperInterface $indexMapper): void;

/**
* Возвращает правду, если указанный индекс существует в elasticsearch.
*
Expand Down
72 changes: 68 additions & 4 deletions src/Storage/ElasticStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use Liquetsoft\Fias\Elastic\ClientProvider\ClientProvider;
use Liquetsoft\Fias\Elastic\Exception\IndexMapperException;
use Liquetsoft\Fias\Elastic\Exception\IndexMapperRegistryException;
use Liquetsoft\Fias\Elastic\IndexBuilder\IndexBuilder;
use Liquetsoft\Fias\Elastic\IndexMapperInterface;
use Liquetsoft\Fias\Elastic\IndexMapperRegistry\IndexMapperRegistry;
use Throwable;

Expand All @@ -28,22 +30,44 @@ class ElasticStorage implements Storage
*/
private IndexMapperRegistry $registry;

/**
* Объект для работы с снастройками индекса.
*/
private IndexBuilder $indexBuilder;

/**
* Количество элементов для множественной вставки.
*/
private int $insertBatch;

/**
* Ссылка на объект клиента, если он уже был получен.
*/
private ?Client $client = null;

/**
* Данные операций для множественной отправки.
*
* @var array<string, array>
*/
private array $bulkOperations = [];

public function __construct(ClientProvider $clientProvider, IndexMapperRegistry $registry, int $insertBatch = 1000)
{
/**
* Список замороженных индексов, которые были разморожены для вставки.
*
* @var array<string, IndexMapperInterface>
*/
private array $unfrozedIndicies = [];

public function __construct(
ClientProvider $clientProvider,
IndexMapperRegistry $registry,
IndexBuilder $indexBuilder,
int $insertBatch = 1000
) {
$this->clientProvider = $clientProvider;
$this->registry = $registry;
$this->indexBuilder = $indexBuilder;
$this->insertBatch = $insertBatch;
}

Expand All @@ -60,6 +84,7 @@ public function start(): void
public function stop(): void
{
$this->flushBulk();
$this->freezeIndices();
}

/**
Expand Down Expand Up @@ -232,7 +257,7 @@ private function createOperationArray(string $operation, object $object): array
{
$operationArray = [];

$mapper = $this->registry->getMapperForObject($object);
$mapper = $this->getAndPrepareMapperForObject($object);
$index = $mapper->getName();
$id = $mapper->extractPrimaryFromEntity($object);

Expand All @@ -256,13 +281,52 @@ private function createOperationArray(string $operation, object $object): array
return $operationArray;
}

/**
* Получает и подготавливает маппер для указанного объекта.
*
* @param object $object
*
* @return IndexMapperInterface
*
* @throws IndexMapperRegistryException
* @throws IndexMapperException
*/
private function getAndPrepareMapperForObject(object $object): IndexMapperInterface
{
$mapper = $this->registry->getMapperForObject($object);
$name = $mapper->getName();

if (!isset($this->unfrozedIndicies[$name]) && $this->indexBuilder->isFrozen($mapper)) {
$this->indexBuilder->unfreeze($mapper);
$this->unfrozedIndicies[$name] = $mapper;
}

return $mapper;
}

/**
* Замораживает все размороженные в процессе работы индексы.
*/
private function freezeIndices(): void
{
foreach ($this->unfrozedIndicies as $mapper) {
$this->indexBuilder->freeze($mapper);
}

$this->unfrozedIndicies = [];
}

/**
* Возвращает клиента из текущего провайдера клиента elasticsearch.
*
* @return Client
*/
private function getClient(): Client
{
return $this->clientProvider->provide();
if ($this->client === null) {
$this->client = $this->clientProvider->provide();
}

return $this->client;
}
}
40 changes: 40 additions & 0 deletions src/Task/FreezeElasticIndiciesTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace Liquetsoft\Fias\Elastic\Task;

use Liquetsoft\Fias\Component\Pipeline\State\State;
use Liquetsoft\Fias\Component\Pipeline\Task\Task;
use Liquetsoft\Fias\Elastic\IndexBuilder\IndexBuilder;
use Liquetsoft\Fias\Elastic\IndexMapperRegistry\IndexMapperRegistry;

/**
* Операция, которая помечает все индексы заморожеными.
*/
class FreezeElasticIndiciesTask implements Task
{
private IndexMapperRegistry $registry;

private IndexBuilder $builder;

public function __construct(IndexMapperRegistry $registry, IndexBuilder $builder)
{
$this->registry = $registry;
$this->builder = $builder;
}

/**
* {@inheritDoc}
*/
public function run(State $state): void
{
$mappers = $this->registry->getAllMappers();

foreach ($mappers as $mapper) {
if (!$this->builder->isFrozen($mapper)) {
$this->builder->freeze($mapper);
}
}
}
}
Loading

0 comments on commit 8601154

Please sign in to comment.