Skip to content

add lambda function to export trajectories #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ sin autenticación.
reemplazando `PORT` por el valor del parámetro `--port` del comando de ejecución.
Si la aplicación se está ejecutando correctamente, deberías obtener como respuesta un array
JSON con 10 registros de la tabla `taxis` de la base de datos configurada en Vercel.

## Endpoint de exportación a S3

El endpoint de exportación a S3 se encuentra en el directorio 'export_function' y puede ser probado localmente ejecutando `test_locally.py`.
Empty file added export_function/__init__.py
Empty file.
66 changes: 66 additions & 0 deletions export_function/export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import boto3
import json
from psycopg import connect, OperationalError
from fleet_api.config import Config

# Initialize S3 client
s3_client = boto3.client('s3')

def lambda_handler(event, context):
# Load environment variables configured in Config
env = Config()

# Retrieve taxi ID and date from the event data
taxi_identifier = event.get('taxi_id')
date = event.get('date') # Expected format: 'YYYY-MM-DD'

if not taxi_identifier or not date:
raise ValueError("taxi_id and date must be provided")

# Get database connection
try:
connection = connect(conninfo=env.DATABASE_URL)
except OperationalError as ex:
print(f"Database connection failed: {ex}")
raise

# Fetch trajectories data for the given taxi and date
try:
with connection.cursor() as cursor:
cursor.execute(
"""SELECT date,latitude,longitude
FROM TRAJECTORIES
WHERE taxi_id = %s AND CAST(date AS DATE) = %s""",
(taxi_identifier, date)
)
resultset = cursor.fetchall()
trajectories = [{"date": str(row[0]),
"latitude": row[1],
"longitude": row[2]} for row in resultset]
except Exception as ex:
print(f"Failed to fetch data: {ex}")
connection.close()
raise

# Close the database connection
connection.close()

# Convert data to JSON
data = json.dumps(trajectories)

# Create S3 object key
object_name = f"{env.S3_OBJECT_PREFIX}taxi_{taxi_identifier}_{date}_trajectories.json"

# Upload to S3
try:
s3_client.put_object(Bucket=env.S3_BUCKET_NAME, Key=object_name, Body=data)
print(f"Data successfully uploaded to S3 bucket {env.S3_BUCKET_NAME} with key {object_name}")
except Exception as ex:
print(f"Failed to upload data to S3: {ex}")
raise

return {
'statusCode': 200,
'body': json.dumps('Data successfully processed and uploaded!')
}
2 changes: 2 additions & 0 deletions fleet_api/.env.sample
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
DATABASE_URL="postgres://USER:PASSWORD@DOMAIN:PORT/DATABASE?sslmode=require"
S3_BUCKET=export-fleet-api
S3_OBJECT_PREFIX=exports
2 changes: 2 additions & 0 deletions fleet_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ def __init__(self):
load_dotenv()
# setup non secret config variables here
self.DATABASE_URL = environ.get('DATABASE_URL')
self.S3_BUCKET_NAME = environ.get('S3_BUCKET_NAME')
self.S3_OBJECT_PREFIX = environ.get('S3_OBJECT_PREFIX')
25 changes: 25 additions & 0 deletions test_locally.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
from dotenv import load_dotenv
from export_function.export import lambda_handler

# Load the environment variables
load_dotenv()

def test_lambda_function():
# Mock input event
event = {
'taxi_id': '10133', # Replace with a valid taxi ID from your DB
'date': '2008-02-02' # Replace with a date you want to query
}

# Mock context (not used in this example but required by lambda_handler)
context = {}

# Invoke the Lambda function
result = lambda_handler(event, context)

# Output the result
print(result)

if __name__ == "__main__":
test_lambda_function()