Skip to content

Commit

Permalink
Merge pull request #4017 from airqo-platform/migrating-device-monitoring
Browse files Browse the repository at this point in the history
Migrating device-monitoring into device-registry microservice
  • Loading branch information
Baalmart authored Jan 9, 2025
2 parents a890194 + db75ec3 commit 4a24d39
Show file tree
Hide file tree
Showing 29 changed files with 7,741 additions and 353 deletions.
216 changes: 216 additions & 0 deletions src/device-registry/bin/jobs/device-status-check-job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
const constants = require("@config/constants");
const log4js = require("log4js");
const logger = log4js.getLogger(
`${constants.ENVIRONMENT} -- /bin/jobs/device-status-check-job`
);
const cron = require("node-cron");
const moment = require("moment-timezone");
const axios = require("axios");
const { logText, logObject } = require("@utils/log");
const DeviceModel = require("@models/Device");
const DeviceStatusModel = require("@models/DeviceStatus");

const TIMEZONE = moment.tz.guess();
const MAX_ONLINE_ACCEPTABLE_DURATION = 3600; // 1 hour
const DUE_FOR_MAINTENANCE_DURATION = 86400 * 7; // 7 days
const BATCH_SIZE = 100; // Process devices in batches for better memory management

const processDeviceBatch = async (devices) => {
const metrics = {
online: { count: 0, devices: [] },
offline: { count: 0, devices: [] },
power: { solar: 0, mains: 0, alternator: 0 },
maintenance: { due: 0, overdue: 0, unspecified: 0 },
};

const currentDateTime = new Date();

await Promise.all(
devices.map(async (device) => {
try {
if (!device.device_number) return;

const response = await axios.get(
`${process.env.RECENT_FEEDS_URL}?channel=${device.device_number}`,
{
httpsAgent: new https.Agent({ rejectUnauthorized: false }),
timeout: 5000, // Add timeout to prevent hanging requests
}
);

const deviceStatus = {
device_id: device._id,
name: device.name,
latitude: device.latitude,
longitude: device.longitude,
};

if (response.status === 200) {
const result = response.data;
const lastFeedDateTime = new Date(result.created_at);
const timeDifference = (currentDateTime - lastFeedDateTime) / 1000;

deviceStatus.elapsed_time = timeDifference;

if (timeDifference <= MAX_ONLINE_ACCEPTABLE_DURATION) {
metrics.online.devices.push(deviceStatus);
metrics.online.count++;
} else {
metrics.offline.devices.push(deviceStatus);
metrics.offline.count++;
}

// Update power metrics
const powerType = (
device.powerType ||
device.power ||
""
).toLowerCase();
if (powerType === "solar") metrics.power.solar++;
else if (powerType === "mains") metrics.power.mains++;
else if (["alternator", "battery"].includes(powerType))
metrics.power.alternator++;

// Update maintenance metrics
if (device.nextMaintenance) {
const maintenanceDuration =
(currentDateTime - new Date(device.nextMaintenance)) / 1000;
if (
maintenanceDuration <= 0 &&
Math.abs(maintenanceDuration) <= DUE_FOR_MAINTENANCE_DURATION
) {
metrics.maintenance.due++;
} else if (maintenanceDuration > 0) {
metrics.maintenance.overdue++;
}
} else {
metrics.maintenance.unspecified++;
}
} else {
metrics.offline.devices.push({ ...deviceStatus, elapsed_time: -1 });
metrics.offline.count++;
}
} catch (error) {
logger.error(
`Error processing device ${device.name}: ${error.message}`
);
}
})
);

return metrics;
};

