This module contains examples that demonstrate the Table API of the Pravega Flink connector, using NY Taxi Records as sample data.
The following files are downloaded and used by this sample application:
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-01.csv
https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
- Pravega running (see the instructions at https://github.com/pravega/pravega-samples#pravega-samples-build-instructions)
- Python and its packages (see the PyFlink setup)
- The Pravega Python client, installed via
pip install pravega
- The latest Pravega Flink connector, available from the release page
- Apache Flink running
If you wish to use Anaconda instead, replace pip with conda in the steps above.
Navigate to the Python source code. You do not need to touch the Java or Gradle parts of the project.
cd $SAMPLES-HOME/scenarios/pravega-flink-connector-sql-samples/src/main/python
Usage:
flink run --python ./bin/<popular_destination_query.py|popular_taxi_vendor.py|max_travellers_per_destination.py> --pyFiles common.py
Additional optional parameters: --scope <scope-name> --stream <stream-name> --controller-uri <controller-uri>
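As an illustration of how a script could accept the optional parameters listed above, here is a minimal sketch using Python's standard argparse module. It is not taken from the sample scripts: the function name parse_args and the default scope and stream names are hypothetical, and only the three flag names and the localhost:9090 address come from this document.

```python
import argparse


def parse_args(argv=None):
    """Parse the optional connection parameters shown in the usage line."""
    parser = argparse.ArgumentParser(description="Pravega taxi sample options")
    # The scope and stream defaults are illustrative assumptions,
    # not the defaults used by the real sample scripts.
    parser.add_argument("--scope", default="taxi", help="Pravega scope name")
    parser.add_argument("--stream", default="trip", help="Pravega stream name")
    # argparse exposes --controller-uri as the attribute controller_uri.
    parser.add_argument("--controller-uri", default="localhost:9090",
                        help="controller address as ip:port")
    return parser.parse_args(argv)


# Example: override two options and keep the (assumed) default stream name.
args = parse_args(["--scope", "demo", "--controller-uri", "127.0.0.1:9090"])
```

Passing an explicit list to parse_args keeps the sketch independent of the command line it happens to be run from.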
- Load taxi data to Pravega
The default option assumes that Pravega is running locally. You can override it by passing the controller URI option. The URI should have the form ip:port, for example localhost:9090, without tcp:// at the beginning.
python3 prepare_main.py
The above command loads the taxi dataset records to Pravega and prepares the environment for testing.
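Because the controller address is expected as ip:port without a tcp:// prefix, a small helper can guard against the common mistake of passing the full URI. The following is only a sketch: normalize_controller_uri is a hypothetical name and is not part of prepare_main.py or the Pravega client.

```python
def normalize_controller_uri(uri: str) -> str:
    """Return the controller address as host:port, dropping a leading tcp://."""
    uri = uri.strip()
    prefix = "tcp://"
    if uri.startswith(prefix):
        uri = uri[len(prefix):]
    # Split on the last colon so the port is always the final component.
    host, sep, port = uri.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected ip:port, got {uri!r}")
    return f"{host}:{port}"


# Both spellings yield the form described above.
local = normalize_controller_uri("tcp://localhost:9090")
```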
- Popular Destination
flink run --python ./popular_destination_query.py --pyFiles ./common.py --jarfile <path/to/pravega-flink-connector>
The above command uses SQL to find the most popular destination (drop-off location) from the available trip records.
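To make the shape of such a query concrete, the sketch below runs a comparable aggregation on a few made-up rows. It uses Python's built-in sqlite3 module as a stand-in for Flink SQL, so it needs neither Flink nor Pravega; the actual query in popular_destination_query.py may differ (for example, it may use time windows). The column names DOLocationID, LocationID and Zone are assumed to match the headers of the two CSV files, and the zone names are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (DOLocationID INTEGER, passenger_count INTEGER)")
conn.execute("CREATE TABLE zones (LocationID INTEGER, Zone TEXT)")

# Made-up sample rows: (drop-off location ID, passenger count).
conn.executemany("INSERT INTO trips VALUES (?, ?)",
                 [(1, 1), (2, 2), (2, 1), (3, 4), (2, 3)])
conn.executemany("INSERT INTO zones VALUES (?, ?)",
                 [(1, "Zone A"), (2, "Zone B"), (3, "Zone C")])

# Count trips per drop-off zone and keep the zone with the most trips.
query = """
SELECT z.Zone, COUNT(*) AS trip_count
FROM trips AS t
JOIN zones AS z ON t.DOLocationID = z.LocationID
GROUP BY z.Zone
ORDER BY trip_count DESC
LIMIT 1
"""
top_zone, trip_count = conn.execute(query).fetchone()
conn.close()
```

With the sample rows above, three of the five trips end at location 2, so that zone is returned.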
- Popular Taxi Vendors
flink run --python ./popular_taxi_vendor.py --pyFiles ./common.py --jarfile <path/to/pravega-flink-connector>
The above command uses the Table API to find the taxi vendor most used by commuters.
- Maximum Travellers/Destination
flink run --python ./max_travellers_per_destination.py --pyFiles ./common.py --jarfile <path/to/pravega-flink-connector>
The above command uses the Table API to find the maximum number of travellers per destination (drop-off location).
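One way to read this aggregation is "for each drop-off location, the largest passenger count seen on any single trip". The plain-Python sketch below illustrates that reading on made-up rows; it is not the Table API code from max_travellers_per_destination.py, and the function name and the interpretation itself are assumptions.

```python
def max_travellers_per_destination(trips):
    """Map each drop-off location ID to the largest passenger count seen for it."""
    result = {}
    for destination, passengers in trips:
        # Keep the larger of the stored maximum and the current trip's count.
        result[destination] = max(passengers, result.get(destination, passengers))
    return result


# Made-up sample rows: (drop-off location ID, passenger count).
sample_trips = [(2, 1), (2, 3), (1, 2), (3, 4), (2, 2)]
maxima = max_travellers_per_destination(sample_trips)
```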