
chatbot #4523

Open · wants to merge 14 commits into base: staging

Changes from 1 commit
10 changes: 8 additions & 2 deletions src/spatial/controllers/controllers.py
@@ -1,5 +1,6 @@
 # controller/controller.py
 from flask import Blueprint, request, jsonify
+from views.chatbot_views import ChatbotView
 from views.getis_services import SpatialDataHandler
 from views.getis_confidence_services import SpatialDataHandler_confidence
 from views.localmoran_services import SpatialDataHandler_moran
@@ -12,7 +13,7 @@
 from views.satellite_predictions import SatellitePredictionView
 from views.site_category_view import SiteCategorizationView
 from views.site_selection_views import SiteSelectionView
-from views.report_view import ReportView
+from views.report_view import ReportView


 controller_bp = Blueprint("controller", __name__)
@@ -78,4 +79,9 @@ def fetch_air_quality_without_llm():

@controller_bp.route("/air_quality_report_with_customised_prompt", methods=["POST"])
def fetch_air_quality_with_customised_prompt():
return ReportView.generate_air_quality_report_with_customised_prompt_gemini()
return ReportView.generate_air_quality_report_with_customised_prompt_gemini()

@controller_bp.route("/chatbot", methods=["POST"])
def Chatbot_Views():
return ChatbotView.chat_endpoint()
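
For context, a blueprint like controller_bp only takes effect once it is registered on the Flask app, and that registration happens outside this diff. A minimal sketch (the app module and the absence of a url_prefix are assumptions):

    from flask import Flask
    from controllers.controllers import controller_bp  # path assumed from this diff

    app = Flask(__name__)
    app.register_blueprint(controller_bp)  # the new route becomes POST /chatbot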

120 changes: 120 additions & 0 deletions src/spatial/models/chatbot_model.py
@@ -0,0 +1,120 @@
import requests
from configure import Config
import google.generativeai as genai
import logging
import re
from flask import Flask, request, jsonify
Contributor (review comment)

🛠️ Refactor suggestion: Remove unused Flask imports

Static analysis correctly identifies that request and jsonify are imported but never used in this file.

- from flask import Flask, request, jsonify
+ from flask import Flask

🧰 Tools: 🪛 Ruff (0.8.2)
6-6: flask.request imported but unused (F401)
6-6: flask.jsonify imported but unused (F401)

from functools import lru_cache
import threading

# Configure API keys
GOOGLE_API_KEY = Config.GOOGLE_API_KEY
genai.configure(api_key=GOOGLE_API_KEY)

# Flask app
app = Flask(__name__)

class DataFetcher:
    @staticmethod
    @lru_cache(maxsize=100)  # Cache for speed
    def fetch_air_quality_data_a(grid_id, start_time, end_time):
        token = Config.AIRQO_API_TOKEN
        analytics_url = Config.ANALTICS_URL
        if token is None:
            logging.error("AIRQO_API_TOKEN not set.")
            return None

        url = f"{analytics_url}?token={token}"
        payload = {"grid_id": grid_id, "start_time": start_time, "end_time": end_time}
Contributor (review comment)

🛠️ Refactor suggestion: Consider URL encoding for the token in query parameters

The token is appended directly to the URL, which could lead to issues if it contains special characters. Consider using proper URL parameter encoding:

- url = f"{analytics_url}?token={token}"
+ from urllib.parse import urlencode
+ query_params = {'token': token}
+ url = f"{analytics_url}?{urlencode(query_params)}"
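
A related alternative, sketched here for comparison: requests can encode query parameters itself via the params keyword, which avoids hand-building the URL entirely (same variables as in the diff above):

    # requests URL-encodes params and appends them as the query string
    response = requests.post(analytics_url, params={"token": token}, json=payload, timeout=5)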



        try:
            response = requests.post(url, json=payload, timeout=5)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as http_err:
            logging.error(f"HTTP error: {http_err}")
        except requests.exceptions.RequestException as req_err:
            logging.error(f"Request error: {req_err}")
        except ValueError as json_err:
            logging.error(f"JSON error: {json_err}")
        return None
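
Worth noting: because fetch_air_quality_data_a is wrapped in lru_cache, repeated calls with the same (grid_id, start_time, end_time) return the cached result, and a None returned on failure is cached as well. A sketch of the standard functools helpers a caller could use to force a refresh:

    # lru_cache attaches cache-management helpers to the decorated function
    DataFetcher.fetch_air_quality_data_a.cache_clear()        # drop all cached responses
    info = DataFetcher.fetch_air_quality_data_a.cache_info()  # hits, misses, currsize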

class AirQualityChatbot:
    def __init__(self, data):
        self.data = data
        self.grid_name = data.get('airquality', {}).get('sites', {}).get('grid name', ['Unknown'])[0]
        self.annual_data = data.get('airquality', {}).get('annual_pm', [{}])[0] or {}
        self.daily_mean_data = data.get('airquality', {}).get('daily_mean_pm', []) or []
        self.diurnal = data.get('airquality', {}).get('diurnal', []) or []
        self.monthly_data = data.get('airquality', {}).get('site_monthly_mean_pm', []) or []
        self.site_names = [item.get('site_name', 'Unknown') for item in data.get('airquality', {}).get('site_annual_mean_pm', [])] or ['Unknown']
        self.num_sites = data.get('airquality', {}).get('sites', {}).get('number_of_sites', 'Unknown')
        self.starttime = data.get('airquality', {}).get('period', {}).get('startTime', '')[:10] or 'N/A'
        self.endtime = data.get('airquality', {}).get('period', {}).get('endTime', '')[:10] or 'N/A'

        self.annual_pm2_5 = self.annual_data.get("pm2_5_calibrated_value", 'N/A')
        self.gemini_model = genai.GenerativeModel('gemini-pro')
        self.lock = threading.Lock()
Contributor (review comment)

💡 Verification agent / 🧩 Analysis chain

Consider documenting the purpose of the threading lock

A threading lock is initialized but never used in the code. If it's intended for future use or for use by callers, consider adding documentation explaining its purpose.

🏁 Script executed:

#!/bin/bash
# Search for uses of the lock in this file or other files
grep -r "lock" --include="*.py" src/spatial/

Length of output: 117

Action required: document or remove the unused threading lock

The grep search confirms that the threading.Lock() instantiated at src/spatial/models/chatbot_model.py line 57 isn't referenced anywhere else in the codebase. If this lock is reserved for future synchronization or intended for use by other parts of the application, add a comment or update the documentation to clarify its purpose and usage. Otherwise, consider removing it to eliminate unnecessary code confusion.
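
If the lock is kept, a minimal sketch of the clarifying comment the reviewer is asking for (the stated intent is an assumption, not established by this PR):

    # Reserved for future use: would serialize gemini_model.generate_content()
    # calls if one chatbot instance is ever shared across request threads.
    self.lock = threading.Lock()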


        # Precompute for rule-based speed
        self.today_pm2_5 = self.daily_mean_data[0].get('pm2_5_calibrated_value', 'N/A') if self.daily_mean_data else 'N/A'
        self.peak_diurnal = max(self.diurnal, key=lambda x: x.get('pm2_5_calibrated_value', 0)) if self.diurnal else {}

    def _prepare_data_context(self):
        return (
            f"AirQo data for {self.grid_name} ({self.starttime}-{self.endtime}): "
            f"Annual PM2.5={self.annual_pm2_5} µg/m³, Sites={self.num_sites}, "
            f"Daily sample={self.daily_mean_data[:1]}, Diurnal sample={self.diurnal[:1]}, "
            f"Monthly sample={self.monthly_data[:1]}, Site names={self.site_names}"
        )
Contributor (review comment)

🛠️ Refactor suggestion: Limit data exposure in context preparation

The context preparation method includes raw data samples which might contain sensitive information. Consider limiting the amount of data exposed to the LLM and formatting it more appropriately:

def _prepare_data_context(self):
    return (
        f"AirQo data for {self.grid_name} ({self.starttime}-{self.endtime}): "
        f"Annual PM2.5={self.annual_pm2_5} µg/m³, Sites={self.num_sites}, "
-       f"Daily sample={self.daily_mean_data[:1]}, Diurnal sample={self.diurnal[:1]}, "
-       f"Monthly sample={self.monthly_data[:1]}, Site names={self.site_names}"
+       f"Today's PM2.5={self.today_pm2_5} µg/m³, "
+       f"Peak hour={(self.peak_diurnal.get('hour', 'N/A') + ':00') if self.peak_diurnal else 'N/A'}, "
+       f"Site names={', '.join(self.site_names[:5])}{' and more' if len(self.site_names) > 5 else ''}"
    )


    def _rule_based_response(self, user_prompt):
        prompt = user_prompt.lower()

        if re.search(r"(today|now).*air.*quality", prompt):
            return f"Today’s PM2.5 in {self.grid_name} is {self.today_pm2_5} µg/m³."

        if re.search(r"(worst|highest|peak).*time", prompt):
            if self.peak_diurnal:
                return f"Pollution peaks at {self.peak_diurnal.get('hour', 'N/A')}:00 with {self.peak_diurnal.get('pm2_5_calibrated_value', 'N/A')} µg/m³."
            return "No diurnal data available."

        if re.search(r"how.*many.*(sites|monitors)", prompt):
            return f"There are {self.num_sites} monitoring sites in {self.grid_name}."

        if re.search(r"(year|annual).*average", prompt):
            return f"The annual PM2.5 average in {self.grid_name} is {self.annual_pm2_5} µg/m³."

        if re.search(r"(where|which|list).*site|sites|locations", prompt):
            return f"Monitoring sites in {self.grid_name}: {', '.join(self.site_names)}."

        return None
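
For a sense of which prompts the rules catch, a sketch (bot is an AirQualityChatbot built from fetched data):

    bot._rule_based_response("How is today's air quality?")        # today/now rule
    bot._rule_based_response("What is the worst time for PM2.5?")  # diurnal peak rule
    bot._rule_based_response("Summarize trends for me")            # None, falls through to the LLM

Note that the last pattern, r"(where|which|list).*site|sites|locations", binds the | alternation loosely, so any prompt containing "sites" or "locations" matches it regardless of the leading words.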

    def _llm_response(self, user_prompt):
        if re.search(r"(report|summary|detailed|analysis|conclusion)", user_prompt.lower()):
            full_prompt = (
                f"Data: {self._prepare_data_context()}\n"
                f"User: {user_prompt}\n"
                "Generate a concise conclusion or report based on the data. Focus on key insights."
            )
        else:
            full_prompt = (
                f"Data: {self._prepare_data_context()}\n"
                f"User: {user_prompt}\n"
                "Respond concisely and accurately based on the data."
            )

        try:
            response = self.gemini_model.generate_content(full_prompt)
            return response.text
        except Exception as e:
            logging.error(f"LLM error: {e}")
            return "Sorry, I couldn’t process that right now."

Contributor (review comment)

🛠️ Refactor suggestion: Implement thread safety in the LLM response method

The class initializes a threading lock but doesn't use it when making external API calls. Consider using the lock to ensure thread safety, especially if this method might be called concurrently:

def _llm_response(self, user_prompt):
-   if re.search(r"(report|summary|detailed|analysis|conclusion)", user_prompt.lower()):
-       full_prompt = (
-           f"Data: {self._prepare_data_context()}\n"
-           f"User: {user_prompt}\n"
-           "Generate a concise conclusion or report based on the data. Focus on key insights."
-       )
-   else:
-       full_prompt = (
-           f"Data: {self._prepare_data_context()}\n"
-           f"User: {user_prompt}\n"
-           "Respond concisely and accurately based on the data."
-       )
-
-   try:
-       response = self.gemini_model.generate_content(full_prompt)
-       return response.text
-   except Exception as e:
-       logging.error(f"LLM error: {e}")
-       return "Sorry, I couldn't process that right now."
+   with self.lock:  # Use lock for thread safety
+       if re.search(r"(report|summary|detailed|analysis|conclusion)", user_prompt.lower()):
+           full_prompt = (
+               f"Data: {self._prepare_data_context()}\n"
+               f"User: {user_prompt}\n"
+               "Generate a concise conclusion or report based on the data. Focus on key insights."
+           )
+       else:
+           full_prompt = (
+               f"Data: {self._prepare_data_context()}\n"
+               f"User: {user_prompt}\n"
+               "Respond concisely and accurately based on the data."
+           )
+
+       if not self.gemini_model:
+           return "AI model not available. Please try again later."
+
+       try:
+           response = self.gemini_model.generate_content(full_prompt)
+           return response.text
+       except Exception as e:
+           logging.error(f"LLM error: {e}")
+           return "Sorry, I couldn't process that right now."

    def chat(self, user_prompt):
        rule_response = self._rule_based_response(user_prompt)
        if rule_response:
            return rule_response
        return self._llm_response(user_prompt)
Contributor (review comment)

🛠️ Refactor suggestion: Add input validation for user prompts

The chat method lacks input validation for the user_prompt parameter. Consider adding validation to handle empty, overly long, or potentially harmful inputs:

def chat(self, user_prompt):
+   # Input validation
+   if not user_prompt or not isinstance(user_prompt, str):
+       return "Please provide a valid question about air quality."
+   if len(user_prompt) > 500:  # Reasonable limit
+       return "Your question is too long. Please keep it under 500 characters."
+
    rule_response = self._rule_based_response(user_prompt)
    if rule_response:
        return rule_response
    return self._llm_response(user_prompt)
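
Putting the model layer together, a minimal sketch of driving it directly, outside Flask (the grid_id and times are placeholders):

    data = DataFetcher.fetch_air_quality_data_a(
        "example-grid-id", "2024-01-01T00:00:00Z", "2024-01-31T23:59:59Z"
    )
    if data:
        bot = AirQualityChatbot(data)
        print(bot.chat("How many monitoring sites are there?"))  # rule-based path
        print(bot.chat("Give me a detailed analysis"))           # LLM path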


81 changes: 81 additions & 0 deletions src/spatial/views/chatbot_views.py
@@ -0,0 +1,81 @@
from flask import request, jsonify
# Assuming these are in models/chatbot_model.py based on the import
from models.chatbot_model import AirQualityChatbot, DataFetcher
import numpy as np
import logging

#logging.basicConfig(filename="report_log.log", level=logging.INFO, filemode="w")
logger = logging.getLogger(__name__)

class ChatbotView:
    @staticmethod
    def chat_endpoint():
        """
        Handles chatbot API requests for air quality information.
        Expects JSON payload with grid_id, start_time, end_time, and prompt.
        Returns JSON response with chatbot's answer or error message.
        """
        # Validate request payload
        payload = request.json
        if not payload or not all(key in payload for key in ["grid_id", "start_time", "end_time", "prompt"]):
            logger.error("Invalid payload: missing required fields")
            return jsonify({
                "error": "Missing required fields: grid_id, start_time, end_time, prompt",
                "status": "failure"
            }), 400

        # Extract parameters
        grid_id = payload["grid_id"]
        start_time = payload["start_time"]
        end_time = payload["end_time"]
        user_prompt = payload["prompt"]

        # Validate prompt
        if not user_prompt or not isinstance(user_prompt, str):
            logger.error(f"Invalid prompt received: {user_prompt}")
            return jsonify({
                "error": "No valid prompt provided",
                "status": "failure"
            }), 400
Contributor (review comment on lines +33 to +39)

🛠️ Refactor suggestion: Add additional validation for time format and grid_id

The current implementation only validates the presence of grid_id, start_time, and end_time, and does some basic validation on the prompt. Consider adding more thorough validation for date/time formats and for the grid_id format/existence:

# After extracting parameters, add:
+        # Validate time formats
+        try:
+            # Use appropriate datetime parsing based on your expected format
+            from datetime import datetime
+            datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ")
+            datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ")
+
+            # Ensure start_time is before end_time
+            if datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ") >= datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ"):
+                logger.error("Invalid time range: start_time must be before end_time")
+                return jsonify({
+                    "error": "Invalid time range: start_time must be before end_time",
+                    "status": "failure"
+                }), 400
+        except ValueError:
+            logger.error("Invalid time format for start_time or end_time")
+            return jsonify({
+                "error": "Invalid time format. Expected format: YYYY-MM-DDThh:mm:ssZ",
+                "status": "failure"
+            }), 400
+
+        # Validate grid_id format if needed
+        # For example, if grid_id should be a UUID or follow a specific pattern


        try:
            # Fetch air quality data with logging
            logger.info(f"Fetching data for grid_id: {grid_id}, {start_time} to {end_time}")
            air_quality_data = DataFetcher.fetch_air_quality_data_a(grid_id, start_time, end_time)

            if not air_quality_data or 'airquality' not in air_quality_data:
                logger.error(f"No valid air quality data returned for grid_id: {grid_id}")
                return jsonify({
                    "error": "Failed to fetch air quality data",
                    "status": "failure"
                }), 500

            # Initialize chatbot and get response
            chatbot = AirQualityChatbot(air_quality_data)
            response = chatbot.chat(user_prompt)

            if not response:
                logger.warning(f"Empty response generated for prompt: {user_prompt}")
                return jsonify({
                    "error": "No response generated",
                    "status": "failure"
                }), 500

            logger.info(f"Successfully processed request for {grid_id}")
            return jsonify({
                "response": response,
                "status": "success",
                "grid_id": grid_id,
                "period": {
                    "start_time": start_time,
                    "end_time": end_time
                }
            }), 200

        except Exception as e:
            logger.error(f"Unhandled exception: {str(e)}")
            return jsonify({
                "error": "Internal server error",
                "status": "failure",
                "details": str(e)
            }), 500
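
To round this off, a hedged sketch of exercising the new endpoint once the blueprint is registered (host, port, grid_id, and dates are placeholders; the payload shape follows chat_endpoint above):

    import requests

    resp = requests.post(
        "http://localhost:5000/chatbot",  # actual path depends on how controller_bp is registered
        json={
            "grid_id": "example-grid-id",
            "start_time": "2024-01-01T00:00:00Z",
            "end_time": "2024-01-31T23:59:59Z",
            "prompt": "What is the annual PM2.5 average?",
        },
        timeout=30,
    )
    print(resp.status_code, resp.json())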