Skip to content

Commit

Permalink
Added DataFrame::autoCast() (#923)
Browse files Browse the repository at this point in the history
* Added DataFrame::autoCast()

* Delay using json_validate to avoid turning scalars into jsons
  • Loading branch information
norberttech authored Jan 18, 2024
1 parent afddb71 commit 2da8585
Show file tree
Hide file tree
Showing 7 changed files with 416 additions and 68 deletions.
38 changes: 38 additions & 0 deletions examples/topics/types/csv/csv_read_with_autocast.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\DSL\to_output;
use Aeon\Calendar\Stopwatch;
use Flow\ETL\Flow;
use Flow\ETL\Loader\StreamLoader\Output;

require __DIR__ . '/../../../bootstrap.php';

if (!\file_exists(__FLOW_OUTPUT__ . '/dataset.csv')) {
include __DIR__ . '/../../../setup/php_to_csv.php';
}

$flow = (new Flow())
->read(from_csv(__FLOW_OUTPUT__ . '/dataset.csv'))
->limit(1000)
->autoCast()
->collect()
->write(to_output(false, Output::rows_and_schema));

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
return $flow;
}

$csvFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/dataset.csv') / 1024 / 1024);
print "Reading CSV {$csvFileSize}Mb file...\n";

$stopwatch = new Stopwatch();
$stopwatch->start();

$flow->run();

$stopwatch->stop();

print "Total elapsed time: {$stopwatch->totalElapsedTime()->inSecondsPrecise()}s\n";
8 changes: 8 additions & 0 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use Flow\ETL\Row\Reference;
use Flow\ETL\Row\References;
use Flow\ETL\Row\Schema;
use Flow\ETL\Transformer\AutoCastTransformer;
use Flow\ETL\Transformer\CallbackRowTransformer;
use Flow\ETL\Transformer\CrossJoinRowsTransformer;
use Flow\ETL\Transformer\DropDuplicatesTransformer;
Expand Down Expand Up @@ -148,6 +149,13 @@ public function appendSafe(bool $appendSafe = true) : self
return $this;
}

public function autoCast() : self
{
$this->pipeline->add(new AutoCastTransformer());

return $this;
}

/**
* Merge/Split Rows yielded by Extractor into batches of given size.
* For example, when Extractor is yielding one row at time, this method will merge them into batches of given size
Expand Down
77 changes: 9 additions & 68 deletions src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,18 @@ public function create(string $entryName, mixed $value, ?Schema $schema = null)

if ($valueType instanceof ScalarType) {
if ($valueType->isString()) {
$trimmedValue = \trim($value);
$stringChecker = new StringTypeChecker($value);

if ('' !== $trimmedValue) {
if ($this->isJson($trimmedValue)) {
return json_entry($entryName, $value);
}
if ($stringChecker->isJson()) {
return json_entry($entryName, $value);
}

if ($this->isUuid($trimmedValue)) {
return uuid_entry($entryName, Entry\Type\Uuid::fromString($value));
}
if ($stringChecker->isUuid()) {
return uuid_entry($entryName, Entry\Type\Uuid::fromString($value));
}

if ($this->isXML($trimmedValue)) {
return xml_entry($entryName, $value);
}
if ($stringChecker->isXML()) {
return xml_entry($entryName, $value);
}

return str_entry($entryName, $value);
Expand Down Expand Up @@ -246,61 +244,4 @@ private function fromDefinition(Schema\Definition $definition, mixed $value) : E

throw new InvalidArgumentException("Can't convert value into entry \"{$definition->entry()}\"");
}

private function isJson(string $string) : bool
{
if ('{' !== $string[0] && '[' !== $string[0]) {
return false;
}

if (
(!\str_starts_with($string, '{') || !\str_ends_with($string, '}'))
&& (!\str_starts_with($string, '[') || !\str_ends_with($string, ']'))
) {
return false;
}

try {
return \is_array(\json_decode($string, true, flags: \JSON_THROW_ON_ERROR));
} catch (\Exception) {
return false;
}
}

private function isUuid(string $string) : bool
{
if (\strlen($string) !== 36) {
return false;
}

return 0 !== \preg_match(Entry\Type\Uuid::UUID_REGEXP, $string);
}

private function isXML(string $string) : bool
{
if ('<' !== $string[0]) {
return false;
}

if (\preg_match('/<(.+?)>(.+?)<\/(.+?)>/', $string) === 1) {
try {
\libxml_use_internal_errors(true);

$doc = new \DOMDocument();
$result = $doc->loadXML($string);
\libxml_clear_errors(); // Clear any errors if needed
\libxml_use_internal_errors(false); // Restore standard error handling

/** @psalm-suppress RedundantCastGivenDocblockType */
return (bool) $result;
} catch (\Exception) {
\libxml_clear_errors(); // Clear any errors if needed
\libxml_use_internal_errors(false); // Restore standard error handling

return false;
}
}

