Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added DataStream builder to symfony http foundation bridge #1488

Merged
merged 2 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions documentation/components/bridges/symfony-http-foundation-bridge.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,28 @@ declare(strict_types=1);

namespace Symfony\Application\Controller;

use Flow\Bridge\Symfony\HttpFoundation\FlowStreamedResponse;
use Flow\Bridge\Symfony\HttpFoundation\DataStream;
use Flow\Bridge\Symfony\HttpFoundation\Output\CSVOutput;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Routing\Attribute\Route;
use function Flow\ETL\Adapter\Parquet\ParquetEtractor;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\MaskColumns;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex;

final class ReportsController extends AbstractController
{
#[Route('/stream/report', name: 'stream-report')]
public function streamReport() : Response
{
return new FlowStreamedResponse(
new ParquetEtractor(__DIR__ . '/reports/orders.parquet'),
new CSVOutput(withHeader: true)
);
return DataStream()
::open(from_parquet(__DIR__ . '/reports/orders.parquet'))
->underFilename('orders.csv')
->transform(
new MaskColumns(['email', 'address']),
new AddRowIndex()
)
->to(new CSVOutput(withHeader: true));
}
}
```
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation;

use Flow\ETL\{Extractor, Transformation, Transformations};
use Symfony\Component\HttpFoundation\{HeaderUtils, Response};

/**
* FlowStreamedResponse builder.
*/
final class DataStream
{
/**
* @var array<string, string>
*/
private array $headers = [
'Cache-Control' => 'no-store, no-cache, must-revalidate, private',
'X-Accel-Buffering' => 'no', // provides support for Nginx
'Pragma' => 'no-cache', // Backward compatibility for HTTP/1.0
];

private ?Output $output = null;

private int $status = Response::HTTP_OK;

/**
* @var array<Transformation>
*/
private array $transformations = [];

public function __construct(private readonly Extractor $extractor)
{
}

public static function open(Extractor $extractor) : self
{
return new self($extractor);
}

/**
* Send the data stream to the output.
*/
public function sendTo(Output $output) : FlowStreamedResponse
{
$this->output = $output;

$this->headers['Content-Type'] = $this->output->type()->toContentTypeHeader();

return new FlowStreamedResponse(
$this->extractor,
$this->output,
\count($this->transformations) ? new Transformations(...$this->transformations) : new Transformations(),
$this->status,
$this->headers
);
}

/**
* Apply transformations to the data stream.
* Transformations are applied in the order they are passed.
* Transformations are applied on the fly, while streaming the data, this means
* that any resource expensive transformations like for example aggregations or sorting
* might significantly slow down the streaming process or even cause out of memory errors.
*/
public function transform(Transformation ...$transformations) : self

Check warning on line 67 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L67

Added line #L67 was not covered by tests
{
$this->transformations = $transformations;

Check warning on line 69 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L69

Added line #L69 was not covered by tests

return $this;

Check warning on line 71 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L71

Added line #L71 was not covered by tests
}

/**
* Set the filename for the response.
* If the attachment flag is set to true, the response will be treated as an attachment meaning that
* the browser will prompt the user to download the file.
*/
public function underFilename(string $name, bool $attachment = true) : self
{
$this->headers['Content-Disposition'] = HeaderUtils::makeDisposition(
$attachment ? HeaderUtils::DISPOSITION_ATTACHMENT : HeaderUtils::DISPOSITION_INLINE,
$name
);

return $this;
}

/**
* Set additional headers.
* Headers are merged with the default headers.
*/
public function withHeaders(array $headers) : self

Check warning on line 93 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L93

Added line #L93 was not covered by tests
{
$this->headers = array_merge($this->headers, $headers);

Check warning on line 95 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L95

Added line #L95 was not covered by tests

return $this;

Check warning on line 97 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L97

Added line #L97 was not covered by tests
}

/**
* Remove a specific header if it exists.
* If the header does not exist, nothing happens.
*/
public function withoutHeader(string $name) : self

Check warning on line 104 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L104

Added line #L104 was not covered by tests
{
if (\array_key_exists($name, $this->headers)) {
unset($this->headers[$name]);

Check warning on line 107 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L106-L107

Added lines #L106 - L107 were not covered by tests
}

return $this;

Check warning on line 110 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L110

Added line #L110 was not covered by tests
}

/**
* Set the HTTP status code. Default is 200.
*/
public function withStatus(int $status) : self

Check warning on line 116 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L116

Added line #L116 was not covered by tests
{
$this->status = $status;

Check warning on line 118 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L118

Added line #L118 was not covered by tests

return $this;

Check warning on line 120 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php#L120

Added line #L120 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
namespace Flow\Bridge\Symfony\HttpFoundation;

use function Flow\ETL\DSL\df;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\{Transformations};
use Flow\ETL\{Config, Extractor, Transformation};
use Flow\ETL\{Transformations};
use Symfony\Component\HttpFoundation\StreamedResponse;

class FlowStreamedResponse extends StreamedResponse
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation\Transformation;

use function Flow\ETL\DSL\int_entry;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex\StartFrom;
use Flow\ETL\{DataFrame, Row, Transformation};

final readonly class AddRowIndex implements Transformation
{
public function __construct(private string $indexColumn = 'index', private StartFrom $startFrom = StartFrom::ZERO)
{
}

public function transform(DataFrame $dataFrame) : DataFrame
{
$index = $this->startFrom === StartFrom::ZERO ? 0 : 1;

return $dataFrame->map(function (Row $row) use (&$index) {
$row = $row->add(int_entry($this->indexColumn, $index));
$index++;

return $row;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex;

enum StartFrom
{
case ONE;
case ZERO;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation\Transformation;

use Flow\ETL\{DataFrame, Transformation};
use Flow\Filesystem\Exception\InvalidArgumentException;

/**
* Sets batch size for DataFrame.
* Small batch size can be useful when processing large data sets since only one row is processed at a time.
* This means that while processing large data sets, memory usage is kept low.
*
* Normally flow allows to use batch size -1 (which means no batches) but it defeats the purpose of using this transformation on
* Data Streams.
*/
final readonly class BatchSize implements Transformation
{
/**
* @param int<1, max> $size
*/
public function __construct(private int $size)
{
if ($size < 1) {
throw new InvalidArgumentException('Batch size must be greater than 0');

Check warning on line 26 in src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/BatchSize.php

View check run for this annotation

Codecov / codecov/patch

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Transformation/BatchSize.php#L26

Added line #L26 was not covered by tests
}
}

public function transform(DataFrame $dataFrame) : DataFrame
{
return $dataFrame->batchSize($this->size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
use function Flow\ETL\DSL\lit;
use Flow\ETL\{DataFrame, Transformation};

/**
* Mask columns in DataFrame by replacing their values with a mask.
* If column does not exist in DataFrame, it will be added with a mask value.
*/
final readonly class MaskColumns implements Transformation
{
public function __construct(private array $columns = [], private string $mask = '******')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

use function Flow\ETL\Adapter\JSON\from_json;
use function Flow\ETL\DSL\from_array;
use Flow\Bridge\Symfony\HttpFoundation\{FlowStreamedResponse,
use Flow\Bridge\Symfony\HttpFoundation\{DataStream,
FlowStreamedResponse,
Output\CSVOutput,
Output\JsonOutput,
Output\XMLOutput};
Expand Down Expand Up @@ -37,14 +38,14 @@ public function test_streaming_array_response_to_csv() : void

public function test_streaming_array_response_to_json() : void
{
$response = new FlowStreamedResponse(
from_array([
['id' => 1, 'size' => 'XL', 'color' => 'red', 'ean' => '1234567890123'],
['id' => 2, 'size' => 'M', 'color' => 'blue', 'ean' => '1234567890124'],
['id' => 3, 'size' => 'S', 'color' => 'green', 'ean' => '1234567890125'],
]),
new JsonOutput()
);
$extractor = from_array([
['id' => 1, 'size' => 'XL', 'color' => 'red', 'ean' => '1234567890123'],
['id' => 2, 'size' => 'M', 'color' => 'blue', 'ean' => '1234567890124'],
['id' => 3, 'size' => 'S', 'color' => 'green', 'ean' => '1234567890125'],
]);

$response = DataStream::open($extractor)
->sendTo(new JsonOutput());

self::assertEquals(<<<'JSON'
[{"id":1,"size":"XL","color":"red","ean":"1234567890123"},{"id":2,"size":"M","color":"blue","ean":"1234567890124"},{"id":3,"size":"S","color":"green","ean":"1234567890125"}]
Expand Down Expand Up @@ -100,6 +101,21 @@ public function test_streaming_partitioned_dataset() : void
, $this->sendResponse($response));
}

public function test_streaming_with_disposition() : void
{
$response = DataStream::open(
from_array([
['id' => 1, 'size' => 'XL', 'color' => 'red', 'ean' => '1234567890123'],
['id' => 2, 'size' => 'M', 'color' => 'blue', 'ean' => '1234567890124'],
['id' => 3, 'size' => 'S', 'color' => 'green', 'ean' => '1234567890125'],
])
)
->underFilename('products.csv')
->sendTo(new CSVOutput());

self::assertEquals('attachment; filename=products.csv', $response->headers->get('Content-Disposition'));
}

private function sendResponse(FlowStreamedResponse $response) : string
{
ob_start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation\Tests\Unit\Transformation;

use function Flow\ETL\DSL\{df, from_array};
use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex;
use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex\StartFrom;
use Flow\ETL\Tests\FlowTestCase;

final class AddRowIndexTest extends FlowTestCase
{
public function test_adding_row_index_to_each_row() : void
{
$rows = df()
->read(from_array(
[
['id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'],
['id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'],
['id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'],
['id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'],
]
))
->with(new AddRowIndex())
->fetch()
->toArray();

self::assertEquals(
[
['index' => 0, 'id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'],
['index' => 1, 'id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'],
['index' => 2, 'id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'],
['index' => 3, 'id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'],
],
$rows
);
}

public function test_adding_row_index_to_each_row_starting_from_1() : void
{
$rows = df()
->read(from_array(
[
['id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'],
['id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'],
['id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'],
['id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'],
]
))
->with(new AddRowIndex(startFrom: StartFrom::ONE))
->fetch()
->toArray();

self::assertEquals(
[
['index' => 1, 'id' => 1, 'name' => 'John Doe', 'salary' => 7000, 'currency' => 'USD'],
['index' => 2, 'id' => 2, 'name' => 'Jane Doe', 'salary' => 8000, 'currency' => 'USD'],
['index' => 3, 'id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'],
['index' => 4, 'id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'],
],
$rows
);
}
}
Loading
Loading