Skip to content

Commit

Permalink
Simplified filesystem streams (#993)
Browse files Browse the repository at this point in the history
* Simplified filesystem streams

* Updated UPGRADE.md

* Updated dependencies

* Fixed failing examples
  • Loading branch information
norberttech authored Feb 18, 2024
1 parent bee0204 commit d5ea7d4
Show file tree
Hide file tree
Showing 68 changed files with 1,527 additions and 1,713 deletions.
6 changes: 6 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ Please follow the instructions for your specific version to ensure a smooth upgr

---

## Upgrading from 0.6.x to 0.7.x

### 1) DataFrame::appendSafe() method was removed

`DataFrame::appendSafe()` aka `DataFrame::threadSafe()` method was removed as it was introducing additional complexity and was not used in any of the adapters.

## Upgrading from 0.5.x to 0.6.x

### 1) Rows::merge() accepts single instance of Rows
Expand Down
58 changes: 29 additions & 29 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

File renamed without changes.
File renamed without changes.
30 changes: 30 additions & 0 deletions examples/topics/data_frame/overwrite/code.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\overwrite;

require __DIR__ . '/../../../autoload.php';

df()
->read(from_csv(__DIR__ . '/input/file.csv'))
->saveMode(overwrite())
->write(to_csv(__DIR__ . '/output/file.csv'))
->run();

df()
->read(from_csv(__DIR__ . '/output/file.csv'))
->saveMode(overwrite())
->drop('name')
->write(to_csv(__DIR__ . '/output/file.csv'))
->run();

// content of /output/file.csv:
// id
// 1
// 2
// 3
// 4
5 changes: 5 additions & 0 deletions examples/topics/data_frame/overwrite/input/file.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,name
1,"John Doe"
2,"Jane Doe"
3,"Tom Doe"
4,"Jerry Doe"
2 changes: 2 additions & 0 deletions examples/topics/data_frame/overwrite/output/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*
!.gitignore
18 changes: 9 additions & 9 deletions examples/topics/partitioning/partitioning/code.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@
]
))
->partitionBy(ref('color'), ref('sku'))
->write(to_csv(__DIR__ . '/output')) // do not provider extension, partitions are anyway written to separate folders
->write(to_csv(__DIR__ . '/output/products.csv'))
->run();

// output
// ├── color=blue
// │ ├── sku=PRODUCT01
// │ │ └── 65c7e9bc4460a568233195.csv
// │ │ └── products.csv
// │ └── sku=PRODUCT02
// │ └── 65c7e9bc446c2326068326.csv
// │ └── products.csv
// ├── color=green
// │ ├── sku=PRODUCT01
// │ │ └── 65c7e9bc44305321518126.csv
// │ │ └── products.csv
// │ ├── sku=PRODUCT02
// │ │ └── 65c7e9bc44421020940545.csv
// │ │ └── products.csv
// │ └── sku=PRODUCT03
// │ └── 65c7e9bc44515031584752.csv
// │ └── products.csv
// └── color=red
// ├── sku=PRODUCT01
// │ └── 65c7e9bc4386f958078278.csv
// │ └── products.csv
// ├── sku=PRODUCT02
// │ └── 65c7e9bc440fa083889144.csv
// │ └── products.csv
// └── sku=PRODUCT03
// └── 65c7e9bc44209401416287.csv
// └── products.csv
//
// 12 directories, 8 files
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
use Flow\ETL\Extractor\PartitionsExtractor;
use Flow\ETL\Extractor\Signal;
use Flow\ETL\Filesystem\Path;
use Flow\ETL\Filesystem\Stream\Mode;
use Flow\ETL\FlowContext;

final class AvroExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionsExtractor
Expand All @@ -31,14 +30,9 @@ public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $this->partitionFilter()) as $filePath) {
foreach ($context->streams()->scan($this->path, $this->partitionFilter()) as $stream) {
$reader = new \AvroDataIOReader(
new AvroResource(
$context->streams()->fs()->open(
$filePath,
Mode::READ_BINARY
)->resource()
),
new AvroResource($stream->resource()),
new \AvroIODatumReader(null, null),
);

Expand All @@ -48,21 +42,21 @@ public function extract(FlowContext $context) : \Generator
$row = $valueConverter->convert($rowData);

if ($shouldPutInputIntoRows) {
$row['_input_file_uri'] = $filePath->uri();
$row['_input_file_uri'] = $stream->path()->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $filePath->partitions());
$signal = yield array_to_rows($row, $context->entryFactory(), $stream->path()->partitions());
$this->countRow();

if ($signal === Signal::STOP || $this->reachedLimit()) {
$context->streams()->close($this->path);
$context->streams()->closeWriters($this->path);

return;
}
}
}

$context->streams()->close($this->path);
$stream->close();
}
}

public function source() : Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function closure(FlowContext $context) : void
$this->writer($context)->close();
}

$context->streams()->close($this->path);
$context->streams()->closeWriters($this->path);
$this->writer = null;
}

Expand Down Expand Up @@ -127,13 +127,7 @@ private function writer(FlowContext $context) : \AvroDataIOWriter
$schema = \AvroSchema::parse($this->schema());

$this->writer = new \AvroDataIOWriter(
new AvroResource(
$context->streams()->open(
$this->path,
'avro',
$context->appendSafe()
)->resource()
),
new AvroResource($context->streams()->writeTo($this->path)->resource()),
new \AvroIODatumWriter($schema),
$schema,
'null'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL\Adapter\Avro\FlixTech;

use function Flow\ETL\DSL\type_string;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\Logical\DateTimeType;
use Flow\ETL\PHP\Type\Logical\JsonType;
Expand Down Expand Up @@ -68,6 +69,10 @@ private function convert(Definition $definition) : array
}

if ($type instanceof MapType) {
if (!$type->key()->isEqual(type_string())) {
throw new RuntimeException('Map key can be only string, ' . $type->key()->toString() . ' is not supported.');
}

return match ($type->value()->type()->toString()) {
ScalarType::STRING => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::STRING_TYPE]],
ScalarType::INTEGER => ['name' => $definition->entry()->name(), 'type' => ['type' => 'map', 'values' => \AvroSchema::INT_TYPE]],
Expand Down
Loading

0 comments on commit d5ea7d4

Please sign in to comment.