return false;
}
}
158 changes: 158 additions & 0 deletions src/core/etl/src/Flow/ETL/Row/Factory/StringTypeChecker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Row\Factory;

use Flow\ETL\Row\Entry\Type\Uuid;

final class StringTypeChecker
{
private readonly string $string;

public function __construct(string $string)
{
$this->string = \trim($string);
}

public function isBoolean() : bool
{
if ($this->string === '') {
return false;
}

return \in_array(\strtolower($this->string), ['true', 'false'], true);
}

public function isDateTime() : bool
{
if ($this->string === '') {
return false;
}

$dateParts = \date_parse($this->string);

if ($dateParts['error_count'] > 0) {
return false;
}

if ($dateParts['year'] === false) {
return false;
}

if ($dateParts['month'] === false) {
return false;
}

if ($dateParts['day'] === false) {
return false;
}

return true;
}

public function isFloat() : bool
{
if ($this->string === '') {
return false;
}

return \is_numeric($this->string) && \str_contains($this->string, '.');
}

public function isInteger() : bool
{
if ($this->string === '') {
return false;
}

if (\is_numeric($this->string)) {
return (string) ((int) $this->string) === $this->string;
}

return false;
}

public function isJson() : bool
{
if ($this->string === '') {
return false;
}

if ('{' !== $this->string[0] && '[' !== $this->string[0]) {
return false;
}

if (\function_exists('json_validate')) {
return \json_validate($this->string);
}

if (
(!\str_starts_with($this->string, '{') || !\str_ends_with($this->string, '}'))
&& (!\str_starts_with($this->string, '[') || !\str_ends_with($this->string, ']'))
) {
return false;
}

try {
return \is_array(\json_decode($this->string, true, flags: \JSON_THROW_ON_ERROR));
} catch (\Exception) {
return false;
}
}

public function isNull() : bool
{
return \in_array(\mb_strtolower($this->string), ['null', 'nil'], true);
}

public function isUuid() : bool
{
if ($this->string === '') {
return false;
}

if (\strlen($this->string) !== 36) {
return false;
}

return 0 !== \preg_match(Uuid::UUID_REGEXP, $this->string);
}

public function isXML() : bool
{
if ($this->string === '') {
return false;
}

if ('<' !== $this->string[0]) {
return false;
}

if (\preg_match('/<(.+?)>(.+?)<\/(.+?)>/', $this->string) === 1) {
try {
\libxml_use_internal_errors(true);

$doc = new \DOMDocument();
$result = $doc->loadXML($this->string);
\libxml_clear_errors(); // Clear any errors if needed
\libxml_use_internal_errors(false); // Restore standard error handling

/** @psalm-suppress RedundantCastGivenDocblockType */
return (bool) $result;
} catch (\Exception) {
\libxml_clear_errors(); // Clear any errors if needed
\libxml_use_internal_errors(false); // Restore standard error handling

return false;
}
}

return false;
}

public function value() : string
{
return $this->string;
}
}
70 changes: 70 additions & 0 deletions src/core/etl/src/Flow/ETL/Transformer/AutoCastTransformer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Transformer;

use function Flow\ETL\DSL\bool_entry;
use function Flow\ETL\DSL\datetime_entry;
use function Flow\ETL\DSL\float_entry;
use function Flow\ETL\DSL\int_entry;
use function Flow\ETL\DSL\json_entry;
use function Flow\ETL\DSL\null_entry;
use function Flow\ETL\DSL\uuid_entry;
use Flow\ETL\FlowContext;
use Flow\ETL\Row;
use Flow\ETL\Row\Entry;
use Flow\ETL\Row\Entry\StringEntry;
use Flow\ETL\Rows;
use Flow\ETL\Transformer;

final class AutoCastTransformer implements Transformer
{
public function autoCast(Entry $entry) : Entry
{
if (!$entry instanceof StringEntry) {
return $entry;
}

$typeChecker = new Row\Factory\StringTypeChecker($entry->value());

if ($typeChecker->isNull()) {
return null_entry($entry->name());
}

if ($typeChecker->isInteger()) {
return int_entry($entry->name(), (int) $entry->value());
}

if ($typeChecker->isFloat()) {
return float_entry($entry->name(), (float) $entry->value());
}

if ($typeChecker->isBoolean()) {
return bool_entry($entry->name(), (bool) $entry->value());
}

if ($typeChecker->isJson()) {
return json_entry($entry->name(), $entry->value());
}

if ($typeChecker->isUuid()) {
return uuid_entry($entry->name(), $entry->value());
}

if ($typeChecker->isDateTime()) {
return datetime_entry($entry->name(), $entry->value());
}

return $entry;
}

public function transform(Rows $rows, FlowContext $context) : Rows
{
return $rows->map(function (Row $row) {
return $row->map(function (Entry $entry) {
return $this->autoCast($entry);
});
});
}
}
Loading

0 comments on commit 2da8585

Please sign in to comment.