From 78dbc9008565bc78b239f94da10b0de3e5a1cdd1 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Thu, 20 Feb 2025 09:08:12 +0100 Subject: [PATCH] Added DataStream builder to symfony http foundation bridge (#1488) --- .../bridges/symfony-http-foundation-bridge.md | 16 ++- .../Symfony/HttpFoundation/DataStream.php | 122 ++++++++++++++++++ .../HttpFoundation/FlowStreamedResponse.php | 2 +- .../Transformation/AddRowIndex.php | 28 ++++ .../Transformation/AddRowIndex/StartFrom.php | 11 ++ .../Transformation/BatchSize.php | 34 +++++ .../Transformation/MaskColumns.php | 4 + .../Integration/FlowStreamedResponseTest.php | 34 +++-- .../Unit/Transformation/AddRowIndexTest.php | 65 ++++++++++ .../Unit/Transformation/BatchSizeTest.php | 29 +++++ src/core/etl/src/Flow/ETL/DataFrame.php | 12 +- .../etl/src/Flow/ETL}/Transformations.php | 10 +- .../ETL/Tests/Unit/TransformationsTest.php | 25 ++++ 13 files changed, 370 insertions(+), 22 deletions(-) create mode 100644 src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php create mode 100644 src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/AddRowIndex.php create mode 100644 src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/AddRowIndex/StartFrom.php create mode 100644 src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/BatchSize.php create mode 100644 src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Unit/Transformation/AddRowIndexTest.php create mode 100644 src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Unit/Transformation/BatchSizeTest.php rename src/{bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation => core/etl/src/Flow/ETL}/Transformations.php (75%) create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Unit/TransformationsTest.php diff --git a/documentation/components/bridges/symfony-http-foundation-bridge.md b/documentation/components/bridges/symfony-http-foundation-bridge.md index 1c0436ebe..47a81da5c 100644 --- a/documentation/components/bridges/symfony-http-foundation-bridge.md +++ b/documentation/components/bridges/symfony-http-foundation-bridge.md @@ -32,22 +32,28 @@ declare(strict_types=1); namespace Symfony\Application\Controller; -use Flow\Bridge\Symfony\HttpFoundation\FlowStreamedResponse; +use Flow\Bridge\Symfony\HttpFoundation\DataStream; use Flow\Bridge\Symfony\HttpFoundation\Output\CSVOutput; use Symfony\Bundle\FrameworkBundle\Controller\AbstractController; use Symfony\Component\HttpFoundation\Response; use Symfony\Component\Routing\Attribute\Route; use function Flow\ETL\Adapter\Parquet\ParquetEtractor; +use Flow\Bridge\Symfony\HttpFoundation\Transformation\MaskColumns; +use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex; final class ReportsController extends AbstractController { #[Route('/stream/report', name: 'stream-report')] public function streamReport() : Response { - return new FlowStreamedResponse( - new ParquetEtractor(__DIR__ . '/reports/orders.parquet'), - new CSVOutput(withHeader: true) - ); + return DataStream() + ::open(from_parquet(__DIR__ . '/reports/orders.parquet')) + ->underFilename('orders.csv') + ->transform( + new MaskColumns(['email', 'address']), + new AddRowIndex() + ) + ->to(new CSVOutput(withHeader: true)); } } ``` diff --git a/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php new file mode 100644 index 000000000..6999fc463 --- /dev/null +++ b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php @@ -0,0 +1,122 @@ + + */ + private array $headers = [ + 'Cache-Control' => 'no-store, no-cache, must-revalidate, private', + 'X-Accel-Buffering' => 'no', // provides support for Nginx + 'Pragma' => 'no-cache', // Backward compatibility for HTTP/1.0 + ]; + + private ?Output $output = null; + + private int $status = Response::HTTP_OK; + + /** + * @var array + */ + private array $transformations = []; + + public function __construct(private readonly Extractor $extractor) + { + } + + public static function open(Extractor $extractor) : self + { + return new self($extractor); + } + + /** + * Send the data stream to the output. + */ + public function sendTo(Output $output) : FlowStreamedResponse + { + $this->output = $output; + + $this->headers['Content-Type'] = $this->output->type()->toContentTypeHeader(); + + return new FlowStreamedResponse( + $this->extractor, + $this->output, + \count($this->transformations) ? new Transformations(...$this->transformations) : new Transformations(), + $this->status, + $this->headers + ); + } + + /** + * Apply transformations to the data stream. + * Transformations are applied in the order they are passed. + * Transformations are applied on the fly, while streaming the data, this means + * that any resource expensive transformations like for example aggregations or sorting + * might significantly slow down the streaming process or even cause out of memory errors. + */ + public function transform(Transformation ...$transformations) : self + { + $this->transformations = $transformations; + + return $this; + } + + /** + * Set the filename for the response. + * If the attachment flag is set to true, the response will be treated as an attachment meaning that + * the browser will prompt the user to download the file. + */ + public function underFilename(string $name, bool $attachment = true) : self + { + $this->headers['Content-Disposition'] = HeaderUtils::makeDisposition( + $attachment ? HeaderUtils::DISPOSITION_ATTACHMENT : HeaderUtils::DISPOSITION_INLINE, + $name + ); + + return $this; + } + + /** + * Set additional headers. + * Headers are merged with the default headers. + */ + public function withHeaders(array $headers) : self + { + $this->headers = array_merge($this->headers, $headers); + + return $this; + } + + /** + * Remove a specific header if it exists. + * If the header does not exist, nothing happens. + */ + public function withoutHeader(string $name) : self + { + if (\array_key_exists($name, $this->headers)) { + unset($this->headers[$name]); + } + + return $this; + } + + /** + * Set the HTTP status code. Default is 200. + */ + public function withStatus(int $status) : self + { + $this->status = $status; + + return $this; + } +} diff --git a/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/FlowStreamedResponse.php b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/FlowStreamedResponse.php index 091ff5ece..bf2926a73 100644 --- a/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/FlowStreamedResponse.php +++ b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/FlowStreamedResponse.php @@ -5,8 +5,8 @@ namespace Flow\Bridge\Symfony\HttpFoundation; use function Flow\ETL\DSL\df; -use Flow\Bridge\Symfony\HttpFoundation\Transformation\{Transformations}; use Flow\ETL\{Config, Extractor, Transformation}; +use Flow\ETL\{Transformations}; use Symfony\Component\HttpFoundation\StreamedResponse; class FlowStreamedResponse extends StreamedResponse diff --git a/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/AddRowIndex.php b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/AddRowIndex.php new file mode 100644 index 000000000..41a612fac --- /dev/null +++ b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/AddRowIndex.php @@ -0,0 +1,28 @@ +startFrom === StartFrom::ZERO ? 0 : 1; + + return $dataFrame->map(function (Row $row) use (&$index) { + $row = $row->add(int_entry($this->indexColumn, $index)); + $index++; + + return $row; + }); + } +} diff --git a/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/AddRowIndex/StartFrom.php b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/AddRowIndex/StartFrom.php new file mode 100644 index 000000000..09edf7a45 --- /dev/null +++ b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/AddRowIndex/StartFrom.php @@ -0,0 +1,11 @@ + $size + */ + public function __construct(private int $size) + { + if ($size < 1) { + throw new InvalidArgumentException('Batch size must be greater than 0'); + } + } + + public function transform(DataFrame $dataFrame) : DataFrame + { + return $dataFrame->batchSize($this->size); + } +} diff --git a/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/MaskColumns.php b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/MaskColumns.php index 9de803434..d4c6f1bee 100644 --- a/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/MaskColumns.php +++ b/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/MaskColumns.php @@ -7,6 +7,10 @@ use function Flow\ETL\DSL\lit; use Flow\ETL\{DataFrame, Transformation}; +/** + * Mask columns in DataFrame by replacing their values with a mask. + * If column does not exist in DataFrame, it will be added with a mask value. + */ final readonly class MaskColumns implements Transformation { public function __construct(private array $columns = [], private string $mask = '******') diff --git a/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Integration/FlowStreamedResponseTest.php b/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Integration/FlowStreamedResponseTest.php index 5a0da37cc..dc401d35c 100644 --- a/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Integration/FlowStreamedResponseTest.php +++ b/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Integration/FlowStreamedResponseTest.php @@ -6,7 +6,8 @@ use function Flow\ETL\Adapter\JSON\from_json; use function Flow\ETL\DSL\from_array; -use Flow\Bridge\Symfony\HttpFoundation\{FlowStreamedResponse, +use Flow\Bridge\Symfony\HttpFoundation\{DataStream, + FlowStreamedResponse, Output\CSVOutput, Output\JsonOutput, Output\XMLOutput}; @@ -37,14 +38,14 @@ public function test_streaming_array_response_to_csv() : void public function test_streaming_array_response_to_json() : void { - $response = new FlowStreamedResponse( - from_array([ - ['id' => 1, 'size' => 'XL', 'color' => 'red', 'ean' => '1234567890123'], - ['id' => 2, 'size' => 'M', 'color' => 'blue', 'ean' => '1234567890124'], - ['id' => 3, 'size' => 'S', 'color' => 'green', 'ean' => '1234567890125'], - ]), - new JsonOutput() - ); + $extractor = from_array([ + ['id' => 1, 'size' => 'XL', 'color' => 'red', 'ean' => '1234567890123'], + ['id' => 2, 'size' => 'M', 'color' => 'blue', 'ean' => '1234567890124'], + ['id' => 3, 'size' => 'S', 'color' => 'green', 'ean' => '1234567890125'], + ]); + + $response = DataStream::open($extractor) + ->sendTo(new JsonOutput()); self::assertEquals(<<<'JSON' [{"id":1,"size":"XL","color":"red","ean":"1234567890123"},{"id":2,"size":"M","color":"blue","ean":"1234567890124"},{"id":3,"size":"S","color":"green","ean":"1234567890125"}] @@ -100,6 +101,21 @@ public function test_streaming_partitioned_dataset() : void , $this->sendResponse($response)); } + public function test_streaming_with_disposition() : void + { + $response = DataStream::open( + from_array([ + ['id' => 1, 'size' => 'XL', 'color' => 'red', 'ean' => '1234567890123'], + ['id' => 2, 'size' => 'M', 'color' => 'blue', 'ean' => '1234567890124'], + ['id' => 3, 'size' => 'S', 'color' => 'green', 'ean' => '1234567890125'], + ]) + ) + ->underFilename('products.csv') + ->sendTo(new CSVOutput()); + + self::assertEquals('attachment; filename=products.csv', $response->headers->get('Content-Disposition')); + } + private function sendResponse(FlowStreamedResponse $response) : string { ob_start(); diff --git a/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Unit/Transformation/AddRowIndexTest.php b/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Unit/Transformation/AddRowIndexTest.php new file mode 100644 index 000000000..f23216ebf --- /dev/null +++ b/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Unit/Transformation/AddRowIndexTest.php @@ -0,0 +1,65 @@ +read(from_array( + [ + ['id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'], + ['id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'], + ['id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'], + ['id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'], + ] + )) + ->with(new AddRowIndex()) + ->fetch() + ->toArray(); + + self::assertEquals( + [ + ['index' => 0, 'id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'], + ['index' => 1, 'id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'], + ['index' => 2, 'id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'], + ['index' => 3, 'id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'], + ], + $rows + ); + } + + public function test_adding_row_index_to_each_row_starting_from_1() : void + { + $rows = df() + ->read(from_array( + [ + ['id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'], + ['id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'], + ['id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'], + ['id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'], + ] + )) + ->with(new AddRowIndex(startFrom: StartFrom::ONE)) + ->fetch() + ->toArray(); + + self::assertEquals( + [ + ['index' => 1, 'id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'], + ['index' => 2, 'id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'], + ['index' => 3, 'id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'], + ['index' => 4, 'id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'], + ], + $rows + ); + } +} diff --git a/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Unit/Transformation/BatchSizeTest.php b/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Unit/Transformation/BatchSizeTest.php new file mode 100644 index 000000000..bb93a1c1c --- /dev/null +++ b/src/bridge/symfony/http-foundation/tests/Flow/Bridge/Symfony/HttpFoundation/Tests/Unit/Transformation/BatchSizeTest.php @@ -0,0 +1,29 @@ +read(from_array([ + ['id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'], + ['id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'], + ['id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'], + ['id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'], + ])) + ->with(new BatchSize(2)) + ->get(); + + foreach ($rowsIterator as $rows) { + self::assertCount(2, $rows); + } + } +} diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index 28ea95ca8..8a95be11f 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -148,7 +148,7 @@ public function autoCast() : self * * In order to merge all Rows into a single batch use DataFrame::collect() method or set size to -1 or 0. * - * @param int<-1, max> $size + * @param int<1, max> $size * * @lazy */ @@ -832,7 +832,7 @@ public function sortBy(Reference ...$entries) : self * * @lazy */ - public function transform(Transformer|Transformation $transformer) : self + public function transform(Transformer|Transformation|Transformations $transformer) : self { return $this->with($transformer); } @@ -879,7 +879,7 @@ public function void() : self /** * @lazy */ - public function with(Transformer|Transformation $transformer) : self + public function with(Transformer|Transformation|Transformations $transformer) : self { if ($transformer instanceof Transformer) { $this->pipeline->add($transformer); @@ -887,6 +887,12 @@ public function with(Transformer|Transformation $transformer) : self return $this; } + if ($transformer instanceof Transformations) { + $transformer->transform($this); + + return $this; + } + return $transformer->transform($this); } diff --git a/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/Transformations.php b/src/core/etl/src/Flow/ETL/Transformations.php similarity index 75% rename from src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/Transformations.php rename to src/core/etl/src/Flow/ETL/Transformations.php index 9a58ed962..62bfd64d7 100644 --- a/src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/Transformations.php +++ b/src/core/etl/src/Flow/ETL/Transformations.php @@ -2,16 +2,18 @@ declare(strict_types=1); -namespace Flow\Bridge\Symfony\HttpFoundation\Transformation; - -use Flow\ETL\{DataFrame, Transformation}; +namespace Flow\ETL; +/** + * Collection of transformations. + * Transformations are applied in the order they are passed to the constructor. + */ final readonly class Transformations implements Transformation { /** * @param Transformation ...$transformations */ - private array $transformations; + public array $transformations; public function __construct(Transformation ...$transformations) { diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/TransformationsTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/TransformationsTest.php new file mode 100644 index 000000000..e56f8c490 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/TransformationsTest.php @@ -0,0 +1,25 @@ +createMock(Transformation::class); + $transformation2 = $this->createMock(Transformation::class); + + $dataFrame = df()->read(from_array([['id' => 1], ['id' => 2]])); + + $transformation1->expects(self::once())->method('transform')->with($dataFrame)->willReturn($dataFrame); + $transformation2->expects(self::once())->method('transform')->with($dataFrame)->willReturn($dataFrame); + + (new Transformations($transformation1, $transformation2))->transform($dataFrame); + } +}