diff --git a/k8s/data-mgt/values-prod.yaml b/k8s/data-mgt/values-prod.yaml index 095a9fdf11..3a41d5eba1 100644 --- a/k8s/data-mgt/values-prod.yaml +++ b/k8s/data-mgt/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 2 image: repository: eu.gcr.io/airqo-250220/airqo-data-mgt-api - tag: prod-35663377-1741203957 + tag: prod-5eb51703-1741348989 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index 79b20d2f86..dfe9c65775 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-35663377-1741203957 + tag: prod-5eb51703-1741348989 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/predict/values-prod.yaml b/k8s/predict/values-prod.yaml index 14b0be706a..60e8f9b7b9 100644 --- a/k8s/predict/values-prod.yaml +++ b/k8s/predict/values-prod.yaml @@ -7,7 +7,7 @@ images: predictJob: eu.gcr.io/airqo-250220/airqo-predict-job trainJob: eu.gcr.io/airqo-250220/airqo-train-job predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality - tag: prod-35663377-1741203957 + tag: prod-5eb51703-1741348989 api: name: airqo-prediction-api label: prediction-api diff --git a/k8s/workflows/values-prod.yaml b/k8s/workflows/values-prod.yaml index b6359d466d..83ab836669 100644 --- a/k8s/workflows/values-prod.yaml +++ b/k8s/workflows/values-prod.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-redis containers: eu.gcr.io/airqo-250220/airqo-workflows - tag: prod-35663377-1741203957 + tag: prod-5eb51703-1741348989 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/workflows/values-stage.yaml b/k8s/workflows/values-stage.yaml index 90407dd164..1febd0caa4 100644 --- a/k8s/workflows/values-stage.yaml +++ b/k8s/workflows/values-stage.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis containers: eu.gcr.io/airqo-250220/airqo-stage-workflows - tag: stage-bc9720b8-1741203916 + tag: stage-2116d62a-1741348949 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/src/device-registry/controllers/event.controller.js b/src/device-registry/controllers/event.controller.js index 2bf9b7ef59..adefb4f33c 100644 --- a/src/device-registry/controllers/event.controller.js +++ b/src/device-registry/controllers/event.controller.js @@ -449,6 +449,9 @@ const processCohortIds = async (cohort_ids, request) => { (result) => !(result.success === false) ); + // join the array of arrays into a single array + const flattened = [].concat(...validDeviceIdResults); + if (isEmpty(invalidDeviceIdResults) && validDeviceIdResults.length > 0) { request.query.device_id = validDeviceIdResults.join(","); } @@ -2975,6 +2978,168 @@ const createEvent = { return; } }, + getWorstReadingForSites: async (req, res, next) => { + try { + const errors = extractErrorsFromRequest(req); + if (errors) { + next( + new HttpError("bad request errors", httpStatus.BAD_REQUEST, errors) + ); + return; + } + + const request = req; + const defaultTenant = constants.DEFAULT_TENANT || "airqo"; + request.query.tenant = isEmpty(req.query.tenant) + ? defaultTenant + : req.query.tenant; + + const { site_id, grid_id } = { ...req.query, ...req.params }; + let locationErrors = 0; + + let siteIds = []; + if (Array.isArray(site_id)) { + siteIds = site_id.map(String); + } else if (site_id) { + siteIds = [String(site_id)]; + } + + if (isEmpty(siteIds) && !isEmpty(grid_id)) { + await processGridIds(grid_id, request); + if (isEmpty(request.query.site_id)) { + locationErrors++; + } else { + siteIds = request.query.site_id.split(","); + } + } + + if (locationErrors === 0) { + const result = await createEventUtil.getWorstReadingForSites({ + siteIds, + next, + }); + + if (isEmpty(result) || res.headersSent) { + return; + } + + if (result.success === true) { + const status = result.status || httpStatus.OK; + res.status(status).json({ + success: true, + message: result.message, + data: result.data, + }); + } else { + const errorStatus = result.status || httpStatus.INTERNAL_SERVER_ERROR; + res.status(errorStatus).json({ + success: false, + errors: result.errors || { message: "" }, + message: result.message, + }); + } + } else { + res.status(httpStatus.BAD_REQUEST).json({ + success: false, + errors: { + message: `Unable to process measurements for the provided site IDs`, + }, + message: "Bad Request Error", + }); + } + } catch (error) { + logger.error(`🐛🐛 Internal Server Error ${error.message}`); + next( + new HttpError( + "Internal Server Error", + httpStatus.INTERNAL_SERVER_ERROR, + { message: error.message } + ) + ); + return; + } + }, + getWorstReadingForDevices: async (req, res, next) => { + try { + const errors = extractErrorsFromRequest(req); + if (errors) { + next( + new HttpError("bad request errors", httpStatus.BAD_REQUEST, errors) + ); + return; + } + + const request = req; + const defaultTenant = constants.DEFAULT_TENANT || "airqo"; + request.query.tenant = isEmpty(req.query.tenant) + ? defaultTenant + : req.query.tenant; + + const { device_id, cohort_id } = { ...req.query, ...req.params }; + let locationErrors = 0; + + let deviceIds = []; + if (Array.isArray(device_id)) { + deviceIds = device_id.map(String); + } else if (device_id) { + deviceIds = [String(device_id)]; + } + + if (isEmpty(deviceIds) && !isEmpty(cohort_id)) { + await processCohortIds(cohort_id, request); + if (isEmpty(request.query.device_id)) { + locationErrors++; + } else { + deviceIds = request.query.device_id.split(","); + } + } + logObject("deviceIds", deviceIds); + if (locationErrors === 0) { + const result = await createEventUtil.getWorstReadingForDevices({ + deviceIds, + next, + }); + + if (isEmpty(result) || res.headersSent) { + return; + } + + if (result.success === true) { + const status = result.status || httpStatus.OK; + res.status(status).json({ + success: true, + message: result.message, + data: result.data, + }); + } else { + const errorStatus = result.status || httpStatus.INTERNAL_SERVER_ERROR; + res.status(errorStatus).json({ + success: false, + errors: result.errors || { message: "" }, + message: result.message, + }); + } + } else { + res.status(httpStatus.BAD_REQUEST).json({ + success: false, + errors: { + message: `Unable to process measurements for the provided device IDs`, + }, + message: "Bad Request Error", + }); + } + } catch (error) { + logger.error(`🐛🐛 Internal Server Error ${error.message}`); + next( + new HttpError( + "Internal Server Error", + httpStatus.INTERNAL_SERVER_ERROR, + { message: error.message } + ) + ); + return; + } + }, }; module.exports = createEvent; diff --git a/src/device-registry/models/Reading.js b/src/device-registry/models/Reading.js index a0b599145e..e607401c3c 100644 --- a/src/device-registry/models/Reading.js +++ b/src/device-registry/models/Reading.js @@ -766,6 +766,97 @@ ReadingsSchema.statics.getAirQualityAnalytics = async function(siteId, next) { } }; +ReadingsSchema.statics.getWorstPm2_5Reading = async function({ + siteIds = [], + next, +} = {}) { + try { + if (isEmpty(siteIds) || !Array.isArray(siteIds)) { + next( + new HttpError("Bad Request Error", httpStatus.BAD_REQUEST, { + message: "siteIds array is required", + }) + ); + return; + } + if (siteIds.length === 0) { + return { + success: true, + message: "No site_ids were provided", + data: [], + status: httpStatus.OK, + }; + } + + // Validate siteIds type + if (!siteIds.every((id) => typeof id === "string")) { + next( + new HttpError("Bad Request Error", httpStatus.BAD_REQUEST, { + message: "siteIds must be an array of strings", + }) + ); + return; + } + + const formattedSiteIds = siteIds.map((id) => id.toString()); + const threeDaysAgo = new Date(); + threeDaysAgo.setDate(threeDaysAgo.getDate() - 3); + const pipeline = this.aggregate([ + { + $match: { + site_id: { $in: formattedSiteIds }, + time: { $gte: threeDaysAgo }, + "pm2_5.value": { $exists: true }, // Ensure pm2_5.value exists + }, + }, + { + $sort: { "pm2_5.value": -1, time: -1 }, // Sort by pm2_5 descending, then by time + }, + { + $limit: 1, // Take only the worst reading + }, + { + $project: { + _id: 0, // Exclude the MongoDB-generated _id + site_id: 1, + time: 1, + pm2_5: 1, + device: 1, + device_id: 1, + siteDetails: 1, + }, + }, + ]).allowDiskUse(true); + + const worstReading = await pipeline.exec(); + + if (!isEmpty(worstReading)) { + return { + success: true, + message: "Successfully retrieved the worst pm2_5 reading.", + data: worstReading[0], + status: httpStatus.OK, + }; + } else { + return { + success: true, + message: + "No pm2_5 readings found for the specified site_ids in the last three days.", + data: {}, + status: httpStatus.OK, + }; + } + } catch (error) { + logger.error(`🐛🐛 Internal Server Error -- ${error.message}`); + next( + new HttpError("Internal Server Error", httpStatus.INTERNAL_SERVER_ERROR, { + message: error.message, + }) + ); + return; + } +}; + const ReadingModel = (tenant) => { const defaultTenant = constants.DEFAULT_TENANT || "airqo"; const dbTenant = isEmpty(tenant) ? defaultTenant : tenant; diff --git a/src/device-registry/routes/v2/readings.routes.js b/src/device-registry/routes/v2/readings.routes.js index a9b7a44b6f..43be57a180 100644 --- a/src/device-registry/routes/v2/readings.routes.js +++ b/src/device-registry/routes/v2/readings.routes.js @@ -23,6 +23,18 @@ router.get( eventController.recentReadings ); +router.get( + "/worst/devices", + readingsValidations.worstReadingForDevices, + eventController.getWorstReadingForDevices +); + +router.get( + "/worst/sites", + readingsValidations.worstReadingForSites, + eventController.getWorstReadingForSites +); + router.get( "/sites/:site_id/averages", readingsValidations.listAverages, diff --git a/src/device-registry/utils/event.util.js b/src/device-registry/utils/event.util.js index b4fb39140d..3241b76367 100644 --- a/src/device-registry/utils/event.util.js +++ b/src/device-registry/utils/event.util.js @@ -1583,6 +1583,118 @@ const createEvent = { return; } }, + getWorstReadingForSites: async (req, res, next) => { + try { + const siteIds = req.body.siteIds; // Assuming you pass the siteIds in the request body + + const result = await ReadingModel("airqo").getWorstPm2_5Reading({ + siteIds, + next, + }); + + if (result.success) { + res.status(result.status).json(result); + } else { + // Handle errors based on result.message and result.errors + next(result); + } + } catch (error) { + // Handle unexpected errors + next(error); + } + }, + getWorstReadingForDevices: async ({ deviceIds = [], next } = {}) => { + try { + if (isEmpty(deviceIds) || !Array.isArray(deviceIds)) { + next( + new HttpError("Bad Request Error", httpStatus.BAD_REQUEST, { + message: "deviceIds array is required", + }) + ); + return; + } + if (deviceIds.length === 0) { + return { + success: true, + message: "No device_ids were provided", + data: [], + status: httpStatus.OK, + }; + } + + // Validate deviceIds type + if (!deviceIds.every((id) => typeof id === "string")) { + next( + new HttpError("Bad Request Error", httpStatus.BAD_REQUEST, { + message: "deviceIds must be an array of strings", + }) + ); + return; + } + + const formattedDeviceIds = deviceIds.map((id) => id.toString()); + const threeDaysAgo = new Date(); + threeDaysAgo.setDate(threeDaysAgo.getDate() - 3); + const pipeline = ReadingModel("airqo") + .aggregate([ + { + $match: { + device_id: { $in: formattedDeviceIds }, + time: { $gte: threeDaysAgo }, + "pm2_5.value": { $exists: true }, // Ensure pm2_5.value exists + }, + }, + { + $sort: { "pm2_5.value": -1, time: -1 }, // Sort by pm2_5 descending, then by time + }, + { + $limit: 1, // Take only the worst reading + }, + { + $project: { + _id: 0, // Exclude the MongoDB-generated _id + device_id: 1, + time: 1, + pm2_5: 1, + device: 1, + siteDetails: 1, + }, + }, + ]) + .allowDiskUse(true); + + const worstReading = await pipeline.exec(); + + if (!isEmpty(worstReading)) { + return { + success: true, + message: "Successfully retrieved the worst pm2_5 reading.", + data: worstReading[0], + status: httpStatus.OK, + }; + } else { + return { + success: true, + message: + "No pm2_5 readings found for the specified device_ids in the last three days.", + data: {}, + status: httpStatus.OK, + }; + } + } catch (error) { + logger.error(`🐛🐛 Internal Server Error -- ${error.message}`); + next( + new HttpError( + "Internal Server Error", + httpStatus.INTERNAL_SERVER_ERROR, + { + message: error.message, + } + ) + ); + return; + } + }, listReadingAverages: async (request, next) => { try { let missingDataMessage = ""; diff --git a/src/device-registry/validators/readings.validators.js b/src/device-registry/validators/readings.validators.js index ab9de4a007..383d3fbcb1 100644 --- a/src/device-registry/validators/readings.validators.js +++ b/src/device-registry/validators/readings.validators.js @@ -338,6 +338,15 @@ const commonValidations = { return true; }), ], + atLeastOneRequired: (fields, message) => [ + query().custom((value, { req }) => { + const hasAtLeastOne = fields.some((field) => req.query[field]); + if (!hasAtLeastOne) { + throw new Error(message); + } + return true; + }), + ], }; const readingsValidations = { @@ -413,6 +422,26 @@ const readingsValidations = { ...commonValidations.test, ], listRecent: [...commonValidations.tenant], + worstReadingForDevices: [ + ...commonValidations.atLeastOneRequired( + ["cohort_id", "device_id"], + "At least one of cohort_id or device_id is required." + ), + commonValidations.objectId("cohort_id"), + commonValidations.objectId("device_id"), + ...commonValidations.checkConflictingParams("cohort_id", "device_id"), + ...commonValidations.checkForEmptyArrays(["cohort_id", "device_id"]), + ], + worstReadingForSites: [ + ...commonValidations.atLeastOneRequired( + ["grid_id", "site_id"], + "At least one of grid_id or site_id is required." + ), + commonValidations.objectId("grid_id"), + commonValidations.objectId("site_id"), + ...commonValidations.checkConflictingParams("grid_id", "site_id"), + ...commonValidations.checkForEmptyArrays(["grid_id", "site_id"]), + ], }; module.exports = {