const computeDeviceChannelStatus = async (tenant) => {
try {
const startTime = Date.now();
logText("Starting device status check...");

// Get active devices count first
const totalActiveDevices = await DeviceModel(tenant).countDocuments({
$and: [
{
$or: [
{ isActive: true },
{
$and: [
{ mobility: { $exists: true } },
{ powerType: { $exists: true } },
],
},
],
},
{ network: "airqo" },
],
});

// Process devices in batches
let processedCount = 0;
const finalMetrics = {
online: { count: 0, devices: [] },
offline: { count: 0, devices: [] },
power: { solar: 0, mains: 0, alternator: 0 },
maintenance: { due: 0, overdue: 0, unspecified: 0 },
};

while (processedCount < totalActiveDevices) {
const devices = await DeviceModel(tenant)
.find({
$and: [
{
$or: [
{ isActive: true },
{
$and: [
{ mobility: { $exists: true } },
{ powerType: { $exists: true } },
],
},
],
},
{ network: "airqo" },
],
})
.skip(processedCount)
.limit(BATCH_SIZE)
.lean();

const batchMetrics = await processDeviceBatch(devices);

// Merge batch metrics with final metrics
Object.keys(finalMetrics).forEach((key) => {
if (Array.isArray(finalMetrics[key].devices)) {
finalMetrics[key].devices.push(...batchMetrics[key].devices);
finalMetrics[key].count += batchMetrics[key].count;
} else {
Object.keys(finalMetrics[key]).forEach((subKey) => {
finalMetrics[key][subKey] += batchMetrics[key][subKey];
});
}
});

processedCount += devices.length;
logText(`Processed ${processedCount}/${totalActiveDevices} devices`);
}

// Save status record
const deviceStatusRecord = new DeviceStatusModel({
created_at: new Date(),
total_active_device_count: totalActiveDevices,
metrics: {
online: {
count: finalMetrics.online.count,
devices: finalMetrics.online.devices,
},
offline: {
count: finalMetrics.offline.count,
devices: finalMetrics.offline.devices,
},
},
power_metrics: finalMetrics.power,
maintenance_metrics: finalMetrics.maintenance,
});

await deviceStatusRecord.save();

const duration = (Date.now() - startTime) / 1000;
logger.info(`Device status check completed in ${duration}s`);
logObject("Final metrics", finalMetrics);
} catch (error) {
logger.error(`Error in device status check: ${error.message}`);
logger.error(`Stack trace: ${error.stack}`);
}
};

// Run the job for 'airqo' tenant
const runDeviceStatusCheck = async () => {
await computeDeviceChannelStatus("airqo");
};

// Schedule the job (every 2 hours)
cron.schedule("0 */2 * * *", runDeviceStatusCheck, {
scheduled: true,
timezone: TIMEZONE,
});

module.exports = { computeDeviceChannelStatus, runDeviceStatusCheck };
176 changes: 176 additions & 0 deletions src/device-registry/bin/jobs/device-status-hourly-check-job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
const constants = require("@config/constants");
const log4js = require("log4js");
const logger = log4js.getLogger(
`${constants.ENVIRONMENT} -- /bin/jobs/device-status-hourly-check-job`
);
const cron = require("node-cron");
const moment = require("moment-timezone");
const math = require("mathjs");
const DeviceModel = require("@models/Device");
const DeviceStatusModel = require("@models/DeviceStatus");
const createFeedUtil = require("@utils/create-feed");

const TIMEZONE = moment.tz.guess();
const BATCH_SIZE = 50;
const OFFLINE_THRESHOLD_HOURS = 24;

const convertSecondsToReadableFormat = (secondsToConvert) => {
const days = Math.floor(secondsToConvert / (24 * 3600));
secondsToConvert %= 24 * 3600;
const hours = Math.floor(secondsToConvert / 3600);
secondsToConvert %= 3600;
const minutes = Math.floor(secondsToConvert / 60);
const seconds = Math.floor(secondsToConvert % 60);

return `${days} days ${hours} hours ${minutes} minutes ${seconds} seconds`;
};

