Skip to content

Commit

Permalink
Added DataStream builder to symfony http foundation bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Feb 20, 2025
1 parent 95607ef commit c4005ed
Show file tree
Hide file tree
Showing 13 changed files with 370 additions and 22 deletions.
16 changes: 11 additions & 5 deletions documentation/components/bridges/symfony-http-foundation-bridge.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,28 @@ declare(strict_types=1);

namespace Symfony\Application\Controller;

use Flow\Bridge\Symfony\HttpFoundation\FlowStreamedResponse;
use Flow\Bridge\Symfony\HttpFoundation\DataStream;
use Flow\Bridge\Symfony\HttpFoundation\Output\CSVOutput;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Routing\Attribute\Route;
use function Flow\ETL\Adapter\Parquet\from_parquet;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\MaskColumns;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex;

final class ReportsController extends AbstractController
{
#[Route('/stream/report', name: 'stream-report')]
public function streamReport() : Response
{
return new FlowStreamedResponse(
new ParquetEtractor(__DIR__ . '/reports/orders.parquet'),
new CSVOutput(withHeader: true)
);
return DataStream::open(from_parquet(__DIR__ . '/reports/orders.parquet'))
->underFilename('orders.csv')
->transform(
new MaskColumns(['email', 'address']),
new AddRowIndex()
)
->sendTo(new CSVOutput(withHeader: true));
}
}
```
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation;

use Flow\ETL\{Extractor, Transformation, Transformations};
use Symfony\Component\HttpFoundation\{HeaderUtils, Response};

/**
* FlowStreamedResponse builder.
*/
final class DataStream
{
/**
* @var array<string, string>
*/
private array $headers = [
'Cache-Control' => 'no-store, no-cache, must-revalidate, private',
'X-Accel-Buffering' => 'no', // provides support for Nginx
'Pragma' => 'no-cache', // Backward compatibility for HTTP/1.0
];

private ?Output $output = null;

private int $status = Response::HTTP_OK;

/**
* @var array<Transformation>
*/
private array $transformations = [];

/**
 * @param Extractor $extractor source of the data handed over to the streamed response
 */
public function __construct(
    private readonly Extractor $extractor,
) {
}

/**
 * Named constructor: start building a data stream that reads from the given extractor.
 */
public static function open(Extractor $extractor) : self
{
    return new DataStream($extractor);
}

/**
 * Finish building and create the streamed response that writes the data to the given output.
 *
 * The 'Content-Type' header is always taken from the output type and overwrites any value
 * stored under that key earlier on this builder.
 *
 * @param Output $output output passed to the response; its type provides the Content-Type header
 *
 * @return FlowStreamedResponse response created from the extractor, output, transformations, status and headers
 */
public function sendTo(Output $output) : FlowStreamedResponse
{
    $this->output = $output;
    $this->headers['Content-Type'] = $output->type()->toContentTypeHeader();

    // Spreading an empty list passes no arguments, so the "no transformations" case needs no separate branch.
    $transformations = new Transformations(...$this->transformations);

    return new FlowStreamedResponse(
        $this->extractor,
        $output,
        $transformations,
        $this->status,
        $this->headers
    );
}

/**
* Set the transformations applied to the data stream.
* Transformations are applied in the order they are passed.
*
* Each call replaces the whole list: transformations passed to an earlier transform() call on the same
* builder are discarded, so pass all of them in a single call.
*
* Transformations are applied on the fly, while streaming the data, this means
* that any resource expensive transformations like for example aggregations or sorting
* might significantly slow down the streaming process or even cause out of memory errors.
*
* @param Transformation ...$transformations transformations to apply, in order
*
* @return self this builder, to allow method chaining
*/
public function transform(Transformation ...$transformations) : self

Check warning on line 67 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L67

Added line #L67 was not covered by tests
{
$this->transformations = $transformations;

Check warning on line 69 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L69

Added line #L69 was not covered by tests

return $this;

Check warning on line 71 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L71

Added line #L71 was not covered by tests
}

/**
 * Set the filename for the response by adding a Content-Disposition header.
 * If the attachment flag is set to true, the response will be treated as an attachment meaning that
 * the browser will prompt the user to download the file; otherwise the disposition is "inline".
 *
 * Names containing characters outside printable ASCII, or a "%", additionally get an ASCII fallback in which
 * every such character is replaced with "_". Symfony's HeaderUtils::makeDisposition() rejects a fallback that
 * is not plain ASCII, so without it a name like "zamówienia.csv" raises an exception instead of producing a header.
 *
 * @param string $name filename presented to the client
 * @param bool $attachment true for the "attachment" disposition, false for "inline"
 *
 * @throws \InvalidArgumentException presumably still raised by makeDisposition() for names containing "/" or "\" - verify against the installed Symfony version
 *
 * @return self this builder, to allow method chaining
 */
public function underFilename(string $name, bool $attachment = true) : self
{
    $disposition = $attachment ? HeaderUtils::DISPOSITION_ATTACHMENT : HeaderUtils::DISPOSITION_INLINE;

    // An empty fallback makes makeDisposition() reuse $name, which is only valid for plain ASCII without "%".
    $fallback = '';

    if (\preg_match('/\A[\x20-\x7E]*\z/', $name) !== 1 || \str_contains($name, '%')) {
        // The "u" variant replaces whole characters; it returns null for invalid UTF-8,
        // in which case the byte-wise variant is used instead.
        $fallback = \preg_replace('/[^\x20-\x7E]|%/u', '_', $name)
            ?? \preg_replace('/[^\x20-\x7E]|%/', '_', $name)
            ?? '';
    }

    $this->headers['Content-Disposition'] = HeaderUtils::makeDisposition($disposition, $name, $fallback);

    return $this;
}

/**
* Set additional headers.
* The given headers are merged with array_merge() into the headers already collected on this builder
* (the defaults plus anything set by earlier calls); for an identical string key the given value wins.
*
* @param array<string, string> $headers header values indexed by header name
*
* @return self this builder, to allow method chaining
*/
public function withHeaders(array $headers) : self

Check warning on line 93 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L93

Added line #L93 was not covered by tests
{
$this->headers = array_merge($this->headers, $headers);

Check warning on line 95 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L95

Added line #L95 was not covered by tests

return $this;

Check warning on line 97 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L97

Added line #L97 was not covered by tests
}

/**
* Remove a specific header if it exists.
* If the header does not exist, nothing happens.
* The name is compared as an exact array key, so it must be spelled exactly as it was stored
* (for example 'Cache-Control', not 'cache-control').
*
* @param string $name name of the header to remove
*
* @return self this builder, to allow method chaining
*/
public function withoutHeader(string $name) : self

Check warning on line 104 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L104

Added line #L104 was not covered by tests
{
if (\array_key_exists($name, $this->headers)) {
unset($this->headers[$name]);

Check warning on line 107 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L106-L107

Added lines #L106 - L107 were not covered by tests
}

return $this;

Check warning on line 110 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L110

Added line #L110 was not covered by tests
}

/**
* Set the HTTP status code. Default is 200.
* The value is stored as given, without validation, and passed to the response created by sendTo().
*
* @param int $status HTTP status code of the response
*
* @return self this builder, to allow method chaining
*/
public function withStatus(int $status) : self

Check warning on line 116 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L116

Added line #L116 was not covered by tests
{
$this->status = $status;

Check warning on line 118 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L118

Added line #L118 was not covered by tests

return $this;

Check warning on line 120 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L120

Added line #L120 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
namespace Flow\Bridge\Symfony\HttpFoundation;

use function Flow\ETL\DSL\df;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\{Transformations};
use Flow\ETL\{Config, Extractor, Transformation};
use Flow\ETL\{Transformations};
use Symfony\Component\HttpFoundation\StreamedResponse;

class FlowStreamedResponse extends StreamedResponse
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation\Transformation;

use function Flow\ETL\DSL\int_entry;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex\StartFrom;
use Flow\ETL\{DataFrame, Row, Transformation};

/**
 * Transformation that adds an integer entry holding a running row counter to every row.
 */
final readonly class AddRowIndex implements Transformation
{
    /**
     * @param string $indexColumn name of the entry added to each row
     * @param StartFrom $startFrom whether the first row is numbered 0 or 1
     */
    public function __construct(
        private string $indexColumn = 'index',
        private StartFrom $startFrom = StartFrom::ZERO,
    ) {
    }

    /**
     * Map every row of the data frame to the result of adding the counter entry to it.
     * The counter is local to a single transform() call and grows by one for each mapped row.
     */
    public function transform(DataFrame $dataFrame) : DataFrame
    {
        $position = match ($this->startFrom) {
            StartFrom::ZERO => 0,
            default => 1,
        };

        // Captured by reference so the counter survives between invocations of the closure.
        return $dataFrame->map(function (Row $row) use (&$position) {
            $indexed = $row->add(int_entry($this->indexColumn, $position));
            ++$position;

            return $indexed;
        });
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex;

/**
* First value used by AddRowIndex when numbering rows.
*/
enum StartFrom
{
// Numbering starts at 1.
case ONE;
// Numbering starts at 0.
case ZERO;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation\Transformation;

use Flow\ETL\{DataFrame, Transformation};
use Flow\Filesystem\Exception\InvalidArgumentException;

/**
* Sets the batch size of a DataFrame.
* A small batch size is useful when streaming large data sets: fewer rows are processed at a time
* (a single row for batch size 1), which keeps memory usage low.
*
* Normally flow allows to use batch size -1 (which means no batches) but it defeats the purpose of using this transformation on
* Data Streams, so sizes lower than 1 are rejected by the constructor.
*/
final readonly class BatchSize implements Transformation
{
/**
* @param int<1, max> $size number of rows per batch
*
* @throws InvalidArgumentException when $size is lower than 1
*/
public function __construct(private int $size)
{
if ($size < 1) {
throw new InvalidArgumentException('Batch size must be greater than 0');

Check warning on line 26 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/BatchSize.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/BatchSize.php#L26

Added line #L26 was not covered by tests
}
}

/**
* Return the result of calling batchSize() on the given data frame with the configured size.
*/
public function transform(DataFrame $dataFrame) : DataFrame
{
return $dataFrame->batchSize($this->size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
use function Flow\ETL\DSL\lit;
use Flow\ETL\{DataFrame, Transformation};

/**
* Mask columns in DataFrame by replacing their values with a mask.
* If column does not exist in DataFrame, it will be added with a mask value.
*/
final readonly class MaskColumns implements Transformation
{
public function __construct(private array $columns = [], private string $mask = '******')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

use function Flow\ETL\Adapter\JSON\from_json;
use function Flow\ETL\DSL\from_array;
use Flow\Bridge\Symfony\HttpFoundation\{FlowStreamedResponse,
use Flow\Bridge\Symfony\HttpFoundation\{DataStream,
FlowStreamedResponse,
Output\CSVOutput,
Output\JsonOutput,
Output\XMLOutput};
Expand Down Expand Up @@ -37,14 +38,14 @@ public function test_streaming_array_response_to_csv() : void

public function test_streaming_array_response_to_json() : void
{
$response = new FlowStreamedResponse(
from_array([
['id' => 1, 'size' => 'XL', 'color' => 'red', 'ean' => '1234567890123'],
['id' => 2, 'size' => 'M', 'color' => 'blue', 'ean' => '1234567890124'],
['id' => 3, 'size' => 'S', 'color' => 'green', 'ean' => '1234567890125'],
]),
new JsonOutput()
);
$extractor = from_array([
['id' => 1, 'size' => 'XL', 'color' => 'red', 'ean' => '1234567890123'],
['id' => 2, 'size' => 'M', 'color' => 'blue', 'ean' => '1234567890124'],
['id' => 3, 'size' => 'S', 'color' => 'green', 'ean' => '1234567890125'],
]);

$response = DataStream::open($extractor)
->sendTo(new JsonOutput());

self::assertEquals(<<<'JSON'
[{"id":1,"size":"XL","color":"red","ean":"1234567890123"},{"id":2,"size":"M","color":"blue","ean":"1234567890124"},{"id":3,"size":"S","color":"green","ean":"1234567890125"}]
Expand Down Expand Up @@ -100,6 +101,21 @@ public function test_streaming_partitioned_dataset() : void
, $this->sendResponse($response));
}

/**
 * underFilename() called with its default arguments must produce an "attachment" Content-Disposition header.
 */
public function test_streaming_with_disposition() : void
{
    $products = from_array([
        ['id' => 1, 'size' => 'XL', 'color' => 'red', 'ean' => '1234567890123'],
        ['id' => 2, 'size' => 'M', 'color' => 'blue', 'ean' => '1234567890124'],
        ['id' => 3, 'size' => 'S', 'color' => 'green', 'ean' => '1234567890125'],
    ]);

    $stream = DataStream::open($products)->underFilename('products.csv');
    $response = $stream->sendTo(new CSVOutput());

    self::assertEquals(
        'attachment; filename=products.csv',
        $response->headers->get('Content-Disposition')
    );
}

private function sendResponse(FlowStreamedResponse $response) : string
{
ob_start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation\Tests\Unit\Transformation;

use function Flow\ETL\DSL\{df, from_array};
use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex\StartFrom;
use Flow\ETL\Tests\FlowTestCase;

/**
 * Checks that AddRowIndex numbers rows consecutively, starting from 0 by default or from 1 on request.
 */
final class AddRowIndexTest extends FlowTestCase
{
    /**
     * Input rows shared by all test cases.
     */
    private const EMPLOYEES = [
        ['id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'],
        ['id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'],
        ['id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'],
        ['id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'],
    ];

    public function test_adding_row_index_to_each_row() : void
    {
        $expected = [
            ['index' => 0, 'id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'],
            ['index' => 1, 'id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'],
            ['index' => 2, 'id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'],
            ['index' => 3, 'id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'],
        ];

        self::assertEquals($expected, $this->rowsAfter(new AddRowIndex()));
    }

    public function test_adding_row_index_to_each_row_starting_from_1() : void
    {
        $expected = [
            ['index' => 1, 'id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'],
            ['index' => 2, 'id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'],
            ['index' => 3, 'id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'],
            ['index' => 4, 'id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'],
        ];

        self::assertEquals($expected, $this->rowsAfter(new AddRowIndex(startFrom: StartFrom::ONE)));
    }

    /**
     * Run the shared input rows through the given transformation and return the fetched rows as arrays.
     */
    private function rowsAfter(AddRowIndex $transformation) : array
    {
        return df()
            ->read(from_array(self::EMPLOYEES))
            ->with($transformation)
            ->fetch()
            ->toArray();
    }
}
Loading

0 comments on commit c4005ed

Please sign in to comment.