Skip to content

Commit

Permalink
Merge pull request #918 from COS301-SE-2024/feat/backend/read-write-o…
Browse files Browse the repository at this point in the history
…ptimisation

read-write optimisation
  • Loading branch information
Tinogwanz authored Oct 24, 2024
2 parents 08d2094 + bd387fb commit 8070931
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 179 deletions.
4 changes: 2 additions & 2 deletions nodejs_api/src/config/redis.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ export const deleteAllCache = async () => {

export const processReadQueue = () => {
const queue = getReadQueue();
queue.process(readQueueProcessor);
queue.process(1, readQueueProcessor);
};

export const processWriteQueue = () => {
const queue = getWriteQueue();
queue.process(writeQueueProcessor);
queue.process(1, writeQueueProcessor);
};
10 changes: 10 additions & 0 deletions nodejs_api/src/services/jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ export const addJobToReadQueue = async (jobData: JobData, options?: JobOptions)
return job;
};

export const addMultipleJobsToReadQueue = async (jobData: JobData[]) => {
const readQueue = getReadQueue();

const jobs = await readQueue.addBulk(jobData.map((singleJobData) => ({
data: singleJobData
})));

return jobs;
};

export const addJobToWriteQueue = async (jobData: JobData, options?: JobOptions) => {
const writeQueue = getWriteQueue();
const job = await writeQueue.add({
Expand Down
29 changes: 18 additions & 11 deletions nodejs_api/src/services/tenders.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { GetCommand, GetCommandInput, GetCommandOutput, PutCommand, PutCommandInput, QueryCommand, QueryCommandInput, QueryCommandOutput, UpdateCommand, UpdateCommandInput } from "@aws-sdk/lib-dynamodb";
import { COMPANIES_TABLE, CONTRACT_TABLE, dynamoDBDocumentClient, TENDERS_TABLE, TICKETS_TABLE } from "../config/dynamodb.config";
import { GetCommandInput, GetCommandOutput, PutCommandInput, QueryCommandInput, QueryCommandOutput, UpdateCommandInput } from "@aws-sdk/lib-dynamodb";
import { COMPANIES_TABLE, CONTRACT_TABLE, TENDERS_TABLE, TICKETS_TABLE } from "../config/dynamodb.config";
import { BadRequestError, NotFoundError } from "../types/error.types";
import { generateId, getCompanyIDFromName, getTicketDateOpened, updateTicketTable } from "../utils/tickets.utils";
import { assignCompanyName, assignLongLat, assignMuni, sendWebSocketMessage, updateContractTable, updateTenderTable } from "../utils/tenders.utils";
import WebSocket from "ws";
import { deleteAllCache, DB_GET, DB_PUT, DB_QUERY, DB_UPDATE } from "../config/redis.config";
import { addJobToReadQueue, addJobToWriteQueue } from "./jobs.service";
import { addJobToReadQueue, addJobToWriteQueue, addMultipleJobsToReadQueue } from "./jobs.service";
import { JobData } from "../types/job.types";

interface TenderData {
Expand Down Expand Up @@ -625,6 +625,7 @@ export const getMunicipalityTenders = async (municipality: string) => {

const collective: any[] = [];
const tickets = responseTickets.Items;
const tenderJobs = [];

for (const item of tickets) {
const paramsTender: QueryCommandInput = {
Expand All @@ -641,15 +642,21 @@ export const getMunicipalityTenders = async (municipality: string) => {
params: paramsTender
}

const readJobTender = await addJobToReadQueue(tendersJobData);
const responseTender = await readJobTender.finished() as QueryCommandOutput;
tenderJobs.push(tendersJobData);
}

if (responseTender.Items && responseTender.Items.length > 0) {
await assignMuni(responseTender.Items);
await assignLongLat(responseTender.Items);
await assignCompanyName(responseTender.Items);
collective.push(...responseTender.Items);
}
const readJobs = await addMultipleJobsToReadQueue(tenderJobs);

for (let jobResult of readJobs) {
const responseTenders = await jobResult.finished() as QueryCommandOutput;
const tenders = responseTenders.Items || [];
collective.push(...tenders);
}

if (collective.length > 0) {
await assignMuni(collective);
await assignLongLat(collective);
await assignCompanyName(collective);
}

return collective;
Expand Down
110 changes: 67 additions & 43 deletions nodejs_api/src/services/tickets.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ASSETS_TABLE, cognitoClient, TENDERS_TABLE, TICKET_UPDATE_TABLE, TICKET
import { generateId, generateTicketNumber, getCompanyIDFromName, getMunicipality, getTicketDateOpened, getUserProfile, updateCommentCounts, updateTicketTable, validateTicketId } from "../utils/tickets.utils";
import { uploadFile } from "../config/s3bucket.config";
import WebSocket from "ws";
import { addJobToReadQueue, addJobToWriteQueue } from "./jobs.service";
import { addJobToReadQueue, addJobToWriteQueue, addMultipleJobsToReadQueue } from "./jobs.service";
import { JobData } from "../types/job.types";
import { deleteAllCache, DB_GET, DB_PUT, DB_QUERY, DB_SCAN, DB_UPDATE, deleteCacheKey } from "../config/redis.config";
import { sendWebSocketMessage } from "../utils/tenders.utils";
Expand Down Expand Up @@ -371,26 +371,30 @@ export const getWatchlist = async (userId: string, lastEvaluatedKeyString: strin

const params: QueryCommandInput = {
TableName: WATCHLIST_TABLE,
IndexName: "user_id-ticket_id-index",
KeyConditionExpression: "user_id = :user_id",
ExpressionAttributeValues: {
":user_id": userId
},
ProjectionExpression: "ticket_id",
ScanIndexForward: false,
Limit: 15
};

if (lastEvaluatedKeyString) {
params.ExclusiveStartKey = JSON.parse(lastEvaluatedKeyString);
}

const jobsData: JobData = {
const jobData: JobData = {
type: DB_QUERY,
params: params
};

const job = await addJobToReadQueue(jobsData);
const job = await addJobToReadQueue(jobData);
const response = await job.finished() as QueryCommandOutput;

const items = response.Items;
const ticketJobs: JobData[] = [];

if (items && items.length > 0) {
for (const item of items) {
Expand All @@ -408,16 +412,22 @@ export const getWatchlist = async (userId: string, lastEvaluatedKeyString: strin
params: params2
};

const job2 = await addJobToReadQueue(jobData2);
const respItem = await job2.finished() as QueryCommandOutput;
const ticketsItems = respItem.Items;
ticketJobs.push(jobData2);
}

const ticketJobResults = await addMultipleJobsToReadQueue(ticketJobs);

if (ticketsItems && ticketsItems.length > 0) {
await updateCommentCounts(ticketsItems);
await getUserProfile(ticketsItems);
collective.push(...ticketsItems);
for (const jobResult of ticketJobResults) {
const response = await jobResult.finished() as QueryCommandOutput;
const ticket = response.Items || [];
if (ticket.length > 0) {
collective.push(ticket[0]);
}
}

await updateCommentCounts(collective);
await getUserProfile(collective);

return {
lastEvaluatedKey: response.LastEvaluatedKey,
items: collective
Expand Down Expand Up @@ -809,6 +819,8 @@ export const getCompanyTickets = async (companyname: string) => {

const tenderItems = responseTender.Items;

const companyTicketJobs: JobData[] = [];

if (tenderItems && tenderItems.length > 0) {
for (const item of tenderItems) {
const params2: QueryCommandInput = {
Expand All @@ -825,15 +837,20 @@ export const getCompanyTickets = async (companyname: string) => {
params: params2
};

const job2 = await addJobToReadQueue(jobData2);
const responseCompanyTickets = await job2.finished() as QueryCommandOutput;
const companyTickets = responseCompanyTickets.Items;
companyTicketJobs.push(jobData2);
}

const ticketJobResults = await addMultipleJobsToReadQueue(companyTicketJobs);

for (const jobResult of ticketJobResults) {
const responseCompanyTickets = await jobResult.finished() as QueryCommandOutput;
const companyTickets = responseCompanyTickets.Items;
if (companyTickets && companyTickets.length > 0) {
await getUserProfile(companyTickets);
collective.push(...companyTickets);
}
}

await getUserProfile(collective);
}

const params3: QueryCommandInput = {
Expand Down Expand Up @@ -877,8 +894,6 @@ export const getCompanyTickets = async (companyname: string) => {


export const getOpenCompanyTickets = async () => {
const collective: any[] = [];

const params: QueryCommandInput = {
TableName: TICKETS_TABLE,
IndexName: "state-updatedAt-index",
Expand Down Expand Up @@ -1019,7 +1034,6 @@ export const getTicketComments = async (currTicketId: string) => {
}
};


const jobData: JobData = {
type: DB_QUERY,
params: params
Expand All @@ -1031,16 +1045,25 @@ export const getTicketComments = async (currTicketId: string) => {

if (items.length > 0) {
let cognitoUsername: string = "";
try {
const USER_POOL_ID = process.env.USER_POOL_ID;
for (let commentItem of items) {
cognitoUsername = (commentItem["user_id"] as string).toLowerCase();
const userResponse: AdminGetUserCommandOutput = await cognitoClient.send(
new AdminGetUserCommand({
UserPoolId: USER_POOL_ID,
Username: cognitoUsername
})
);
const USER_POOL_ID = process.env.USER_POOL_ID;
const cognitoPromises = [];
for (let commentItem of items) {
cognitoUsername = (commentItem["user_id"] as string).toLowerCase();
const response = cognitoClient.send(
new AdminGetUserCommand({
UserPoolId: USER_POOL_ID,
Username: cognitoUsername
})
);

cognitoPromises.push(response);
}

const userProfiles = await Promise.allSettled(cognitoPromises);

userProfiles.forEach((result, index) => {
if (result.status === "fulfilled") {
const userResponse = result.value as AdminGetUserCommandOutput;

let userImage: string | null = null;
let userGivenName: string | null = null;
Expand All @@ -1066,20 +1089,23 @@ export const getTicketComments = async (currTicketId: string) => {
}
}

commentItem["userImage"] = userImage;
commentItem["userName"] = userGivenName + " " + userFamilyName;
items[index]["userImage"] = userImage;
items[index]["userName"] = userGivenName + " " + userFamilyName;
}

return items;
} catch (error: any) {
if (error.name === "UserNotFoundException") {
console.error(`${error.message}: ${cognitoUsername}`);
} else {
console.error("An error occurred:", error);
else {
if (result.reason.name === "UserNotFoundException") {
items[index]["userImage"] = "";
items[index]["userName"] = "deleted_user";
}
else {
console.error("An error occurred:", result.reason);
}
}
}
});

return items;
}

} catch (e: any) {
if (e instanceof ClientError) {
throw new BadRequestError(`Failed to search for the ticket comments: ${e.response.Error.Message}`);
Expand All @@ -1106,14 +1132,12 @@ export const getGeodataAll = async () => {
for (const fault of faultData) {
const upvotes = fault.upvotes || 0;

if (upvotes < 10) {
if (upvotes < 100) {
fault.urgency = "non-urgent";
} else if (upvotes >= 10 && upvotes < 20) {
} else if (upvotes >= 100 && upvotes < 400) {
fault.urgency = "semi-urgent";
} else if (upvotes >= 20 && upvotes <= 40) {
} else if (upvotes >= 400) {
fault.urgency = "urgent";
} else {
fault.urgency = "non-urgent";
}

delete fault.upvotes;
Expand Down
Loading

0 comments on commit 8070931

Please sign in to comment.