Skip to content

Commit

Permalink
Respect limit/offset from query builder in dbal query loader (#1492)
Browse files Browse the repository at this point in the history
* Respect limit/offset from query builder in dbal query loader

* updated dsl definitions
  • Loading branch information
norberttech authored Feb 21, 2025
1 parent 168efb3 commit acf1376
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ final class DbalLimitOffsetExtractor implements Extractor
{
private ?int $maximum = null;

private int $offset = 0;

private int $pageSize = 1000;

private ?Schema $schema = null;
Expand Down Expand Up @@ -52,6 +54,14 @@ public static function table(

public function extract(FlowContext $context) : \Generator
{
if ($this->maximum === null && $this->queryBuilder->getMaxResults()) {
$this->maximum = $this->queryBuilder->getMaxResults();
}

if ($this->offset === 0 && $this->queryBuilder->getFirstResult()) {
$this->offset = $this->queryBuilder->getFirstResult();
}

if (isset($this->maximum)) {
$total = $this->maximum;
} else {
Expand All @@ -76,8 +86,8 @@ public function extract(FlowContext $context) : \Generator

$totalFetched = 0;

for ($page = 0; $page <= (new Pages($total, $this->pageSize))->pages(); $page++) {
$offset = $page * $this->pageSize;
for ($page = 0; $page < (new Pages($total, $this->pageSize))->pages(); $page++) {
$offset = $page * $this->pageSize + $this->offset;

$pageQuery = $this->queryBuilder
->setMaxResults($this->pageSize)
Expand Down Expand Up @@ -116,6 +126,17 @@ public function withMaximum(int $maximum) : self
return $this;
}

public function withOffset(int $offset) : self
{
if ($offset < 0) {
throw new InvalidArgumentException('Offset must be greater than 0, got ' . $offset);
}

$this->offset = $offset;

return $this;
}

public function withPageSize(int $pageSize) : self
{
if ($pageSize <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,22 @@ function from_dbal_limit_offset(
/**
* @param Connection $connection
* @param int $page_size
* @param null|int $maximum
* @param null|int $maximum - maximum can also be taken from a query builder, $maximum however is used regardless of the query builder if it's set
* @param int $offset - offset can also be taken from a query builder, $offset however is used regardless of the query builder if it's set to non 0 value
*/
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::EXTRACTOR)]
function from_dbal_limit_offset_qb(
Connection $connection,
QueryBuilder $queryBuilder,
int $page_size = 1000,
?int $maximum = null,
int $offset = 0,
) : DbalLimitOffsetExtractor {
$loader = (new DbalLimitOffsetExtractor(
$connection,
$queryBuilder,
))->withPageSize($page_size);
))->withPageSize($page_size)
->withOffset($offset);

if ($maximum !== null) {
$loader->withMaximum($maximum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
{
public function __construct(
private Connection $connection,
private InsertQueryCounter $logger,
private InsertQueryCounter $insertQueryCounter,
private SelectQueryCounter $selectQueryCounter,
) {
}

Expand Down Expand Up @@ -40,14 +41,34 @@ public function dropAllTables() : void
}
}

public function executedSelectQueries() : array
{
return $this->selectQueryCounter->queries;
}

public function insert(string $tableName, array $data, array $types = []) : void
{
$this->connection->insert($tableName, $data, $types);
}

public function numberOfExecutedInsertQueries() : int
{
return $this->logger->count;
return $this->insertQueryCounter->count;
}

public function numberOfExecutedSelectQueries() : int
{
return $this->selectQueryCounter->count;
}

public function resetInsertQueryCounter() : void
{
$this->insertQueryCounter->reset();
}

public function resetSelectQueryCounter() : void
{
$this->selectQueryCounter->reset();
}

public function selectAll(string $tableName) : array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public function log(mixed $level, string|\Stringable $message, array $context =
$this->count++;
}
}

public function reset() : void
{
$this->count = 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\Doctrine\Tests\Context;

use Psr\Log\{AbstractLogger, LoggerAwareInterface, LoggerAwareTrait, NullLogger};

final class SelectQueryCounter extends AbstractLogger implements LoggerAwareInterface
{
use LoggerAwareTrait;

public int $count = 0;

public array $queries = [];

public function __construct()
{
$this->logger = new NullLogger();
}

public function log(mixed $level, string|\Stringable $message, array $context = []) : void
{
if (!isset($context['sql'])) {
return;
}

if (\str_starts_with(\trim((string) $context['sql']), 'SELECT')) {
$this->count++;
$this->queries[] = $context['sql'];
}
}

public function reset() : void
{
$this->count = 0;
$this->queries = [];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,121 @@ public function test_extracting_entire_table_using_qb() : void
);
}

public function test_extracting_entire_table_using_qb_with_maximum_and_offset_set_on_extractor() : void
{
$this->pgsqlDatabaseContext->createTable((new Table(
$table = 'flow_doctrine_bulk_test',
[
new Column('id', Type::getType(Types::INTEGER), ['notnull' => true]),
new Column('name', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
new Column('tags', Type::getType(Types::JSON), ['notnull' => true, 'length' => 255]),
],
))
->setPrimaryKey(['id']));

for ($i = 1; $i <= 25; $i++) {
$this->pgsqlDatabaseContext->insert($table, ['id' => $i, 'name' => 'name_' . $i, 'tags' => '{"a": 1, "b": 2 }']);
}

$this->pgsqlDatabaseContext->resetSelectQueryCounter();

$rows = (data_frame())
->extract(
from_dbal_limit_offset_qb(
$this->pgsqlDatabaseContext->connection(),
$this->pgsqlDatabaseContext->connection()->createQueryBuilder()
->from($table)
->select('*')
->orderBy('id', 'ASC')
)->withSchema(schema(
int_schema('id'),
str_schema('name'),
map_schema('tags', type_map(type_string(), type_int()))
))->withMaximum(5)->withOffset(10)->withPageSize(1)
)->fetch()->toArray();

self::assertSame(5, $this->pgsqlDatabaseContext->numberOfExecutedSelectQueries());
self::assertCount(5, $rows);
self::assertSame(11, $rows[0]['id']);

}

public function test_extracting_entire_table_using_qb_with_maximum_and_offset_set_on_query() : void
{
$this->pgsqlDatabaseContext->createTable((new Table(
$table = 'flow_doctrine_bulk_test',
[
new Column('id', Type::getType(Types::INTEGER), ['notnull' => true]),
new Column('name', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
new Column('tags', Type::getType(Types::JSON), ['notnull' => true, 'length' => 255]),
],
))
->setPrimaryKey(['id']));

for ($i = 1; $i <= 25; $i++) {
$this->pgsqlDatabaseContext->insert($table, ['id' => $i, 'name' => 'name_' . $i, 'tags' => '{"a": 1, "b": 2 }']);
}

$rows = (data_frame())
->extract(
from_dbal_limit_offset_qb(
$this->pgsqlDatabaseContext->connection(),
$this->pgsqlDatabaseContext->connection()->createQueryBuilder()
->from($table)
->select('*')
->orderBy('id', 'ASC')
->setMaxResults(5)
->setFirstResult(10),
)->withSchema(schema(
int_schema('id'),
str_schema('name'),
map_schema('tags', type_map(type_string(), type_int()))
))
)->fetch()->toArray();

self::assertCount(5, $rows);
self::assertSame(11, $rows[0]['id']);
}

public function test_extracting_entire_table_using_qb_with_maximum_set_on_query() : void
{
$this->pgsqlDatabaseContext->createTable((new Table(
$table = 'flow_doctrine_bulk_test',
[
new Column('id', Type::getType(Types::INTEGER), ['notnull' => true]),
new Column('name', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
new Column('tags', Type::getType(Types::JSON), ['notnull' => true, 'length' => 255]),
],
))
->setPrimaryKey(['id']));

for ($i = 1; $i <= 25; $i++) {
$this->pgsqlDatabaseContext->insert($table, ['id' => $i, 'name' => 'name_' . $i, 'tags' => '{"a": 1, "b": 2 }']);
}

$this->pgsqlDatabaseContext->resetSelectQueryCounter();

$rows = (data_frame())
->extract(
from_dbal_limit_offset_qb(
$this->pgsqlDatabaseContext->connection(),
$this->pgsqlDatabaseContext->connection()->createQueryBuilder()
->from($table)
->select('*')
->orderBy('id', 'ASC')
->setMaxResults(5)
)->withSchema(schema(
int_schema('id'),
str_schema('name'),
map_schema('tags', type_map(type_string(), type_int()))
))
)->fetch()->toArray();

self::assertSame(1, $this->pgsqlDatabaseContext->numberOfExecutedSelectQueries());
self::assertCount(5, $rows);
self::assertSame(1, $rows[0]['id']);
}

public function test_extracting_entire_table_using_qb_with_schema() : void
{
$this->pgsqlDatabaseContext->createTable((new Table(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Doctrine\DBAL\Logging\Middleware;
use Doctrine\DBAL\Tools\DsnParser;
use Doctrine\DBAL\{Configuration, DriverManager};
use Flow\ETL\Adapter\Doctrine\Tests\Context\{DatabaseContext, InsertQueryCounter};
use Flow\ETL\Adapter\Doctrine\Tests\Context\{DatabaseContext, InsertQueryCounter, SelectQueryCounter};
use Flow\ETL\Tests\FlowTestCase;

abstract class IntegrationTestCase extends FlowTestCase
Expand All @@ -16,14 +16,16 @@ abstract class IntegrationTestCase extends FlowTestCase

protected function setUp() : void
{
$logger = new InsertQueryCounter();
$insertQueryCounter = new InsertQueryCounter();
$selectQueryCounter = new SelectQueryCounter();

$this->pgsqlDatabaseContext = new DatabaseContext(
DriverManager::getConnection(
$this->connectionParams(),
(new Configuration())->setMiddlewares([new Middleware($logger)])
(new Configuration())->setMiddlewares([new Middleware($insertQueryCounter), new Middleware($selectQueryCounter)])
),
$logger
$insertQueryCounter,
$selectQueryCounter
);
}

Expand Down
2 changes: 1 addition & 1 deletion web/landing/resources/dsl.json

Large diffs are not rendered by default.

0 comments on commit acf1376

Please sign in to comment.