Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrating device-monitoring into device-registry microservice #4017

Merged
merged 15 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions src/device-registry/bin/jobs/device-status-check-job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
const constants = require("@config/constants");
const log4js = require("log4js");
const logger = log4js.getLogger(
`${constants.ENVIRONMENT} -- /bin/jobs/device-status-check-job`
);
const cron = require("node-cron");
const moment = require("moment-timezone");
const axios = require("axios");
const { logText, logObject } = require("@utils/log");
const DeviceModel = require("@models/Device");
const DeviceStatusModel = require("@models/DeviceStatus");

const TIMEZONE = moment.tz.guess();
const MAX_ONLINE_ACCEPTABLE_DURATION = 3600; // 1 hour
const DUE_FOR_MAINTENANCE_DURATION = 86400 * 7; // 7 days
const BATCH_SIZE = 100; // Process devices in batches for better memory management

const processDeviceBatch = async (devices) => {
const metrics = {
online: { count: 0, devices: [] },
offline: { count: 0, devices: [] },
power: { solar: 0, mains: 0, alternator: 0 },
maintenance: { due: 0, overdue: 0, unspecified: 0 },
};

const currentDateTime = new Date();

await Promise.all(
devices.map(async (device) => {
try {
if (!device.device_number) return;

const response = await axios.get(
`${process.env.RECENT_FEEDS_URL}?channel=${device.device_number}`,
{
httpsAgent: new https.Agent({ rejectUnauthorized: false }),
timeout: 5000, // Add timeout to prevent hanging requests
}
);

const deviceStatus = {
device_id: device._id,
name: device.name,
latitude: device.latitude,
longitude: device.longitude,
};

if (response.status === 200) {
const result = response.data;
const lastFeedDateTime = new Date(result.created_at);
const timeDifference = (currentDateTime - lastFeedDateTime) / 1000;

deviceStatus.elapsed_time = timeDifference;

if (timeDifference <= MAX_ONLINE_ACCEPTABLE_DURATION) {
metrics.online.devices.push(deviceStatus);
metrics.online.count++;
} else {
metrics.offline.devices.push(deviceStatus);
metrics.offline.count++;
}

// Update power metrics
const powerType = (
device.powerType ||
device.power ||
""
).toLowerCase();
if (powerType === "solar") metrics.power.solar++;
else if (powerType === "mains") metrics.power.mains++;
else if (["alternator", "battery"].includes(powerType))
metrics.power.alternator++;

// Update maintenance metrics
if (device.nextMaintenance) {
const maintenanceDuration =
(currentDateTime - new Date(device.nextMaintenance)) / 1000;
if (
maintenanceDuration <= 0 &&
Math.abs(maintenanceDuration) <= DUE_FOR_MAINTENANCE_DURATION
) {
metrics.maintenance.due++;
} else if (maintenanceDuration > 0) {
metrics.maintenance.overdue++;
}
} else {
metrics.maintenance.unspecified++;
}
} else {
metrics.offline.devices.push({ ...deviceStatus, elapsed_time: -1 });
metrics.offline.count++;
}
} catch (error) {
logger.error(
`Error processing device ${device.name}: ${error.message}`
);
}
})
);

return metrics;
};

const computeDeviceChannelStatus = async (tenant) => {
try {
const startTime = Date.now();
logText("Starting device status check...");

// Get active devices count first
const totalActiveDevices = await DeviceModel(tenant).countDocuments({
$and: [
{
$or: [
{ isActive: true },
{
$and: [
{ mobility: { $exists: true } },
{ powerType: { $exists: true } },
],
},
],
},
{ network: "airqo" },
],
});

// Process devices in batches
let processedCount = 0;
const finalMetrics = {
online: { count: 0, devices: [] },
offline: { count: 0, devices: [] },
power: { solar: 0, mains: 0, alternator: 0 },
maintenance: { due: 0, overdue: 0, unspecified: 0 },
};

while (processedCount < totalActiveDevices) {
const devices = await DeviceModel(tenant)
.find({
$and: [
{
$or: [
{ isActive: true },
{
$and: [
{ mobility: { $exists: true } },
{ powerType: { $exists: true } },
],
},
],
},
{ network: "airqo" },
],
})
.skip(processedCount)
.limit(BATCH_SIZE)
.lean();

const batchMetrics = await processDeviceBatch(devices);

// Merge batch metrics with final metrics
Object.keys(finalMetrics).forEach((key) => {
if (Array.isArray(finalMetrics[key].devices)) {
finalMetrics[key].devices.push(...batchMetrics[key].devices);
finalMetrics[key].count += batchMetrics[key].count;
} else {
Object.keys(finalMetrics[key]).forEach((subKey) => {
finalMetrics[key][subKey] += batchMetrics[key][subKey];
});
}
});

processedCount += devices.length;
logText(`Processed ${processedCount}/${totalActiveDevices} devices`);
}

// Save status record
const deviceStatusRecord = new DeviceStatusModel({
created_at: new Date(),
total_active_device_count: totalActiveDevices,
metrics: {
online: {
count: finalMetrics.online.count,
devices: finalMetrics.online.devices,
},
offline: {
count: finalMetrics.offline.count,
devices: finalMetrics.offline.devices,
},
},
power_metrics: finalMetrics.power,
maintenance_metrics: finalMetrics.maintenance,
});

await deviceStatusRecord.save();

const duration = (Date.now() - startTime) / 1000;
logger.info(`Device status check completed in ${duration}s`);
logObject("Final metrics", finalMetrics);
} catch (error) {
logger.error(`Error in device status check: ${error.message}`);
logger.error(`Stack trace: ${error.stack}`);
}
};

