Skip to content

Commit dfd8d3d

Browse files
authored
Revert "Revert "MVP of just producing JSON from RT protos""
1 parent 986c7c5 commit dfd8d3d

15 files changed

+302
-199
lines changed

.gitignore

+4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# Vim
22
.*.sw[po]
33

4+
# PyCharm, etc.
5+
.idea/
6+
.DS_Store
7+
48
# Byte-compiled / optimized / DLL files
59
__pycache__/
610
*.py[cod]

.pre-commit-config.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ repos:
33
rev: v2.4.0
44
hooks:
55
- id: flake8
6+
args: ["--ignore=E501,W503"] # line too long and line before binary operator (black is ok with these)
67
types:
78
- python
89
- id: trailing-whitespace

airflow/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ This project is developed using docker and docker-compose. Before getting starte
1717
First, if you're on linux, you'll need to make sure that the UID and GID of the container match, to do so, run
1818

1919
```console
20+
cd airflow (if you are not already in the airflow directory)
2021
mkdir ./dags ./logs ./plugins
2122
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
2223
```
@@ -37,7 +38,7 @@ To run the gtfs_downloader dags, it is required to generate a yml file of the ag
3738
secrets filled in. To run this, do the following:
3839

3940
1. Setup a virtual environment within the `script` folder by running `python -m venv .venv` in the `script` folder.
40-
2. Install the needed requirements via `pip install -r requirements.txt`
41+
2. Install the needed requirements via `pip install -r requirements.txt` (the requirements in `script`)
4142
3. Copy `airflow/data/example-secrets.csv` to `airflow/data/secrets.csv` and fill in the secret keys as needed
4243
4. run `python yml_convert.py ../airflow/data/agencies.yml ../airflow/data/agencies.filled.yml ../airflow/data/secrets.csv`
4344
5. Copy `/airflow/data/agencies.yml` to `/airflow/data/agencies_raw.yml`
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
operator: operators.SqlQueryOperator
3+
4+
dependencies:
5+
- parse_rt_service_alerts
6+
---
7+
8+
CREATE OR REPLACE EXTERNAL TABLE `gtfs_rt.service_alerts`
9+
OPTIONS (
10+
format = "JSON",
11+
uris = ["{{get_bucket()}}/rt-processed/service_alerts/*.jsonl.gz"]
12+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
operator: operators.SqlQueryOperator
3+
4+
dependencies:
5+
- parse_rt_trip_updates
6+
---
7+
8+
CREATE OR REPLACE EXTERNAL TABLE `gtfs_rt.trip_updates`
9+
OPTIONS (
10+
format = "JSON",
11+
uris = ["{{get_bucket()}}/rt-processed/trip_updates/*.jsonl.gz"]
12+
)
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
---
22
operator: operators.SqlQueryOperator
3+
4+
dependencies:
5+
- parse_rt_vehicle_positions
36
---
47

58
CREATE OR REPLACE EXTERNAL TABLE `gtfs_rt.vehicle_positions`
69
OPTIONS (
7-
format = "PARQUET",
8-
uris = ["gs://gtfs-data/rt-processed/vehicle_positions/*.parquet"]
10+
format = "JSON",
11+
uris = ["{{get_bucket()}}/rt-processed/vehicle_positions/*.jsonl.gz"]
912
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
operator: operators.RealtimeToFlattenedJSONOperator
2+
provide_context: true
3+
4+
filename_prefix: al
5+
rt_file_substring: service_alerts
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
operator: operators.RealtimeToFlattenedJSONOperator
2+
provide_context: true
3+
4+
filename_prefix: tu
5+
rt_file_substring: trip_updates

airflow/dags/rt_loader/parse_rt_vehicle_positions.py

-196
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
operator: operators.RealtimeToFlattenedJSONOperator
2+
provide_context: true
3+
4+
filename_prefix: rt
5+
rt_file_substring: vehicle_positions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
operator: operators.RealtimeToFlattenedJSONOperator
2+
provide_context: true
3+
4+
filename_prefix: tu
5+
rt_file_substring: trip_updates
6+
subfolder: rt-processed-sandbox
7+
limit: 5

airflow/plugins/operators/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,6 @@
88
from operators.python_to_warehouse_operator import PythonToWarehouseOperator
99
from operators.sql_query_operator import SqlQueryOperator
1010
from operators.airtable_to_warehouse import AirtableToWarehouseOperator
11+
from operators.realtime_to_flattened_json_operator import (
12+
RealtimeToFlattenedJSONOperator,
13+
)

0 commit comments

Comments
 (0)