const getDeviceLastFeed = async (channelID) => {
try {
const api_key = await createFeedUtil.getAPIKey(channelID);
const request = { channel: channelID, api_key };
const thingspeakData = await createFeedUtil.fetchThingspeakData(request);
const { status, data } = createFeedUtil.handleThingspeakResponse(
thingspeakData
);

if (status === 200) {
return data;
}
return null;
} catch (error) {
logger.error(
`Error getting last feed for channel ${channelID}: ${error.message}`
);
return null;
}
};

const processDeviceBatch = async (devices) => {
const metrics = {
online: { count: 0, devices: [] },
offline: { count: 0, devices: [] },
};

await Promise.all(
devices.map(async (device) => {
try {
if (!device.channelID) return;

const lastFeed = await getDeviceLastFeed(device.channelID);

if (lastFeed) {
const currentDateTime = new Date();
const lastFeedDateTime = new Date(lastFeed.created_at);

const timeDifferenceHours =
(currentDateTime - lastFeedDateTime) / 3600000;
const timeDifferenceSeconds =
(currentDateTime - lastFeedDateTime) / 1000;

const deviceMetric = {
device_id: device._id,
name: device.name,
channelID: device.channelID,
elapsed_time: timeDifferenceHours,
elapsed_time_readable: convertSecondsToReadableFormat(
timeDifferenceSeconds
),
latitude: device.latitude,
longitude: device.longitude,
};

if (timeDifferenceHours > OFFLINE_THRESHOLD_HOURS) {
metrics.offline.devices.push(deviceMetric);
metrics.offline.count++;
} else {
metrics.online.devices.push(deviceMetric);
metrics.online.count++;
}
}
} catch (error) {
logger.error(
`Error processing device ${device.name}: ${error.message}`
);
}
})
);

return metrics;
};

const deviceStatusHourlyCheck = async () => {
try {
const startTime = Date.now();
logger.info("Starting hourly device status check...");

const totalActiveDevices = await DeviceModel("airqo").countDocuments({
locationID: { $ne: "" },
isActive: true,
});

let processedCount = 0;
const finalMetrics = {
online: { count: 0, devices: [] },
offline: { count: 0, devices: [] },
};

while (processedCount < totalActiveDevices) {
const devices = await DeviceModel("airqo")
.find({
locationID: { $ne: "" },
isActive: true,
})
.skip(processedCount)
.limit(BATCH_SIZE)
.lean();

const batchMetrics = await processDeviceBatch(devices);

["online", "offline"].forEach((status) => {
finalMetrics[status].devices.push(...batchMetrics[status].devices);
finalMetrics[status].count += batchMetrics[status].count;
});

processedCount += devices.length;
logger.info(`Processed ${processedCount}/${totalActiveDevices} devices`);
}

const total = finalMetrics.online.count + finalMetrics.offline.count;
finalMetrics.online.percentage = math.floor(
(finalMetrics.online.count / total) * 100
);
finalMetrics.offline.percentage = math.floor(
(finalMetrics.offline.count / total) * 100
);

const deviceStatusRecord = new DeviceStatusModel({
created_at: new Date(),
total_active_device_count: total,
metrics: finalMetrics,
check_type: "hourly",
});

await deviceStatusRecord.save();

const duration = (Date.now() - startTime) / 1000;
logger.info(`
Device Status Check Complete (${duration}s)
Total Devices: ${total}
Online Devices: ${finalMetrics.online.count} (${finalMetrics.online.percentage}%)
Offline Devices: ${finalMetrics.offline.count} (${finalMetrics.offline.percentage}%)
`);
} catch (error) {
logger.error(`Error in device status hourly check: ${error.message}`);
logger.error(`Stack trace: ${error.stack}`);
}
};

logger.info("Device status hourly check job is now running.....");

cron.schedule("0 * * * *", deviceStatusHourlyCheck, {
scheduled: true,
timezone: TIMEZONE,
});

module.exports = { deviceStatusHourlyCheck };
Loading

0 comments on commit 4a24d39

Please sign in to comment.