Skip to content

Commit

Permalink
Fixed schema inferring when first rows are null (#1274)
Browse files Browse the repository at this point in the history
* Fixed schema infering when first rows are null

* Static analysis

* fixed failing tests

* Regenerated DSL functions definition for documentation
  • Loading branch information
norberttech authored Dec 18, 2024
1 parent 2c5407f commit e35478a
Show file tree
Hide file tree
Showing 55 changed files with 516 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL\Adapter\Elasticsearch\EntryIdFactory;

use function Flow\ETL\DSL\string_entry;
use Flow\ETL\Adapter\Elasticsearch\IdFactory;
use Flow\ETL\Hash\{Algorithm, NativePHPHash};
use Flow\ETL\Row;
Expand All @@ -26,7 +27,7 @@ public function __construct(string ...$entryNames)

public function create(Row $row) : Entry
{
return new Entry\StringEntry(
return string_entry(
'id',
$this->hashAlgorithm->hash(
\implode(':', \array_map(fn (string $name) : string => (string) $row->valueOf($name), $this->entryNames))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Flow\ETL\Adapter\Elasticsearch\Tests\Integration\ElasticsearchPHP;

use function Flow\ETL\Adapter\Elasticsearch\{es_hits_to_rows, from_es, to_es_bulk_index};
use function Flow\ETL\DSL\{df, generate_random_int};
use function Flow\ETL\DSL\{bool_entry, df, generate_random_int, int_entry, string_entry};
use Flow\ETL\Adapter\Elasticsearch\ElasticsearchPHP\DocumentDataSource;
use Flow\ETL\Adapter\Elasticsearch\EntryIdFactory\EntryIdFactory;
use Flow\ETL\Adapter\Elasticsearch\Tests\Integration\TestCase;
Expand Down Expand Up @@ -37,10 +37,10 @@ public function test_empty_extraction() : void
$loader->load(new Rows(
...\array_map(
static fn (int $i) : Row => Row::create(
new Row\Entry\StringEntry('id', \sha1((string) $i)),
new Row\Entry\IntegerEntry('position', $i),
new Row\Entry\StringEntry('name', 'id_' . $i),
new Row\Entry\BooleanEntry('active', (bool) generate_random_int(0, 1))
string_entry('id', \sha1((string) $i)),
int_entry('position', $i),
string_entry('name', 'id_' . $i),
bool_entry('active', (bool) generate_random_int(0, 1))
),
\range(1, 100)
),
Expand Down Expand Up @@ -77,10 +77,10 @@ public function test_extraction_index_with_from_and_size() : void
$loader->load(new Rows(
...\array_map(
static fn (int $i) : Row => Row::create(
new Row\Entry\StringEntry('id', \sha1((string) $i)),
new Row\Entry\IntegerEntry('position', $i),
new Row\Entry\StringEntry('name', 'id_' . $i),
new Row\Entry\BooleanEntry('active', (bool) generate_random_int(0, 1))
string_entry('id', \sha1((string) $i)),
int_entry('position', $i),
string_entry('name', 'id_' . $i),
bool_entry('active', (bool) generate_random_int(0, 1))
),
\range(1, 2000)
),
Expand Down Expand Up @@ -120,10 +120,10 @@ public function test_extraction_index_with_search_after() : void
$loader->load(new Rows(
...\array_map(
static fn (int $i) : Row => Row::create(
new Row\Entry\StringEntry('id', \sha1((string) $i)),
new Row\Entry\IntegerEntry('position', $i),
new Row\Entry\StringEntry('name', 'id_' . $i),
new Row\Entry\BooleanEntry('active', (bool) generate_random_int(0, 1))
string_entry('id', \sha1((string) $i)),
int_entry('position', $i),
string_entry('name', 'id_' . $i),
bool_entry('active', (bool) generate_random_int(0, 1))
),
\range(1, 2005)
),
Expand Down Expand Up @@ -156,10 +156,10 @@ public function test_extraction_index_with_search_after_with_point_in_time() : v
$loader->load(new Rows(
...\array_map(
static fn (int $i) : Row => Row::create(
new Row\Entry\StringEntry('id', \sha1((string) $i)),
new Row\Entry\IntegerEntry('position', $i),
new Row\Entry\StringEntry('name', 'id_' . $i),
new Row\Entry\BooleanEntry('active', (bool) generate_random_int(0, 1))
string_entry('id', \sha1((string) $i)),
int_entry('position', $i),
string_entry('name', 'id_' . $i),
bool_entry('active', (bool) generate_random_int(0, 1))
),
\range(1, 2005)
),
Expand Down Expand Up @@ -197,10 +197,10 @@ public function test_extraction_whole_index_with_point_in_time() : void
$loader->load(new Rows(
...\array_map(
static fn (int $i) : Row => Row::create(
new Row\Entry\StringEntry('id', \sha1((string) $i)),
new Row\Entry\IntegerEntry('position', $i),
new Row\Entry\StringEntry('name', 'id_' . $i),
new Row\Entry\BooleanEntry('active', (bool) generate_random_int(0, 1))
string_entry('id', \sha1((string) $i)),
int_entry('position', $i),
string_entry('name', 'id_' . $i),
bool_entry('active', (bool) generate_random_int(0, 1))
),
\range(1, 2005)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Flow\ETL\Adapter\Elasticsearch\Tests\Integration\ElasticsearchPHP;

use function Flow\ETL\Adapter\Elasticsearch\{entry_id_factory, hash_id_factory, to_es_bulk_index, to_es_bulk_update};
use function Flow\ETL\DSL\generate_random_string;
use function Flow\ETL\DSL\{generate_random_string, string_entry};
use Flow\ETL\Adapter\Elasticsearch\Tests\Integration\TestCase;
use Flow\ETL\{Config, FlowContext, Row, Rows};

Expand Down Expand Up @@ -53,20 +53,20 @@ public function test_integration_with_entry_factory() : void

$loader->load(new Rows(
Row::create(
new Row\Entry\StringEntry('id', \sha1('id' . generate_random_string())),
new Row\Entry\StringEntry('name', 'Łukasz')
string_entry('id', \sha1('id' . generate_random_string())),
string_entry('name', 'Łukasz')
),
Row::create(
new Row\Entry\StringEntry('id', \sha1('id' . generate_random_string())),
new Row\Entry\StringEntry('name', 'Norbert')
string_entry('id', \sha1('id' . generate_random_string())),
string_entry('name', 'Norbert')
),
Row::create(
new Row\Entry\StringEntry('id', \sha1('id' . generate_random_string())),
new Row\Entry\StringEntry('name', 'Dawid')
string_entry('id', \sha1('id' . generate_random_string())),
string_entry('name', 'Dawid')
),
Row::create(
new Row\Entry\StringEntry('id', \sha1('id' . generate_random_string())),
new Row\Entry\StringEntry('name', 'Tomek')
string_entry('id', \sha1('id' . generate_random_string())),
string_entry('name', 'Tomek')
),
), new FlowContext(Config::default()));

Expand Down Expand Up @@ -125,8 +125,8 @@ public function test_integration_with_partial_update_id_factory() : void
$insertLoader->load(new Rows(
Row::create(
new Row\Entry\IntegerEntry('id', 1),
new Row\Entry\StringEntry('name', 'Some Name'),
new Row\Entry\StringEntry('status', 'NEW'),
string_entry('name', 'Some Name'),
string_entry('status', 'NEW'),
new Row\Entry\DateTimeEntry('updated_at', new \DateTimeImmutable('2022-01-01 00:00:00'))
),
), new FlowContext(Config::default()));
Expand All @@ -136,7 +136,7 @@ public function test_integration_with_partial_update_id_factory() : void
$updateLoader->load(new Rows(
Row::create(
new Row\Entry\IntegerEntry('id', 1),
new Row\Entry\StringEntry('name', 'Other Name'),
string_entry('name', 'Other Name'),
),
), new FlowContext(Config::default()));

Expand Down Expand Up @@ -179,19 +179,19 @@ public function test_integration_with_sha1_id_factory() : void
$loader->load(new Rows(
Row::create(
new Row\Entry\IntegerEntry('id', 1),
new Row\Entry\StringEntry('name', 'Łukasz')
string_entry('name', 'Łukasz')
),
Row::create(
new Row\Entry\IntegerEntry('id', 2),
new Row\Entry\StringEntry('name', 'Norbert')
string_entry('name', 'Norbert')
),
Row::create(
new Row\Entry\IntegerEntry('id', 3),
new Row\Entry\StringEntry('name', 'Dawid')
string_entry('name', 'Dawid')
),
Row::create(
new Row\Entry\IntegerEntry('id', 4),
new Row\Entry\StringEntry('name', 'Tomek')
string_entry('name', 'Tomek')
),
), new FlowContext(Config::default()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Flow\ETL\Adapter\Elasticsearch\Tests\Integration\ElasticsearchPHP;

use function Flow\ETL\Adapter\Elasticsearch\{es_hits_to_rows, from_es, to_es_bulk_index};
use function Flow\ETL\DSL\{bool_entry, int_entry, string_entry};
use Flow\ETL\Adapter\Elasticsearch\EntryIdFactory\EntryIdFactory;
use Flow\ETL\Adapter\Elasticsearch\Tests\Integration\TestCase;
use Flow\ETL\{Flow, Row, Rows};
Expand Down Expand Up @@ -39,10 +40,10 @@ public function test_loading_and_extraction_with_limit_and_transformation() : vo
new Rows(
...\array_map(
static fn (int $i) : Row => Row::create(
new Row\Entry\StringEntry('id', \sha1((string) $i)),
new Row\Entry\IntegerEntry('position', $i),
new Row\Entry\StringEntry('name', 'id_' . $i),
new Row\Entry\BooleanEntry('active', false)
string_entry('id', \sha1((string) $i)),
int_entry('position', $i),
string_entry('name', 'id_' . $i),
bool_entry('active', false)
),
\range(1, 2005)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\ETL\Adapter\Elasticsearch\Tests\Unit\EntryIdFactory;

use function Flow\ETL\DSL\str_entry;
use function Flow\ETL\DSL\{str_entry, string_entry};
use Flow\ETL\Adapter\Elasticsearch\EntryIdFactory\HashIdFactory;
use Flow\ETL\Hash\NativePHPHash;
use Flow\ETL\Row;
Expand All @@ -17,7 +17,7 @@ public function test_create_row() : void
$factory = new HashIdFactory('first_name', 'last_name');

self::assertEquals(
new Row\Entry\StringEntry(
string_entry(
'id',
\hash('xxh128', 'John:Doe')
),
Expand All @@ -32,7 +32,7 @@ public function test_create_row_with_different_hash() : void
$factory = (new HashIdFactory('first_name', 'last_name'))->withAlgorithm(new NativePHPHash('sha1'));

self::assertEquals(
new Row\Entry\StringEntry(
string_entry(
'id',
\sha1('John:Doe')
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
namespace Flow\ETL\Adapter\GoogleSheet\Tests\Unit;

use function Flow\ETL\Adapter\GoogleSheet\from_google_sheet_columns;
use function Flow\ETL\DSL\str_entry;
use function Flow\ETL\DSL\{str_entry, string_entry};
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Row\Entry\StringEntry;
use Flow\ETL\{Config\ConfigBuilder, FlowContext, Row, Rows};
use Google\Service\Sheets;
use Google\Service\Sheets\Resource\SpreadsheetsValues;
Expand All @@ -26,8 +25,8 @@ public function test_its_stop_fetching_data_if_processed_row_count_is_less_then_
true,
2,
);
$spreadSheetIdEntry = new StringEntry('_spread_sheet_id', $spreadSheetId);
$sheetNameEntry = new StringEntry('_sheet_name', $sheetName);
$spreadSheetIdEntry = string_entry('_spread_sheet_id', $spreadSheetId);
$sheetNameEntry = string_entry('_sheet_name', $sheetName);
$firstValueRangeMock = $this->createMock(Sheets\ValueRange::class);
$firstValueRangeMock->method('getValues')->willReturn([
['header'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL\Adapter\Http;

use function Flow\ETL\DSL\string_entry;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Row;
use Psr\Http\Message\RequestInterface;
Expand Down Expand Up @@ -38,7 +39,7 @@ public function create(RequestInterface $request) : Row\Entries
}
}

$requestBodyEntry = new Row\Entry\StringEntry('request_body', null);
$requestBodyEntry = string_entry('request_body', null);
$requestBody = $request->getBody();

if ($requestBody->isReadable()) {
Expand All @@ -58,13 +59,13 @@ public function create(RequestInterface $request) : Row\Entries
if (\class_exists(Row\Entry\JsonEntry::class)) {
$requestBodyEntry = new Row\Entry\JsonEntry('request_body', (array) \json_decode($requestBodyContent, true, 512, JSON_THROW_ON_ERROR));
} else {
$requestBodyEntry = new Row\Entry\StringEntry('request_body', $requestBodyContent);
$requestBodyEntry = string_entry('request_body', $requestBodyContent);
}

break;

default:
$requestBodyEntry = new Row\Entry\StringEntry('request_body', $requestBodyContent);
$requestBodyEntry = string_entry('request_body', $requestBodyContent);

break;
}
Expand All @@ -73,10 +74,10 @@ public function create(RequestInterface $request) : Row\Entries

return new Row\Entries(
$requestBodyEntry,
new Row\Entry\StringEntry('request_uri', (string) $request->getUri()),
string_entry('request_uri', (string) $request->getUri()),
new Row\Entry\JsonEntry('request_headers', $request->getHeaders()),
new Row\Entry\StringEntry('request_protocol_version', $request->getProtocolVersion()),
new Row\Entry\StringEntry('request_method', $request->getMethod()),
string_entry('request_protocol_version', $request->getProtocolVersion()),
string_entry('request_method', $request->getMethod()),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL\Adapter\Http;

use function Flow\ETL\DSL\string_entry;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Row;
use Psr\Http\Message\ResponseInterface;
Expand Down Expand Up @@ -48,26 +49,26 @@ public function create(ResponseInterface $response) : Row\Entries
if (\class_exists(Row\Entry\JsonEntry::class)) {
$responseBodyEntry = new Row\Entry\JsonEntry('response_body', (array) \json_decode($responseBodyContent, true, 512, JSON_THROW_ON_ERROR));
} else {
$responseBodyEntry = new Row\Entry\StringEntry('response_body', $responseBodyContent);
$responseBodyEntry = string_entry('response_body', $responseBodyContent);
}

break;

default:
$responseBodyEntry = new Row\Entry\StringEntry('response_body', $responseBodyContent);
$responseBodyEntry = string_entry('response_body', $responseBodyContent);

break;
}
} else {
$responseBodyEntry = new Row\Entry\StringEntry('response_body', null);
$responseBodyEntry = string_entry('response_body', null);
}

return new Row\Entries(
$responseBodyEntry,
new Row\Entry\JsonEntry('response_headers', $response->getHeaders()),
new Row\Entry\IntegerEntry('response_status_code', $response->getStatusCode()),
new Row\Entry\StringEntry('response_protocol_version', $response->getProtocolVersion()),
new Row\Entry\StringEntry('response_reason_phrase', $response->getReasonPhrase()),
string_entry('response_protocol_version', $response->getProtocolVersion()),
string_entry('response_reason_phrase', $response->getReasonPhrase()),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL\Adapter\Logger\Tests\Unit;

use function Flow\ETL\DSL\{int_entry, string_entry};
use Flow\ETL\Adapter\Logger\PsrLoggerLoader;
use Flow\ETL\{Config, FlowContext, Row, Rows};
use PHPUnit\Framework\TestCase;
Expand All @@ -20,8 +21,8 @@ public function test_psr_logger_loader() : void

$loader->load(new Rows(
Row::create(
new Row\Entry\IntegerEntry('id', 12345),
Row\Entry\StringEntry::lowercase('name', 'Norbert')
int_entry('id', 12345),
string_entry('name', 'Norbert')->toLowercase()
)
), new FlowContext(Config::default()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Flow\ETL\Adapter\Meilisearch\Tests\Integration\MeilisearchPHP;

use function Flow\ETL\Adapter\Meilisearch\{from_meilisearch, meilisearch_hits_to_rows, to_meilisearch_bulk_index};
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\{from_array, string_entry};
use Flow\ETL\Adapter\Meilisearch\Tests\Context\MeilisearchContext;
use Flow\ETL\Adapter\Meilisearch\Tests\Double\Spy\HttpClientSpy;
use Flow\ETL\{Flow, Row, Rows};
Expand Down Expand Up @@ -70,9 +70,9 @@ public function test_loading_and_extraction_with_limit_and_transformation() : vo
new Rows(
...\array_map(
static fn (int $i) : Row => Row::create(
new Row\Entry\StringEntry('id', \sha1((string) $i)),
string_entry('id', \sha1((string) $i)),
new Row\Entry\IntegerEntry('position', $i),
new Row\Entry\StringEntry('name', 'id_' . $i),
string_entry('name', 'id_' . $i),
new Row\Entry\BooleanEntry('active', false)
),
\range(1, 500)
Expand Down
Loading

0 comments on commit e35478a

Please sign in to comment.