Skip to content

Commit

Permalink
Merge pull request #4537 from airqo-platform/staging
Browse files Browse the repository at this point in the history
move to production
  • Loading branch information
Baalmart authored Mar 7, 2025
2 parents 5eb5170 + 142d01d commit 031d46b
Show file tree
Hide file tree
Showing 10 changed files with 414 additions and 5 deletions.
2 changes: 1 addition & 1 deletion k8s/data-mgt/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-data-mgt-api
tag: prod-35663377-1741203957
tag: prod-5eb51703-1741348989
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-35663377-1741203957
tag: prod-5eb51703-1741348989
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
tag: prod-35663377-1741203957
tag: prod-5eb51703-1741348989
api:
name: airqo-prediction-api
label: prediction-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-35663377-1741203957
tag: prod-5eb51703-1741348989
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis
containers: eu.gcr.io/airqo-250220/airqo-stage-workflows
tag: stage-bc9720b8-1741203916
tag: stage-2116d62a-1741348949
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
165 changes: 165 additions & 0 deletions src/device-registry/controllers/event.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ const processCohortIds = async (cohort_ids, request) => {
(result) => !(result.success === false)
);

// join the array of arrays into a single array
const flattened = [].concat(...validDeviceIdResults);

if (isEmpty(invalidDeviceIdResults) && validDeviceIdResults.length > 0) {
request.query.device_id = validDeviceIdResults.join(",");
}
Expand Down Expand Up @@ -2975,6 +2978,168 @@ const createEvent = {
return;
}
},
getWorstReadingForSites: async (req, res, next) => {
try {
const errors = extractErrorsFromRequest(req);
if (errors) {
next(
new HttpError("bad request errors", httpStatus.BAD_REQUEST, errors)
);
return;
}

const request = req;
const defaultTenant = constants.DEFAULT_TENANT || "airqo";
request.query.tenant = isEmpty(req.query.tenant)
? defaultTenant
: req.query.tenant;

const { site_id, grid_id } = { ...req.query, ...req.params };
let locationErrors = 0;

let siteIds = [];
if (Array.isArray(site_id)) {
siteIds = site_id.map(String);
} else if (site_id) {
siteIds = [String(site_id)];
}

if (isEmpty(siteIds) && !isEmpty(grid_id)) {
await processGridIds(grid_id, request);
if (isEmpty(request.query.site_id)) {
locationErrors++;
} else {
siteIds = request.query.site_id.split(",");
}
}

if (locationErrors === 0) {
const result = await createEventUtil.getWorstReadingForSites({
siteIds,
next,
});

if (isEmpty(result) || res.headersSent) {
return;
}

if (result.success === true) {
const status = result.status || httpStatus.OK;
res.status(status).json({
success: true,
message: result.message,
data: result.data,
});
} else {
const errorStatus = result.status || httpStatus.INTERNAL_SERVER_ERROR;
res.status(errorStatus).json({
success: false,
errors: result.errors || { message: "" },
message: result.message,
});
}
} else {
res.status(httpStatus.BAD_REQUEST).json({
success: false,
errors: {
message: `Unable to process measurements for the provided site IDs`,
},
message: "Bad Request Error",
});
}
} catch (error) {
logger.error(`🐛🐛 Internal Server Error ${error.message}`);
next(
new HttpError(
"Internal Server Error",
httpStatus.INTERNAL_SERVER_ERROR,
{ message: error.message }
)
);
return;
}
},
getWorstReadingForDevices: async (req, res, next) => {
try {
const errors = extractErrorsFromRequest(req);
if (errors) {
next(
new HttpError("bad request errors", httpStatus.BAD_REQUEST, errors)
);
return;
}

const request = req;
const defaultTenant = constants.DEFAULT_TENANT || "airqo";
request.query.tenant = isEmpty(req.query.tenant)
? defaultTenant
: req.query.tenant;

const { device_id, cohort_id } = { ...req.query, ...req.params };
let locationErrors = 0;

let deviceIds = [];
if (Array.isArray(device_id)) {
deviceIds = device_id.map(String);
} else if (device_id) {
deviceIds = [String(device_id)];
}

if (isEmpty(deviceIds) && !isEmpty(cohort_id)) {
await processCohortIds(cohort_id, request);
if (isEmpty(request.query.device_id)) {
locationErrors++;
} else {
deviceIds = request.query.device_id.split(",");
}
}
logObject("deviceIds", deviceIds);
if (locationErrors === 0) {
const result = await createEventUtil.getWorstReadingForDevices({
deviceIds,
next,
});

if (isEmpty(result) || res.headersSent) {
return;
}

if (result.success === true) {
const status = result.status || httpStatus.OK;
res.status(status).json({
success: true,
message: result.message,
data: result.data,
});
} else {
const errorStatus = result.status || httpStatus.INTERNAL_SERVER_ERROR;
res.status(errorStatus).json({
success: false,
errors: result.errors || { message: "" },
message: result.message,
});
}
} else {
res.status(httpStatus.BAD_REQUEST).json({
success: false,
errors: {
message: `Unable to process measurements for the provided device IDs`,
},
message: "Bad Request Error",
});
}
} catch (error) {
logger.error(`🐛🐛 Internal Server Error ${error.message}`);
next(
new HttpError(
"Internal Server Error",
httpStatus.INTERNAL_SERVER_ERROR,
{ message: error.message }
)
);
return;
}
},
};