// Run the job for 'airqo' tenant
const runDeviceStatusCheck = async () => {
await computeDeviceChannelStatus("airqo");
};

// Schedule the job (every 2 hours)
cron.schedule("0 */2 * * *", runDeviceStatusCheck, {
scheduled: true,
timezone: TIMEZONE,
});

module.exports = { computeDeviceChannelStatus, runDeviceStatusCheck };
176 changes: 176 additions & 0 deletions src/device-registry/bin/jobs/device-status-hourly-check-job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
const constants = require("@config/constants");
const log4js = require("log4js");
const logger = log4js.getLogger(
`${constants.ENVIRONMENT} -- /bin/jobs/device-status-hourly-check-job`
);
const cron = require("node-cron");
const moment = require("moment-timezone");
const math = require("mathjs");
const DeviceModel = require("@models/Device");
const DeviceStatusModel = require("@models/DeviceStatus");
const createFeedUtil = require("@utils/create-feed");

const TIMEZONE = moment.tz.guess();
const BATCH_SIZE = 50;
const OFFLINE_THRESHOLD_HOURS = 24;

const convertSecondsToReadableFormat = (secondsToConvert) => {
const days = Math.floor(secondsToConvert / (24 * 3600));
secondsToConvert %= 24 * 3600;
const hours = Math.floor(secondsToConvert / 3600);
secondsToConvert %= 3600;
const minutes = Math.floor(secondsToConvert / 60);
const seconds = Math.floor(secondsToConvert % 60);

return `${days} days ${hours} hours ${minutes} minutes ${seconds} seconds`;
};

const getDeviceLastFeed = async (channelID) => {
try {
const api_key = await createFeedUtil.getAPIKey(channelID);
const request = { channel: channelID, api_key };
const thingspeakData = await createFeedUtil.fetchThingspeakData(request);
const { status, data } = createFeedUtil.handleThingspeakResponse(
thingspeakData
);

if (status === 200) {
return data;
}
return null;
} catch (error) {
logger.error(
`Error getting last feed for channel ${channelID}: ${error.message}`
);
return null;
}
};

const processDeviceBatch = async (devices) => {
const metrics = {
online: { count: 0, devices: [] },
offline: { count: 0, devices: [] },
};

await Promise.all(
devices.map(async (device) => {
try {
if (!device.channelID) return;

const lastFeed = await getDeviceLastFeed(device.channelID);

if (lastFeed) {
const currentDateTime = new Date();
const lastFeedDateTime = new Date(lastFeed.created_at);

const timeDifferenceHours =
(currentDateTime - lastFeedDateTime) / 3600000;
const timeDifferenceSeconds =
(currentDateTime - lastFeedDateTime) / 1000;

const deviceMetric = {
device_id: device._id,
name: device.name,
channelID: device.channelID,
elapsed_time: timeDifferenceHours,
elapsed_time_readable: convertSecondsToReadableFormat(
timeDifferenceSeconds
),
latitude: device.latitude,
longitude: device.longitude,
};

if (timeDifferenceHours > OFFLINE_THRESHOLD_HOURS) {
metrics.offline.devices.push(deviceMetric);
metrics.offline.count++;
} else {
metrics.online.devices.push(deviceMetric);
metrics.online.count++;
}
}
} catch (error) {
logger.error(
`Error processing device ${device.name}: ${error.message}`
);
}
})
);

return metrics;
};

const deviceStatusHourlyCheck = async () => {
try {
const startTime = Date.now();
logger.info("Starting hourly device status check...");

const totalActiveDevices = await DeviceModel("airqo").countDocuments({
locationID: { $ne: "" },
isActive: true,
});

let processedCount = 0;
const finalMetrics = {
online: { count: 0, devices: [] },
offline: { count: 0, devices: [] },
};

while (processedCount < totalActiveDevices) {
const devices = await DeviceModel("airqo")
.find({
locationID: { $ne: "" },
isActive: true,
})
.skip(processedCount)
.limit(BATCH_SIZE)
.lean();

const batchMetrics = await processDeviceBatch(devices);

["online", "offline"].forEach((status) => {
finalMetrics[status].devices.push(...batchMetrics[status].devices);
finalMetrics[status].count += batchMetrics[status].count;
});

processedCount += devices.length;
logger.info(`Processed ${processedCount}/${totalActiveDevices} devices`);
}

const total = finalMetrics.online.count + finalMetrics.offline.count;
finalMetrics.online.percentage = math.floor(
(finalMetrics.online.count / total) * 100
);
finalMetrics.offline.percentage = math.floor(
(finalMetrics.offline.count / total) * 100
);

const deviceStatusRecord = new DeviceStatusModel({
created_at: new Date(),
total_active_device_count: total,
metrics: finalMetrics,
check_type: "hourly",
});

await deviceStatusRecord.save();

const duration = (Date.now() - startTime) / 1000;
logger.info(`
Device Status Check Complete (${duration}s)
Total Devices: ${total}
Online Devices: ${finalMetrics.online.count} (${finalMetrics.online.percentage}%)
Offline Devices: ${finalMetrics.offline.count} (${finalMetrics.offline.percentage}%)
`);
} catch (error) {
logger.error(`Error in device status hourly check: ${error.message}`);
logger.error(`Stack trace: ${error.stack}`);
}
};

logger.info("Device status hourly check job is now running.....");

cron.schedule("0 * * * *", deviceStatusHourlyCheck, {
scheduled: true,
timezone: TIMEZONE,
});

module.exports = { deviceStatusHourlyCheck };
Loading
Loading