Skip to content

Commit

Permalink
fix: queues status endpoint to handle jobs states correctly (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
aleortega authored Jan 30, 2025
1 parent 7b2b620 commit e503a31
Show file tree
Hide file tree
Showing 15 changed files with 358 additions and 79 deletions.
42 changes: 27 additions & 15 deletions src/adapters/memory-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,33 @@ export function createInMemoryCacheComponent(): ICacheStorage {

async function stop() {}

async function get(pattern: string): Promise<string[]> {
const regex = new RegExp(pattern.replace('*', '.*'))
const matchingKeys = [...cache.keys()].filter((key) => regex.test(key))

const validKeys = matchingKeys.filter((key) => {
const entry = cache.get(key)
if (entry && (!entry.expiresAt || entry.expiresAt > Date.now())) {
return true
} else {
cache.delete(key)
return false
}
})

return validKeys.map((key) => key.split(':').pop()!)
async function get<T>(pattern: string): Promise<T[]> {
if (pattern.includes('*')) {
const regex = new RegExp(pattern.replace('*', '.*'))
const matchingKeys = [...cache.keys()].filter((key) => regex.test(key))

const validKeys = matchingKeys.filter((key) => {
const entry = cache.get(key)
if (entry && (!entry.expiresAt || entry.expiresAt > Date.now())) {
return true
} else {
cache.delete(key)
return false
}
})

return validKeys.map((key) => {
const entry = cache.get(key)
return entry?.value
})
}

const entry = cache.get(pattern)
if (!entry || (entry.expiresAt && entry.expiresAt <= Date.now())) {
cache.delete(pattern)
return []
}
return [entry.value]
}

async function set<T>(key: string, value: T): Promise<void> {
Expand Down
12 changes: 8 additions & 4 deletions src/adapters/redis.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createClient } from 'redis'
import { AppComponents, ICacheStorage } from '../types'

const TWENTY_FOUR_HOURS_IN_SECONDS = 60 * 60 * 24
const SEVEN_DAYS_IN_SECONDS = 60 * 60 * 24 * 7

export async function createRedisComponent(
hostUrl: string,
Expand Down Expand Up @@ -40,10 +40,14 @@ export async function createRedisComponent(
}
}

async function get(pattern: string): Promise<string[]> {
async function get<T>(pattern: string): Promise<T[]> {
try {
const keys = await client.keys(pattern)
return keys.map((key) => key.split(':').pop()!)
if (keys.length === 0) {
return []
}
const values = (await client.mGet(keys)) || []
return values.map((value: any) => JSON.parse(value))
} catch (err: any) {
logger.error(`Error getting key "${pattern}"`, err)
throw err
Expand All @@ -54,7 +58,7 @@ export async function createRedisComponent(
try {
const serializedValue = JSON.stringify(value)
await client.set(key, serializedValue, {
EX: TWENTY_FOUR_HOURS_IN_SECONDS // expiration time (TTL)
EX: SEVEN_DAYS_IN_SECONDS // expiration time (TTL)
})
logger.debug(`Successfully set key "${key}"`)
} catch (err: any) {
Expand Down
9 changes: 5 additions & 4 deletions src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { createWorkerManagerComponent } from './logic/workers/worker-manager'
import { createRedisComponent } from './adapters/redis'
import { createInMemoryCacheComponent } from './adapters/memory-cache'
import { createWorldsAdapter } from './adapters/worlds'
import { createQueuesStatusManagerComponent } from './logic/queues-status-manager'

// Initialize all the components of the app
export async function initComponents(): Promise<AppComponents> {
Expand Down Expand Up @@ -90,16 +91,15 @@ export async function initComponents(): Promise<AppComponents> {
const worlds = await createWorldsAdapter({ logs, config, fetch })
const registryOrchestrator = createRegistryOrchestratorComponent({ logs, db, metrics })
const entityStatusFetcher = await createEntityStatusFetcherComponent({ fetch, logs, config })
const queuesStatusManager = createQueuesStatusManagerComponent({ memoryStorage })
const messageProcessor = await createMessageProcessorComponent({
catalyst,
worlds,
entityStatusFetcher,
registryOrchestrator,
db,
logs,
config,
fetch,
memoryStorage
queuesStatusManager
})
const messageConsumer = createMessagesConsumerComponent({ logs, queue, messageProcessor, metrics })
const workerManager = createWorkerManagerComponent({ metrics, logs })
Expand All @@ -121,6 +121,7 @@ export async function initComponents(): Promise<AppComponents> {
registryOrchestrator,
entityStatusFetcher,
workerManager,
memoryStorage
memoryStorage,
queuesStatusManager
}
}
14 changes: 8 additions & 6 deletions src/controllers/handlers/get-entity-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,18 @@ export async function getEntitiesStatusHandler(
}
}

export async function getQueuesStatuses(context: HandlerContextWithPath<'memoryStorage', '/queues/status'>) {
export async function getQueuesStatuses(context: HandlerContextWithPath<'queuesStatusManager', '/queues/status'>) {
const {
components: { memoryStorage }
components: { queuesStatusManager }
} = context

const platforms: string[] = ['windows', 'mac', 'webgl']
async function getEntitiesIdsOfPendingJobs(platform: 'windows' | 'mac' | 'webgl') {
return (await queuesStatusManager.getAllPendingEntities(platform)).map((pendingJob) => pendingJob.entityId)
}

const [windowsPendingJobs, macPendingJobs, webglPendingJobs] = await Promise.all(
platforms.map(async (platform) => await memoryStorage.get(`jobs:${platform}:*`))
)
const windowsPendingJobs = await getEntitiesIdsOfPendingJobs('windows')
const macPendingJobs = await getEntitiesIdsOfPendingJobs('mac')
const webglPendingJobs = await getEntitiesIdsOfPendingJobs('webgl')

return {
status: 200,
Expand Down
26 changes: 12 additions & 14 deletions src/logic/message-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,24 @@ export async function createMessageProcessorComponent({
registryOrchestrator,
db,
logs,
config,
fetch,
memoryStorage
queuesStatusManager
}: Pick<
AppComponents,
| 'catalyst'
| 'worlds'
| 'entityStatusFetcher'
| 'registryOrchestrator'
| 'db'
| 'logs'
| 'config'
| 'fetch'
| 'memoryStorage'
'catalyst' | 'worlds' | 'entityStatusFetcher' | 'registryOrchestrator' | 'db' | 'logs' | 'queuesStatusManager'
>): Promise<MessageProcessorComponent> {
const log = logs.getLogger('message-processor')
const processors: EventHandlerComponent[] = [
createDeploymentProcessor({ catalyst, worlds, logs, registryOrchestrator }),
createTexturesProcessor({ db, logs, catalyst, worlds, entityStatusFetcher, registryOrchestrator, memoryStorage }),
await createStatusProcessor({ logs, config, fetch, memoryStorage })
createTexturesProcessor({
db,
logs,
catalyst,
worlds,
entityStatusFetcher,
registryOrchestrator,
queuesStatusManager
}),
createStatusProcessor({ logs, queuesStatusManager })
]

async function process(message: any) {
Expand Down
39 changes: 14 additions & 25 deletions src/logic/processors/status-processor.ts
Original file line number Diff line number Diff line change
@@ -1,65 +1,54 @@
import { DeploymentToSqs } from '@dcl/schemas/dist/misc/deployments-to-sqs'
import { AppComponents, EventHandlerComponent, ProcessorResult } from '../../types'
import { AssetBundleConversionManuallyQueuedEvent, Events } from '@dcl/schemas'
import { generateCacheKey } from '../../utils/key-generator'

export const createStatusProcessor = async ({
export const createStatusProcessor = ({
logs,
memoryStorage
}: Pick<AppComponents, 'logs' | 'config' | 'fetch' | 'memoryStorage'>): Promise<EventHandlerComponent> => {
queuesStatusManager
}: Pick<AppComponents, 'logs' | 'queuesStatusManager'>): EventHandlerComponent => {
const logger = logs.getLogger('status-processor')

function getEventProperties(event: any) {
let entityId: string = ''
let isPriority: boolean = false
let isLods: boolean = false
let platform: 'webgl' | 'windows' | 'mac' | 'all' = 'all'
const platforms: ('webgl' | 'windows' | 'mac')[] = []

if (event.type === Events.Type.ASSET_BUNDLE && event.subType === Events.SubType.AssetBundle.MANUALLY_QUEUED) {
const manuallyQueuedEvent = event as AssetBundleConversionManuallyQueuedEvent

entityId = manuallyQueuedEvent.metadata.entityId
isPriority = manuallyQueuedEvent.metadata.isPriority
platform = manuallyQueuedEvent.metadata.platform
platforms.push(manuallyQueuedEvent.metadata.platform)
isLods = manuallyQueuedEvent.metadata.isLods
} else {
const deploymentEvent = event as DeploymentToSqs

entityId = deploymentEvent.entity.entityId
platforms.push('webgl')
platforms.push('windows')
platforms.push('mac')
}

return {
entityId,
isPriority,
platform,
platforms,
isLods
}
}

return {
process: async (event: any): Promise<ProcessorResult> => {
try {
const keys: string[] = []

const { entityId, platform, isLods } = getEventProperties(event)
const { entityId, platforms, isLods } = getEventProperties(event)

if (isLods) {
logger.info('Skipping processing status for LODs', { entityId, platform })
logger.info('Skipping processing status for LODs', { entityId, platforms: platforms.join(', ') })
return { ok: true }
}

logger.info('Processing status', { entityId, platform })

if (platform === 'all') {
;['windows', 'mac', 'webgl'].forEach((platform: any) => {
keys.push(generateCacheKey(platform, entityId))
})
} else {
keys.push(generateCacheKey(platform, entityId))
}
logger.info('Processing status', { entityId, platforms: platforms.join(', ') })

for (const key of keys) {
await memoryStorage.set(`${key}`, Date.now())
for (const platform of platforms) {
await queuesStatusManager.markAsQueued(platform, entityId)
}

return { ok: true }
Expand Down
7 changes: 3 additions & 4 deletions src/logic/processors/textures-processor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { AssetBundleConversionFinishedEvent, Entity } from '@dcl/schemas'
import { AppComponents, EventHandlerComponent, ProcessorResult, Registry } from '../../types'
import { generateCacheKey } from '../../utils/key-generator'

export const createTexturesProcessor = ({
logs,
Expand All @@ -9,10 +8,10 @@ export const createTexturesProcessor = ({
worlds,
entityStatusFetcher,
registryOrchestrator,
memoryStorage
queuesStatusManager
}: Pick<
AppComponents,
'logs' | 'db' | 'catalyst' | 'worlds' | 'entityStatusFetcher' | 'registryOrchestrator' | 'memoryStorage'
'logs' | 'db' | 'catalyst' | 'worlds' | 'entityStatusFetcher' | 'registryOrchestrator' | 'queuesStatusManager'
>): EventHandlerComponent => {
const logger = logs.getLogger('textures-processor')

Expand Down Expand Up @@ -76,7 +75,7 @@ export const createTexturesProcessor = ({
logger.info(`Bundle stored`, { entityId: event.metadata.entityId, bundles: JSON.stringify(registry.bundles) })

await registryOrchestrator.persistAndRotateStates(registry)
await memoryStorage.purge(generateCacheKey(event.metadata.platform, event.metadata.entityId))
await queuesStatusManager.markAsFinished(event.metadata.platform, event.metadata.entityId)

return { ok: true }
} catch (errors: any) {
Expand Down
62 changes: 62 additions & 0 deletions src/logic/queues-status-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { AppComponents, EntityStatusInQueue, QueuesStatusManagerComponent } from '../types'

export function createQueuesStatusManagerComponent({
memoryStorage
}: Pick<AppComponents, 'memoryStorage'>): QueuesStatusManagerComponent {
function generateCacheKey(platform: 'windows' | 'mac' | 'webgl', entityId: string): string {
return `jobs:${platform}:${entityId}`
}

async function getStatus(key: string): Promise<EntityStatusInQueue | undefined> {
const status = await memoryStorage.get<EntityStatusInQueue>(key)

if (!status.length) return undefined

return status[0]
}

async function markAsQueued(platform: 'windows' | 'mac' | 'webgl', entityId: string): Promise<void> {
const key = generateCacheKey(platform, entityId)
const currentValue = (await getStatus(key)) || {
entityId,
platform,
status: 0
}

await memoryStorage.set(key, { ...currentValue, status: currentValue.status + 1 })
}

async function markAsFinished(platform: 'windows' | 'mac' | 'webgl', entityId: string): Promise<void> {
const key = generateCacheKey(platform, entityId)
const currentValue = (await getStatus(key)) || {
entityId,
platform,
status: 0
}

const newStatus = currentValue.status - 1

if (newStatus === 0) {
await memoryStorage.purge(key)
return
}

await memoryStorage.set<EntityStatusInQueue>(key, { ...currentValue, status: newStatus })
}

async function getAllPendingEntities(platform: 'windows' | 'mac' | 'webgl'): Promise<EntityStatusInQueue[]> {
const entities = (await memoryStorage.get<EntityStatusInQueue>(`jobs:${platform}:*`)) || []
return entities
.filter((entity: any) => entity.status > 0)
.map((entity) => ({
...entity,
status: Math.min(entity.status, 1)
}))
}

return {
markAsQueued,
markAsFinished,
getAllPendingEntities
}
}
12 changes: 9 additions & 3 deletions src/types/service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Message } from '@aws-sdk/client-sqs'
import { IBaseComponent } from '@well-known-components/interfaces'
import { Registry } from './types'
import { EntityStatusInQueue, Registry } from './types'
import { Entity, EthAddress } from '@dcl/schemas'
import { DeploymentToSqs } from '@dcl/schemas/dist/misc/deployments-to-sqs'

Expand Down Expand Up @@ -74,7 +74,13 @@ export type RegistryOrchestratorComponent = {
}

export type ICacheStorage = IBaseComponent & {
get(key: string): Promise<any>
set(key: string, value: any): Promise<void>
get<T>(key: string): Promise<T[]>
set<T>(key: string, value: T): Promise<void>
purge(key: string): Promise<void>
}

export type QueuesStatusManagerComponent = {
markAsQueued(platform: 'windows' | 'mac' | 'webgl', entityId: string): Promise<void>
markAsFinished(platform: 'windows' | 'mac' | 'webgl', entityId: string): Promise<void>
getAllPendingEntities(platform: 'windows' | 'mac' | 'webgl'): Promise<EntityStatusInQueue[]>
}
Loading

0 comments on commit e503a31

Please sign in to comment.