diff --git a/examples/topics/types/csv/csv_read_with_autocast.php b/examples/topics/types/csv/csv_read_with_autocast.php new file mode 100644 index 000000000..b94e873c1 --- /dev/null +++ b/examples/topics/types/csv/csv_read_with_autocast.php @@ -0,0 +1,38 @@ +read(from_csv(__FLOW_OUTPUT__ . '/dataset.csv')) + ->limit(1000) + ->autoCast() + ->collect() + ->write(to_output(false, Output::rows_and_schema)); + +if ($_ENV['FLOW_PHAR_APP'] ?? false) { + return $flow; +} + +$csvFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/dataset.csv') / 1024 / 1024); +print "Reading CSV {$csvFileSize}Mb file...\n"; + +$stopwatch = new Stopwatch(); +$stopwatch->start(); + +$flow->run(); + +$stopwatch->stop(); + +print "Total elapsed time: {$stopwatch->totalElapsedTime()->inSecondsPrecise()}s\n"; diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index 0895e3e4f..b23bc9d00 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -31,6 +31,7 @@ use Flow\ETL\Row\Reference; use Flow\ETL\Row\References; use Flow\ETL\Row\Schema; +use Flow\ETL\Transformer\AutoCastTransformer; use Flow\ETL\Transformer\CallbackRowTransformer; use Flow\ETL\Transformer\CrossJoinRowsTransformer; use Flow\ETL\Transformer\DropDuplicatesTransformer; @@ -148,6 +149,13 @@ public function appendSafe(bool $appendSafe = true) : self return $this; } + public function autoCast() : self + { + $this->pipeline->add(new AutoCastTransformer()); + + return $this; + } + /** * Merge/Split Rows yielded by Extractor into batches of given size. * For example, when Extractor is yielding one row at time, this method will merge them into batches of given size diff --git a/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php b/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php index e1582df95..c50b1018a 100644 --- a/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php +++ b/src/core/etl/src/Flow/ETL/Row/Factory/NativeEntryFactory.php @@ -58,20 +58,18 @@ public function create(string $entryName, mixed $value, ?Schema $schema = null) if ($valueType instanceof ScalarType) { if ($valueType->isString()) { - $trimmedValue = \trim($value); + $stringChecker = new StringTypeChecker($value); - if ('' !== $trimmedValue) { - if ($this->isJson($trimmedValue)) { - return json_entry($entryName, $value); - } + if ($stringChecker->isJson()) { + return json_entry($entryName, $value); + } - if ($this->isUuid($trimmedValue)) { - return uuid_entry($entryName, Entry\Type\Uuid::fromString($value)); - } + if ($stringChecker->isUuid()) { + return uuid_entry($entryName, Entry\Type\Uuid::fromString($value)); + } - if ($this->isXML($trimmedValue)) { - return xml_entry($entryName, $value); - } + if ($stringChecker->isXML()) { + return xml_entry($entryName, $value); } return str_entry($entryName, $value); @@ -246,61 +244,4 @@ private function fromDefinition(Schema\Definition $definition, mixed $value) : E throw new InvalidArgumentException("Can't convert value into entry \"{$definition->entry()}\""); } - - private function isJson(string $string) : bool - { - if ('{' !== $string[0] && '[' !== $string[0]) { - return false; - } - - if ( - (!\str_starts_with($string, '{') || !\str_ends_with($string, '}')) - && (!\str_starts_with($string, '[') || !\str_ends_with($string, ']')) - ) { - return false; - } - - try { - return \is_array(\json_decode($string, true, flags: \JSON_THROW_ON_ERROR)); - } catch (\Exception) { - return false; - } - } - - private function isUuid(string $string) : bool - { - if (\strlen($string) !== 36) { - return false; - } - - return 0 !== \preg_match(Entry\Type\Uuid::UUID_REGEXP, $string); - } - - private function isXML(string $string) : bool - { - if ('<' !== $string[0]) { - return false; - } - - if (\preg_match('/<(.+?)>(.+?)<\/(.+?)>/', $string) === 1) { - try { - \libxml_use_internal_errors(true); - - $doc = new \DOMDocument(); - $result = $doc->loadXML($string); - \libxml_clear_errors(); // Clear any errors if needed - \libxml_use_internal_errors(false); // Restore standard error handling - - /** @psalm-suppress RedundantCastGivenDocblockType */ - return (bool) $result; - } catch (\Exception) { - \libxml_clear_errors(); // Clear any errors if needed - \libxml_use_internal_errors(false); // Restore standard error handling - - return false; - } - } - - return false; - } } diff --git a/src/core/etl/src/Flow/ETL/Row/Factory/StringTypeChecker.php b/src/core/etl/src/Flow/ETL/Row/Factory/StringTypeChecker.php new file mode 100644 index 000000000..9e3c1543b --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Row/Factory/StringTypeChecker.php @@ -0,0 +1,158 @@ +string = \trim($string); + } + + public function isBoolean() : bool + { + if ($this->string === '') { + return false; + } + + return \in_array(\strtolower($this->string), ['true', 'false'], true); + } + + public function isDateTime() : bool + { + if ($this->string === '') { + return false; + } + + $dateParts = \date_parse($this->string); + + if ($dateParts['error_count'] > 0) { + return false; + } + + if ($dateParts['year'] === false) { + return false; + } + + if ($dateParts['month'] === false) { + return false; + } + + if ($dateParts['day'] === false) { + return false; + } + + return true; + } + + public function isFloat() : bool + { + if ($this->string === '') { + return false; + } + + return \is_numeric($this->string) && \str_contains($this->string, '.'); + } + + public function isInteger() : bool + { + if ($this->string === '') { + return false; + } + + if (\is_numeric($this->string)) { + return (string) ((int) $this->string) === $this->string; + } + + return false; + } + + public function isJson() : bool + { + if ($this->string === '') { + return false; + } + + if ('{' !== $this->string[0] && '[' !== $this->string[0]) { + return false; + } + + if (\function_exists('json_validate')) { + return \json_validate($this->string); + } + + if ( + (!\str_starts_with($this->string, '{') || !\str_ends_with($this->string, '}')) + && (!\str_starts_with($this->string, '[') || !\str_ends_with($this->string, ']')) + ) { + return false; + } + + try { + return \is_array(\json_decode($this->string, true, flags: \JSON_THROW_ON_ERROR)); + } catch (\Exception) { + return false; + } + } + + public function isNull() : bool + { + return \in_array(\mb_strtolower($this->string), ['null', 'nil'], true); + } + + public function isUuid() : bool + { + if ($this->string === '') { + return false; + } + + if (\strlen($this->string) !== 36) { + return false; + } + + return 0 !== \preg_match(Uuid::UUID_REGEXP, $this->string); + } + + public function isXML() : bool + { + if ($this->string === '') { + return false; + } + + if ('<' !== $this->string[0]) { + return false; + } + + if (\preg_match('/<(.+?)>(.+?)<\/(.+?)>/', $this->string) === 1) { + try { + \libxml_use_internal_errors(true); + + $doc = new \DOMDocument(); + $result = $doc->loadXML($this->string); + \libxml_clear_errors(); // Clear any errors if needed + \libxml_use_internal_errors(false); // Restore standard error handling + + /** @psalm-suppress RedundantCastGivenDocblockType */ + return (bool) $result; + } catch (\Exception) { + \libxml_clear_errors(); // Clear any errors if needed + \libxml_use_internal_errors(false); // Restore standard error handling + + return false; + } + } + + return false; + } + + public function value() : string + { + return $this->string; + } +} diff --git a/src/core/etl/src/Flow/ETL/Transformer/AutoCastTransformer.php b/src/core/etl/src/Flow/ETL/Transformer/AutoCastTransformer.php new file mode 100644 index 000000000..10d8d7fcd --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Transformer/AutoCastTransformer.php @@ -0,0 +1,70 @@ +value()); + + if ($typeChecker->isNull()) { + return null_entry($entry->name()); + } + + if ($typeChecker->isInteger()) { + return int_entry($entry->name(), (int) $entry->value()); + } + + if ($typeChecker->isFloat()) { + return float_entry($entry->name(), (float) $entry->value()); + } + + if ($typeChecker->isBoolean()) { + return bool_entry($entry->name(), (bool) $entry->value()); + } + + if ($typeChecker->isJson()) { + return json_entry($entry->name(), $entry->value()); + } + + if ($typeChecker->isUuid()) { + return uuid_entry($entry->name(), $entry->value()); + } + + if ($typeChecker->isDateTime()) { + return datetime_entry($entry->name(), $entry->value()); + } + + return $entry; + } + + public function transform(Rows $rows, FlowContext $context) : Rows + { + return $rows->map(function (Row $row) { + return $row->map(function (Entry $entry) { + return $this->autoCast($entry); + }); + }); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/Factory/StringTypeCheckerTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/Factory/StringTypeCheckerTest.php new file mode 100644 index 000000000..0f02050e1 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/Factory/StringTypeCheckerTest.php @@ -0,0 +1,88 @@ +assertTrue((new StringTypeChecker('true'))->isBoolean()); + $this->assertTrue((new StringTypeChecker('false'))->isBoolean()); + $this->assertFalse((new StringTypeChecker('0'))->isBoolean()); + $this->assertFalse((new StringTypeChecker('not bool'))->isBoolean()); + } + + public function test_detecting_date_time() : void + { + $this->assertFalse((new StringTypeChecker('not date time'))->isDateTime()); + $this->assertFalse((new StringTypeChecker('2021-13-01'))->isDateTime()); + $this->assertFalse((new StringTypeChecker('now'))->isDateTime()); + $this->assertFalse((new StringTypeChecker('midnight'))->isDateTime()); + $this->assertFalse((new StringTypeChecker('today'))->isDateTime()); + $this->assertFalse((new StringTypeChecker('yesterday'))->isDateTime()); + $this->assertFalse((new StringTypeChecker('tomorrow'))->isDateTime()); + $this->assertFalse((new StringTypeChecker('+24h'))->isDateTime()); + $this->assertFalse((new StringTypeChecker('00:00:00'))->isDateTime()); + $this->assertTrue((new StringTypeChecker('2023-01-01 +10 hours'))->isDateTime()); + $this->assertTrue((new StringTypeChecker('Thursday, 02-Jun-2022 16:58:35 UTC'))->isDateTime()); + $this->assertTrue((new StringTypeChecker('2022-06-02T16:58:35+0000'))->isDateTime()); + $this->assertTrue((new StringTypeChecker('2022-06-02T16:58:35+00:00'))->isDateTime()); + $this->assertTrue((new StringTypeChecker('Thu, 02 Jun 22 16:58:35 +0000'))->isDateTime()); + $this->assertTrue((new StringTypeChecker('Thursday, 02-Jun-22 16:58:35 UTC'))->isDateTime()); + $this->assertTrue((new StringTypeChecker('Thu, 02 Jun 22 16:58:35 +0000'))->isDateTime()); + $this->assertTrue((new StringTypeChecker('Thu, 02 Jun 2022 16:58:35 +0000'))->isDateTime()); + } + + public function test_detecting_float() : void + { + $this->assertTrue((new StringTypeChecker('1.0'))->isFloat()); + $this->assertTrue((new StringTypeChecker('0.0'))->isFloat()); + $this->assertFalse((new StringTypeChecker('not float'))->isFloat()); + $this->assertFalse((new StringTypeChecker('1'))->isFloat()); + $this->assertFalse((new StringTypeChecker('1.0.0'))->isFloat()); + } + + public function test_detecting_integer() : void + { + $this->assertTrue((new StringTypeChecker('1'))->isInteger()); + $this->assertTrue((new StringTypeChecker('0'))->isInteger()); + $this->assertFalse((new StringTypeChecker('not integer'))->isInteger()); + $this->assertFalse((new StringTypeChecker('1.0'))->isInteger()); + $this->assertTrue((new StringTypeChecker('112312312'))->isInteger()); + $this->assertFalse((new StringTypeChecker('11_2312_312'))->isInteger()); + } + + public function test_detecting_json() : void + { + $this->assertTrue((new StringTypeChecker('{"foo":"bar"}'))->isJson()); + $this->assertTrue((new StringTypeChecker('[{"foo":"bar"}]'))->isJson()); + $this->assertFalse((new StringTypeChecker('not json'))->isJson()); + } + + public function test_detecting_null() : void + { + $this->assertTrue((new StringTypeChecker('null'))->isNull()); + $this->assertTrue((new StringTypeChecker('NULL'))->isNull()); + $this->assertTrue((new StringTypeChecker('Nil'))->isNull()); + $this->assertTrue((new StringTypeChecker('nil'))->isNull()); + $this->assertFalse((new StringTypeChecker('not null'))->isNull()); + $this->assertFalse((new StringTypeChecker(''))->isNull()); + } + + public function test_detecting_uuid() : void + { + $this->assertTrue((new StringTypeChecker('f47ac10b-58cc-4372-a567-0e02b2c3d479'))->isUuid()); + $this->assertFalse((new StringTypeChecker('not uuid'))->isUuid()); + } + + public function test_detecting_xml() : void + { + $this->assertTrue((new StringTypeChecker('bar'))->isXML()); + $this->assertFalse((new StringTypeChecker('not xml'))->isXML()); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/AutoCastTransformerTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/AutoCastTransformerTest.php new file mode 100644 index 000000000..1830c52da --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Transformer/AutoCastTransformerTest.php @@ -0,0 +1,45 @@ + '1', + 'float' => '1.0', + 'boolean' => 'true', + 'json' => '{"foo":"bar"}', + 'datetime' => '2021-01-01 00:00:00', + 'null' => 'null', + 'nil' => 'nil', + ], + ]); + + $this->assertEquals( + [ + [ + 'integer' => 1, + 'float' => 1.0, + 'boolean' => true, + 'json' => '{"foo":"bar"}', + 'datetime' => new \DateTimeImmutable('2021-01-01 00:00:00'), + 'null' => null, + 'nil' => null, + ], + ], + $transformer->transform($rows, flow_context())->toArray() + ); + } +}