Skip to content

Commit c851044

Browse files
authored
Flow Http Stream DX improvements (#1497)
* Flow Http Stream DX improvements * Updated DSL definitions * CS Fixes
1 parent b5d4fae commit c851044

File tree

38 files changed

+1044
-162
lines changed

38 files changed

+1044
-162
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@
152152
"src/lib/filesystem/src/Flow/Filesystem/DSL/functions.php",
153153
"src/lib/parquet/src/Flow/Parquet/functions.php",
154154
"src/lib/parquet/src/stubs.php",
155-
"src/lib/snappy/polyfill.php"
155+
"src/lib/snappy/polyfill.php",
156+
"src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/functions.php"
156157
]
157158
},
158159
"autoload-dev": {

documentation/components/bridges/symfony-http-foundation-bridge.md

Lines changed: 38 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Http Foundation Bridge provides seamless integration between Symfony Http Founda
1616
## Installation
1717

1818
```
19-
composer require flow-php/symfony-http-foundation-bridge
19+
composer require flow-php/symfony-http-foundation-bridge:1.x-dev
2020
```
2121

2222
## Usage
@@ -30,87 +30,67 @@ files that normally would not fit in memory.
3030

3131
namespace Symfony\Application\Controller;
3232

33-
use Flow\Bridge\Symfony\HttpFoundation\DataStream;
34-
use Flow\Bridge\Symfony\HttpFoundation\Output\CSVOutput;
33+
use Flow\Bridge\Symfony\HttpFoundation\Response\FlowBufferedResponse;
34+
use Flow\Bridge\Symfony\HttpFoundation\Response\FlowStreamedResponse;
35+
use Flow\ETL\Transformation\AddRowIndex;
36+
use Flow\ETL\Transformation\Limit;
37+
use Flow\ETL\Transformation\MaskColumns;
3538
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
36-
use Symfony\Component\HttpFoundation\Response;
3739
use Symfony\Component\Routing\Attribute\Route;
38-
use Flow\Bridge\Symfony\HttpFoundation\Transformation\MaskColumns;
39-
use Flow\Bridge\Symfony\HttpFoundation\Transformation\AddRowIndex;
40+
use function Flow\Bridge\Symfony\HttpFoundation\http_csv_output;
41+
use function Flow\Bridge\Symfony\HttpFoundation\http_stream_open;
4042
use function Flow\ETL\Adapter\Parquet\from_parquet;
4143

4244
final class ReportsController extends AbstractController
4345
{
44-
#[Route('/stream/report', name: 'stream-report')]
45-
public function streamReport() : Response
46+
#[Route('/report/stream', name: 'report_stream')]
47+
public function streamReport() : FlowStreamedResponse
4648
{
47-
return DataStream
48-
::open(from_parquet(__DIR__ . '/reports/orders.parquet'))
49-
->underFilename('orders.csv')
49+
return http_stream_open(from_parquet(__DIR__ . '/reports/orders.parquet'))
50+
->headers(['X-Custom-Header' => 'Custom Value'])
5051
->transform(
5152
new MaskColumns(['email', 'address']),
5253
new AddRowIndex()
5354
)
54-
->to(new CSVOutput(withHeader: true));
55+
->as('orders.csv')
56+
->status(200)
57+
->streamedResponse(http_csv_output());
58+
}
59+
60+
#[Route('/report', name: 'report')]
61+
public function bufferReport() : FlowBufferedResponse
62+
{
63+
return http_stream_open(from_parquet(__DIR__ . '/reports/orders.parquet'))
64+
->transform(
65+
new Limit(100),
66+
new MaskColumns(['email', 'address']),
67+
new AddRowIndex(),
68+
)
69+
->as('orders.csv')
70+
->response(http_csv_output());
5571
}
5672
}
5773
```
5874

5975
## Available Outputs
6076

61-
- `Flow\Bridge\Symfony\HttpFoundation\Output\CSVOutput` - converts dataset to CSV format.
62-
- `Flow\Bridge\Symfony\HttpFoundation\Output\JSONOutput` - converts dataset to JSON format.
63-
- `Flow\Bridge\Symfony\HttpFoundation\Output\ParquetOutput` - converts dataset to Parquet format.
64-
- `Flow\Bridge\Symfony\HttpFoundation\Output\XMLOutput` - converts dataset to XML format.
77+
- `Flow\Bridge\Symfony\HttpFoundation\Output\CSVOutput` - `http_csv_output()` - converts dataset to CSV format.
78+
- `Flow\Bridge\Symfony\HttpFoundation\Output\JSONOutput` - `http_json_output()` - converts dataset to JSON format.
79+
- `Flow\Bridge\Symfony\HttpFoundation\Output\ParquetOutput` - `http_parquet_output()` - converts dataset to Parquet format.
80+
- `Flow\Bridge\Symfony\HttpFoundation\Output\XMLOutput` - `http_xml_output()` - converts dataset to XML format.
6581

6682
## Modify output on the fly
6783

6884
Sometimes we need to modify the output on the fly.
69-
To do that, FlowStreamedResponse allows to pass a Transformation that will be applied on the dataset.
85+
To do that, FlowStreamedResponse allows passing a Transformation that will be applied on the dataset.
7086

7187
```php
72-
return new FlowStreamedResponse(
73-
new ParquetEtractor(__DIR__ . '/reports/orders.parquet'),
74-
new CSVOutput(withHeader: true),
75-
new class implements Transformation {
76-
public function transform(DataFrame $dataFrame): DataFrame
77-
{
78-
return $dataFrame->withColumn('time', \time());
79-
}
88+
new class implements Transformation {
89+
public function transform(DataFrame $dataFrame): DataFrame
90+
{
91+
return $dataFrame->withColumn('time', \time());
8092
}
81-
);
93+
}
8294
```
8395

8496
The above example will add a new column `time` to the dataset with the current timestamp.
85-
86-
Predefined Transformations:
87-
88-
- `Flow\Bridge\Symfony\HttpFoundation\Transformation\MaskColumns` - mask columns with `*****` value.
89-
90-
```php
91-
<?php
92-
93-
declare(strict_types=1);
94-
95-
namespace Symfony\Application\Controller;
96-
97-
use Flow\Bridge\Symfony\HttpFoundation\FlowStreamedResponse;
98-
use Flow\Bridge\Symfony\HttpFoundation\Output\CSVOutput;
99-
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
100-
use Symfony\Component\HttpFoundation\Response;
101-
use Symfony\Component\Routing\Attribute\Route;
102-
use function Flow\ETL\Adapter\Parquet\ParquetEtractor;
103-
104-
final class ReportsController extends AbstractController
105-
{
106-
#[Route('/stream/report', name: 'stream-report')]
107-
public function streamReport() : Response
108-
{
109-
return new FlowStreamedResponse(
110-
new ParquetEtractor(__DIR__ . '/reports/orders.parquet'),
111-
new CSVOutput(withHeader: true),
112-
new MaskColumns(['email', 'address'])
113-
);
114-
}
115-
}
116-
```

src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,50 +6,38 @@
66

77
use function Flow\ETL\Adapter\Parquet\{from_parquet, to_parquet};
88
use function Flow\ETL\DSL\data_frame;
9-
use function Flow\ETL\DSL\{df, from_array, json_schema, schema, str_schema};
9+
use function Flow\ETL\DSL\{config, from_array, json_schema, schema, str_schema};
10+
use function Flow\Filesystem\DSL\{path};
1011
use Flow\ETL\Tests\Double\FakeExtractor;
1112
use Flow\ETL\{Tests\FlowTestCase};
12-
use Flow\Parquet\ParquetFile\Compressions;
13-
use Flow\Parquet\Reader;
1413
use Ramsey\Uuid\Uuid;
1514

1615
final class ParquetTest extends FlowTestCase
1716
{
18-
public function test_writing_to_file() : void
17+
public function test_writing_and_reading_into_parquet() : void
1918
{
20-
$path = __DIR__ . '/var/file.snappy.parquet';
21-
$this->removeFile($path);
19+
$path = path('memory://var/file.snappy.parquet');
2220

23-
df()
21+
$config = config();
22+
data_frame($config)
2423
->read(new FakeExtractor(10))
2524
->drop('null', 'array', 'object', 'enum')
2625
->write(to_parquet($path))
2726
->run();
2827

2928
self::assertEquals(
3029
10,
31-
(data_frame())
30+
(data_frame($config))
3231
->read(from_parquet($path))
3332
->count()
3433
);
35-
36-
$parquetFile = (new Reader())->read($path);
37-
self::assertNotEmpty($parquetFile->metadata()->columnChunks());
38-
39-
foreach ($parquetFile->metadata()->columnChunks() as $columnChunk) {
40-
self::assertSame(Compressions::SNAPPY, $columnChunk->codec());
41-
}
42-
43-
self::assertFileExists($path);
44-
$this->removeFile($path);
4534
}
4635

4736
public function test_writing_with_provided_schema() : void
4837
{
49-
$path = __DIR__ . '/var/file_schema.snappy.parquet';
50-
$this->removeFile($path);
51-
52-
df()
38+
$path = path('memory://var/file_schema.snappy.parquet');
39+
$config = config();
40+
data_frame($config)
5341
->read(from_array([
5442
['id' => 1, 'name' => 'test', 'uuid' => Uuid::fromString('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'],
5543
['id' => 2, 'name' => 'test', 'uuid' => Uuid::fromString('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => '[{"id":1,"name":"test"},{"id":2,"name":"test"}]'],
@@ -69,14 +57,13 @@ public function test_writing_with_provided_schema() : void
6957
['id' => '1', 'name' => 'test', 'uuid' => new \Flow\ETL\PHP\Value\Uuid('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]],
7058
['id' => '2', 'name' => 'test', 'uuid' => new \Flow\ETL\PHP\Value\Uuid('26fd21b0-6080-4d6c-bdb4-1214f1feffef'), 'json' => [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]],
7159
],
72-
df()
60+
data_frame($config)
7361
->read(from_parquet($path))
7462
->fetch()
7563
->toArray()
7664
);
7765

78-
self::assertFileExists($path);
79-
$this->removeFile($path);
66+
self::assertTrue($config->fstab()->for($path)->status($path)?->isFile());
8067
}
8168

8269
/**

src/bridge/symfony/http-foundation/composer.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
},
2525
"license": "MIT",
2626
"autoload": {
27+
"files": [
28+
"src/Flow/Bridge/Symfony/HttpFoundation/functions.php"
29+
],
2730
"psr-4": {
2831
"Flow\\": [
2932
"src/Flow"

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/DataStream.php

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Flow\Bridge\Symfony\HttpFoundation;
66

7+
use Flow\Bridge\Symfony\HttpFoundation\Response\{FlowBufferedResponse, FlowStreamedResponse};
78
use Flow\ETL\{Extractor, Transformation, Transformations};
89
use Symfony\Component\HttpFoundation\{HeaderUtils, Response};
910

@@ -21,8 +22,6 @@ final class DataStream
2122
'Pragma' => 'no-cache', // Backward compatibility for HTTP/1.0
2223
];
2324

24-
private ?Output $output = null;
25-
2625
private int $status = Response::HTTP_OK;
2726

2827
/**
@@ -40,59 +39,86 @@ public static function open(Extractor $extractor) : self
4039
}
4140

4241
/**
43-
* Send the data stream to the output.
42+
* Set the filename for the response.
43+
* If the attachment flag is set to true, the response will be treated as an attachment meaning that
44+
* the browser will prompt the user to download the file.
4445
*/
45-
public function sendTo(Output $output) : FlowStreamedResponse
46+
public function as(string $name, bool $attachment = true) : self
4647
{
47-
$this->output = $output;
48+
$this->headers['Content-Disposition'] = HeaderUtils::makeDisposition(
49+
$attachment ? HeaderUtils::DISPOSITION_ATTACHMENT : HeaderUtils::DISPOSITION_INLINE,
50+
$name
51+
);
4852

49-
$this->headers['Content-Type'] = $this->output->type()->toContentTypeHeader();
53+
return $this;
54+
}
5055

51-
return new FlowStreamedResponse(
56+
/**
57+
* Set additional headers.
58+
* Headers are merged with the default headers.
59+
*/
60+
public function headers(array $headers) : self
61+
{
62+
$this->headers = array_merge($this->headers, $headers);
63+
64+
return $this;
65+
}
66+
67+
/**
68+
* Create a regular response where the whole dataset is loaded into memory.
69+
* It's highly recommended to use a limit transformation to avoid loading the entire dataset into memory.
70+
* Some extractors, like Parquet/Elasticsearch/Doctrine, also allow setting an offset directly on the extractor.
71+
*/
72+
public function response(Output $output) : FlowBufferedResponse
73+
{
74+
$this->headers['Content-Type'] = $output->type()->toContentTypeHeader();
75+
76+
return new FlowBufferedResponse(
5277
$this->extractor,
53-
$this->output,
78+
$output,
5479
\count($this->transformations) ? new Transformations(...$this->transformations) : new Transformations(),
5580
$this->status,
5681
$this->headers
5782
);
5883
}
5984

6085
/**
61-
* Apply transformations to the data stream.
62-
* Transformations are applied in the order they are passed.
63-
* Transformations are applied on the fly, while streaming the data, this means
64-
* that any resource expensive transformations like for example aggregations or sorting
65-
* might significantly slow down the streaming process or even cause out of memory errors.
86+
* Set the HTTP status code. Default is 200.
6687
*/
67-
public function transform(Transformation ...$transformations) : self
88+
public function status(int $status) : self
6889
{
69-
$this->transformations = $transformations;
90+
$this->status = $status;
7091

7192
return $this;
7293
}
7394

7495
/**
75-
* Set the filename for the response.
76-
* If the attachment flag is set to true, the response will be treated as an attachment meaning that
77-
* the browser will prompt the user to download the file.
96+
* Send the data stream to the output.
7897
*/
79-
public function underFilename(string $name, bool $attachment = true) : self
98+
public function streamedResponse(Output $output) : FlowStreamedResponse
8099
{
81-
$this->headers['Content-Disposition'] = HeaderUtils::makeDisposition(
82-
$attachment ? HeaderUtils::DISPOSITION_ATTACHMENT : HeaderUtils::DISPOSITION_INLINE,
83-
$name
84-
);
85100

86-
return $this;
101+
$this->headers['Content-Type'] = $output->type()->toContentTypeHeader();
102+
103+
return new FlowStreamedResponse(
104+
$this->extractor,
105+
$output,
106+
\count($this->transformations) ? new Transformations(...$this->transformations) : new Transformations(),
107+
$this->status,
108+
$this->headers
109+
);
87110
}
88111

89112
/**
90-
* Set additional headers.
91-
* Headers are merged with the default headers.
113+
* Apply transformations to the data stream.
114+
* Transformations are applied in the order they are passed.
115+
* Transformations are applied on the fly, while streaming the data, this means
116+
* that any resource-intensive transformations, such as aggregations or sorting,
117+
* might significantly slow down the streaming process or even cause out of memory errors.
92118
*/
93-
public function withHeaders(array $headers) : self
119+
public function transform(Transformation ...$transformations) : self
94120
{
95-
$this->headers = array_merge($this->headers, $headers);
121+
$this->transformations = $transformations;
96122

97123
return $this;
98124
}
@@ -109,14 +135,4 @@ public function withoutHeader(string $name) : self
109135

110136
return $this;
111137
}
112-
113-
/**
114-
* Set the HTTP status code. Default is 200.
115-
*/
116-
public function withStatus(int $status) : self
117-
{
118-
$this->status = $status;
119-
120-
return $this;
121-
}
122138
}

src/bridge/symfony/http-foundation/src/Flow/Bridge/Symfony/HttpFoundation/Output.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99

1010
interface Output
1111
{
12-
public function loader() : Loader;
12+
public function memoryLoader(string $id) : Loader;
13+
14+
public function stdoutLoader() : Loader;
1315

1416
public function type() : Type;
1517
}

0 commit comments

Comments
 (0)