Skip to content

Commit 77f37c5

Browse files
authored
Chore(index): Sync logs management (#11522)
**What** - Add index engine sync log information - Ad `setTimeout(0)` to give breath to the event loop and ensuring not blocking the event loop and allow for other tasks queue execution to happen while syncing here is an example: **LOG_LEVEL=info** ![Screenshot 2025-02-19 at 10 09 25](https://github.com/user-attachments/assets/fc74dc32-1bc1-4123-9de3-f37817b7e783) **LOG_LEVEL=debug** ![Screenshot 2025-02-19 at 10 10 35](https://github.com/user-attachments/assets/222a1ce1-9267-4cb0-9518-dc4c7aa2b6f4)
1 parent 6470770 commit 77f37c5

File tree

6 files changed

+57
-9
lines changed

6 files changed

+57
-9
lines changed

.changeset/nice-seas-hunt.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@medusajs/index": patch
3+
---
4+
5+
Chore(index): Sync logs management

packages/modules/index/integration-tests/__fixtures__/medusa-config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Object.keys(config.modules).forEach((key) => {
3131
config.modules[Modules.INDEX] = {
3232
resolve: "@medusajs/index",
3333
dependencies: [
34+
ContainerRegistrationKeys.LOGGER,
3435
Modules.EVENT_BUS,
3536
Modules.LOCKING,
3637
ContainerRegistrationKeys.REMOTE_QUERY,

packages/modules/index/integration-tests/__tests__/config-sync.spec.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ describe("IndexModuleService syncIndexConfig", function () {
133133

134134
afterEach(afterEach_)
135135

136-
it.only("should full sync all entities when the config has changed", async () => {
136+
it("should full sync all entities when the config has changed", async () => {
137137
await setTimeout(1000)
138138

139139
const currentMetadata = await indexMetadataService.list()
@@ -191,6 +191,7 @@ describe("IndexModuleService syncIndexConfig", function () {
191191
;(index as any).buildSchemaObjectRepresentation_()
192192

193193
let configurationChecker = new Configuration({
194+
logger,
194195
schemaObjectRepresentation: (index as any).schemaObjectRepresentation_,
195196
indexMetadataService,
196197
indexSyncService,
@@ -277,6 +278,7 @@ describe("IndexModuleService syncIndexConfig", function () {
277278
)
278279

279280
configurationChecker = new Configuration({
281+
logger,
280282
schemaObjectRepresentation: (index as any).schemaObjectRepresentation_,
281283
indexMetadataService,
282284
indexSyncService,

packages/modules/index/src/services/data-synchronizer.ts

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
SchemaObjectEntityRepresentation,
1616
} from "@medusajs/types"
1717
import { IndexMetadataStatus, Orchestrator } from "@utils"
18-
18+
import { setTimeout } from "timers/promises"
1919
export class DataSynchronizer {
2020
#container: Record<string, any>
2121
#isReady: boolean = false
@@ -160,6 +160,8 @@ export class DataSynchronizer {
160160
}
161161

162162
async #taskRunner(entity: string) {
163+
this.#logger.info(`[Index engine] syncing entity '${entity}'`)
164+
163165
const [[lastCursor]] = await promiseAll([
164166
this.#indexSyncService.list(
165167
{
@@ -176,15 +178,24 @@ export class DataSynchronizer {
176178
),
177179
])
178180

181+
let startTime = performance.now()
182+
let chunkStartTime = startTime
183+
179184
const finalAcknoledgement = await this.syncEntity({
180185
entityName: entity,
181186
pagination: {
182187
cursor: lastCursor?.last_key,
183188
},
184189
ack: async (ack) => {
190+
const endTime = performance.now()
191+
const chunkElapsedTime = (endTime - chunkStartTime).toFixed(2)
185192
const promises: Promise<any>[] = []
186193

187194
if (ack.lastCursor) {
195+
this.#logger.debug(
196+
`[Index engine] syncing entity '${entity}' updating last cursor to ${ack.lastCursor} (+${chunkElapsedTime}ms)`
197+
)
198+
188199
promises.push(
189200
this.#indexSyncService.update({
190201
data: {
@@ -201,7 +212,22 @@ export class DataSynchronizer {
201212
}
202213
}
203214

215+
if (ack.err) {
216+
this.#logger.error(
217+
`[Index engine] syncing entity '${entity}' failed with error (+${chunkElapsedTime}ms):\n${ack.err.message}`
218+
)
219+
}
220+
221+
if (ack.done) {
222+
const elapsedTime = (endTime - startTime).toFixed(2)
223+
this.#logger.info(
224+
`[Index engine] syncing entity '${entity}' done (+${elapsedTime}ms)`
225+
)
226+
}
227+
204228
await promiseAll(promises)
229+
230+
chunkStartTime = performance.now()
205231
},
206232
})
207233

@@ -272,7 +298,7 @@ export class DataSynchronizer {
272298
const acknoledgement = {
273299
lastCursor: pagination.cursor ?? null,
274300
err: new Error(
275-
"Entity does not have a property 'id'. The 'id' must be provided and must be orderable (e.g ulid)"
301+
`Entity ${entityName} does not have a property 'id'. The 'id' must be provided and must be orderable (e.g ulid)`
276302
),
277303
}
278304

@@ -329,13 +355,11 @@ export class DataSynchronizer {
329355

330356
await ack({ lastCursor: currentCursor })
331357
} catch (err) {
332-
this.#logger.error(
333-
`Index engine] sync failed for entity ${entityName}`,
334-
err
335-
)
336358
error = err
337359
break
338360
}
361+
362+
await setTimeout(0)
339363
}
340364

341365
let acknoledgement: { lastCursor: string; done?: boolean; err?: Error } = {

packages/modules/index/src/services/index-module-service.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ export default class IndexModuleService
130130
})
131131

132132
const configurationChecker = new Configuration({
133+
logger: this.logger_,
133134
schemaObjectRepresentation: this.schemaObjectRepresentation_,
134135
indexMetadataService: this.indexMetadataService_,
135136
indexSyncService: this.indexSyncService_,

packages/modules/index/src/utils/sync/configuration.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { simpleHash } from "@medusajs/framework/utils"
2-
import { IndexTypes, InferEntityType } from "@medusajs/types"
2+
import { IndexTypes, InferEntityType, Logger } from "@medusajs/types"
33
import { IndexMetadata } from "@models"
44
import { schemaObjectRepresentationPropertiesToOmit } from "@types"
55
import { DataSynchronizer } from "../../services/data-synchronizer"
@@ -12,25 +12,32 @@ export class Configuration {
1212
#indexMetadataService: IndexMetadataService
1313
#indexSyncService: IndexSyncService
1414
#dataSynchronizer: DataSynchronizer
15+
#logger: Logger
1516

1617
constructor({
1718
schemaObjectRepresentation,
1819
indexMetadataService,
1920
indexSyncService,
2021
dataSynchronizer,
22+
logger,
2123
}: {
2224
schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation
2325
indexMetadataService: IndexMetadataService
2426
indexSyncService: IndexSyncService
2527
dataSynchronizer: DataSynchronizer
28+
logger: Logger
2629
}) {
2730
this.#schemaObjectRepresentation = schemaObjectRepresentation ?? {}
2831
this.#indexMetadataService = indexMetadataService
2932
this.#indexSyncService = indexSyncService
3033
this.#dataSynchronizer = dataSynchronizer
34+
this.#logger = logger
3135
}
3236

3337
async checkChanges(): Promise<InferEntityType<typeof IndexMetadata>[]> {
38+
this.#logger.info(
39+
"[Index engine] Checking for changes in the index configuration"
40+
)
3441
const schemaObjectRepresentation = this.#schemaObjectRepresentation
3542

3643
const currentConfig = await this.#indexMetadataService.list()
@@ -135,8 +142,16 @@ export class Configuration {
135142
await this.#indexSyncService.upsert(idxSyncData)
136143
}
137144

138-
return await this.#indexMetadataService.list({
145+
const changes = await this.#indexMetadataService.list({
139146
status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING],
140147
})
148+
149+
this.#logger.info(
150+
`[Index engine] Found ${changes.length} change${
151+
changes.length > 1 ? "s" : ""
152+
} in the index configuration that are either pending or processing`
153+
)
154+
155+
return changes
141156
}
142157
}

0 commit comments

Comments
 (0)