module.exports = createEvent;
91 changes: 91 additions & 0 deletions src/device-registry/models/Reading.js
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,97 @@ ReadingsSchema.statics.getAirQualityAnalytics = async function(siteId, next) {
}
};

ReadingsSchema.statics.getWorstPm2_5Reading = async function({
siteIds = [],
next,
} = {}) {
try {
if (isEmpty(siteIds) || !Array.isArray(siteIds)) {
next(
new HttpError("Bad Request Error", httpStatus.BAD_REQUEST, {
message: "siteIds array is required",
})
);
return;
}
if (siteIds.length === 0) {
return {
success: true,
message: "No site_ids were provided",
data: [],
status: httpStatus.OK,
};
}

// Validate siteIds type
if (!siteIds.every((id) => typeof id === "string")) {
next(
new HttpError("Bad Request Error", httpStatus.BAD_REQUEST, {
message: "siteIds must be an array of strings",
})
);
return;
}

const formattedSiteIds = siteIds.map((id) => id.toString());
const threeDaysAgo = new Date();
threeDaysAgo.setDate(threeDaysAgo.getDate() - 3);
const pipeline = this.aggregate([
{
$match: {
site_id: { $in: formattedSiteIds },
time: { $gte: threeDaysAgo },
"pm2_5.value": { $exists: true }, // Ensure pm2_5.value exists
},
},
{
$sort: { "pm2_5.value": -1, time: -1 }, // Sort by pm2_5 descending, then by time
},
{
$limit: 1, // Take only the worst reading
},
{
$project: {
_id: 0, // Exclude the MongoDB-generated _id
site_id: 1,
time: 1,
pm2_5: 1,
device: 1,
device_id: 1,
siteDetails: 1,
},
},
]).allowDiskUse(true);

const worstReading = await pipeline.exec();

if (!isEmpty(worstReading)) {
return {
success: true,
message: "Successfully retrieved the worst pm2_5 reading.",
data: worstReading[0],
status: httpStatus.OK,
};
} else {
return {
success: true,
message:
"No pm2_5 readings found for the specified site_ids in the last three days.",
data: {},
status: httpStatus.OK,
};
}
} catch (error) {
logger.error(`🐛🐛 Internal Server Error -- ${error.message}`);
next(
new HttpError("Internal Server Error", httpStatus.INTERNAL_SERVER_ERROR, {
message: error.message,
})
);
return;
}
};

const ReadingModel = (tenant) => {
const defaultTenant = constants.DEFAULT_TENANT || "airqo";
const dbTenant = isEmpty(tenant) ? defaultTenant : tenant;
Expand Down
12 changes: 12 additions & 0 deletions src/device-registry/routes/v2/readings.routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ router.get(
eventController.recentReadings
);

router.get(
"/worst/devices",
readingsValidations.worstReadingForDevices,
eventController.getWorstReadingForDevices
);

router.get(
"/worst/sites",
readingsValidations.worstReadingForSites,
eventController.getWorstReadingForSites
);

router.get(
"/sites/:site_id/averages",
readingsValidations.listAverages,
Expand Down
Loading

0 comments on commit 031d46b

Please sign in to comment.