Skip to content

Commit

Permalink
Added CLI command to read schema from a file (#1244)
Browse files Browse the repository at this point in the history
* Added CLI command to read schema from a file

* Excluded symfony commands from static analysis
  • Loading branch information
norberttech authored Oct 11, 2024
1 parent cf4e25c commit 2c34a94
Show file tree
Hide file tree
Showing 15 changed files with 519 additions and 35 deletions.
28 changes: 14 additions & 14 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 90 additions & 0 deletions docs/components/cli/docs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Flow Command Line Interface


## Installation

```
composer require flow-php/cli
```

In some cases it might make sense to install the CLI globally:

```
composer global require flow-php/cli
```

Now you can run the CLI using the `flow` command.

## Usage

```shell
$ flow
Flow PHP - Data processing framework

Usage:
command [options] [arguments]

Options:
-h, --help Display help for the given command. When no command is given display help for the list command
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi|--no-ansi Force (or disable --no-ansi) ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Available commands:
completion Dump the shell completion script
help Display help for a command
list List commands
run Execute ETL pipeline from a php/json file.
file
file:schema Read data schema from a file.
parquet
parquet:read [parquet:read:data] Read data from parquet file
parquet:read:metadata Read metadata from parquet file
```

### `file:schema`

```shell
$ flow file:schema --help
Description:
Read data schema from a file.

Usage:
file:schema [options] [--] <source>
schema

Arguments:
source Path to a file from which schema should be extracted.

Options:
--pretty[=PRETTY] Pretty print schema [default: false]
--table[=TABLE] Pretty schema as ascii table [default: false]
--auto-cast[=AUTO-CAST] When set Flow will try to automatically cast values to more precise data types, for example datetime strings will be casted to datetime type [default: false]
-h, --help Display help for the given command. When no command is given display help for the list command
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi|--no-ansi Force (or disable --no-ansi) ANSI output
-n, --no-interaction Do not ask any interactive question
-if, --input-format=INPUT-FORMAT Source file format. When not set file format is guessed from source file path extension
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
```

Example:

```shell
$ flow schema orders.csv --table --auto-cast
+------------+----------+----------+-------------+----------+
| name | type | nullable | scalar_type | metadata |
+------------+----------+----------+-------------+----------+
| order_id | uuid | false | | [] |
| created_at | datetime | false | | [] |
| updated_at | datetime | false | | [] |
| discount | scalar | true | string | [] |
| address | json | false | | [] |
| notes | json | false | | [] |
| items | json | false | | [] |
+------------+----------+----------+-------------+----------+
7 rows
```
1 change: 1 addition & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ parameters:
- examples/topics

excludePaths:
- src/cli/src/Flow/CLI/Command/*
- src/core/etl/src/Flow/ETL/Formatter/ASCII/ASCIITable.php
- src/core/etl/src/Flow/ETL/Sort/ExternalSort/RowsMinHeap.php
- src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/SearchResults.php
Expand Down
1 change: 1 addition & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

<file name="src/lib/parquet-viewer/src/Flow/ParquetViewer/Command/ReadMetadataCommand.php" />

<directory name="src/cli/src/Flow/CLI/Command" />
<directory name="src/lib/parquet/src/Flow/Parquet/ThriftStream/" />
<directory name="src/lib/parquet/src/Flow/Parquet/Thrift/" />
<directory name="src/lib/parquet/src/Flow/Parquet/BinaryReader/" />
Expand Down
10 changes: 10 additions & 0 deletions src/cli/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Flow Command Line Interface

Flow CLI is a powerful command-line interface that provides a wide range of tools and utilities for managing and interacting with various data sources.

> [!IMPORTANT]
> This repository is a subtree split from our monorepo. If you'd like to contribute, please visit our main monorepo [flow-php/flow](https://github.com/flow-php/flow).
- 📜 [Documentation](https://github.com/flow-php/flow/blob/1.x/docs/cli/docs.md)
- ➡️ [Installation](https://github.com/flow-php/flow/blob/1.x/docs/installation.md)
- 🛠️ [Contributing](https://github.com/flow-php/flow/blob/1.x/CONTRIBUTING.md)
11 changes: 8 additions & 3 deletions src/cli/flow
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
#!/usr/bin/env php
<?php declare(strict_types=1);

use Flow\CLI\Command\ConvertCommand;
use Flow\CLI\Command\RunCommand;
use Flow\CLI\Command\FileSchemaCommand;
use Flow\CLI\FlowVersion;
use Flow\ParquetViewer\Command\ReadDataCommand;
use Flow\ParquetViewer\Command\ReadMetadataCommand;
use Symfony\Component\Console\Application;

if ('' !== Phar::running(false)) {
$pharRuntime = ('' !== Phar::running(false));

if ($pharRuntime) {
require 'phar://flow.phar/vendor/autoload.php';
} else {
if (\is_file(__DIR__ . '/vendor/autoload.php')) {
Expand Down Expand Up @@ -35,10 +39,11 @@ $_ENV['FLOW_PHAR_APP'] = 1;

\ini_set('memory_limit', -1);

$application = new Application('Flow-PHP - Extract Transform Load - Data processing framework', FlowVersion::getVersion());
$application = new Application('Flow PHP - Data processing framework', $pharRuntime ? FlowVersion::getVersion() : 'UNKNOWN');

$application->add((new ReadDataCommand())->setName('parquet:read:data'));
$application->add((new ReadDataCommand())->setName('parquet:read')->setAliases(['parquet:read:data']));
$application->add((new ReadMetadataCommand())->setName('parquet:read:metadata'));
$application->add((new RunCommand())->setName('run'));
$application->add((new FileSchemaCommand())->setName('file:schema')->setAliases(['schema']));

$application->run();
114 changes: 114 additions & 0 deletions src/cli/src/Flow/CLI/Command/FileSchemaCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?php

declare(strict_types=1);

namespace Flow\CLI\Command;

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\JSON\from_json;
use function Flow\ETL\Adapter\Parquet\from_parquet;
use function Flow\ETL\Adapter\XML\from_xml;
use function Flow\ETL\DSL\{config_builder, df, from_array, ref, schema_to_json, to_output};
use function Flow\Filesystem\DSL\path_real;
use Flow\ETL\Config;
use Flow\Filesystem\Path;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Exception\InvalidArgumentException;
use Symfony\Component\Console\Input\{InputArgument, InputInterface, InputOption};
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

final class FileSchemaCommand extends Command
{
private ?Config $flowConfig = null;

private ?string $inputFormat = null;

private ?Path $sourcePath = null;

public function configure() : void
{
$this
->setName('file:schema')
->setDescription('Read data schema from a file.')
->addArgument('source', InputArgument::REQUIRED, 'Path to a file from which schema should be extracted.')
->addOption('input-format', 'if', InputArgument::OPTIONAL, 'Source file format. When not set file format is guessed from source file path extension', null)
->addOption('pretty', null, InputOption::VALUE_OPTIONAL, 'Pretty print schema', false)
->addOption('table', null, InputOption::VALUE_OPTIONAL, 'Pretty schema as ascii table', false)
->addOption('auto-cast', null, InputOption::VALUE_OPTIONAL, 'When set Flow will try to automatically cast values to more precise data types, for example datetime strings will be casted to datetime type', false);
}

protected function execute(InputInterface $input, OutputInterface $output) : int
{
$style = new SymfonyStyle($input, $output);

$autoCast = ($input->getOption('auto-cast') !== false);

$df = df($this->flowConfig)
->read(match ($this->inputFormat) {
'csv' => from_csv($this->sourcePath),
'json' => from_json($this->sourcePath),
'xml' => from_xml($this->sourcePath),
'parquet' => from_parquet($this->sourcePath),
});

if ($autoCast) {
$df->autoCast();
}

$schema = $df->schema();

$prettyValue = $input->getOption('pretty');
$prettyPrint = ($prettyValue !== false);

$tableValue = $input->getOption('table');
$tablePrint = ($tableValue !== false);

if ($tablePrint) {
ob_start();
df()
->read(from_array($schema->normalize()))
->withEntry('type', ref('type')->unpack())
->renameAll('type.', '')
->rename('ref', 'name')
->collect()
->select('name', 'type', 'nullable', 'scalar_type', 'metadata')
->write(to_output())
->run();

$style->write(ob_get_clean());

return Command::SUCCESS;
}

$style->writeln(schema_to_json($schema, $prettyPrint ? JSON_PRETTY_PRINT | JSON_THROW_ON_ERROR : JSON_THROW_ON_ERROR));

return Command::SUCCESS;
}

protected function initialize(InputInterface $input, OutputInterface $output) : void
{
$this->flowConfig = config_builder()->build();

$source = (string) $input->getArgument('source');

$sourcePath = path_real($source);

$fs = $this->flowConfig->fstab()->for($sourcePath);

if (!$fs->status($sourcePath)) {
throw new InvalidArgumentException(\sprintf('File "%s" does not exist.', $sourcePath->path()));
}

$supportedFormats = ['csv', 'json', 'xml', 'parquet', 'txt'];

$inputFormat = \mb_strtolower($input->getOption('input-format') ?: $sourcePath->extension());

if (!\in_array($inputFormat, $supportedFormats, true)) {
throw new InvalidArgumentException(\sprintf('File format "%s" is not supported. Input file format can be set with --input-format option', $inputFormat));
}

$this->sourcePath = $sourcePath;
$this->inputFormat = $inputFormat;
}
}
17 changes: 17 additions & 0 deletions src/cli/src/Flow/CLI/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ public function configure() : void
$this
->setName('run')
->setDescription('Execute ETL pipeline from a php/json file.')
->setHelp(
<<<'HELP'
<info>input-file</info> argument must point to a valid php file that returns DataFrame instance.
<comment>Make sure to not execute run() or any other trigger function.</comment>
<fg=blue>Example of pipeline.php:</>
<?php
return df()
->read(from_array([
['id' => 1, 'name' => 'User 01', 'active' => true],
['id' => 2, 'name' => 'User 02', 'active' => false],
['id' => 3, 'name' => 'User 03', 'active' => true],
]))
->collect()
->write(to_output());
HELP
)
->addArgument('input-file', InputArgument::REQUIRED, 'Path to a php/json with DataFrame definition.');
}

Expand Down
Loading

0 comments on commit 2c34a94

Please sign in to comment.