Skip to content

Commit c250de7

Browse files
authored
chore(): Prevent sub workflow events release early + redis unlink (medusajs#11641)
**What** - Prevent event release when a workflow is run as step and finish - Use `unlink` instead of `del` when removing keys from redist to push the execution to async thread
1 parent ca6a157 commit c250de7

File tree

7 files changed

+26
-14
lines changed

7 files changed

+26
-14
lines changed

.changeset/large-experts-provide.md

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@medusajs/cache-redis": patch
3+
"@medusajs/event-bus-redis": patch
4+
"@medusajs/workflow-engine-redis": patch
5+
"@medusajs/locking-redis": patch
6+
"@medusajs/workflows-sdk": patch
7+
---
8+
9+
chore(): Prevent sub workflow events release early + redis unlink

packages/core/workflows-sdk/src/helper/workflow-export.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ function createContextualWorkflowRunner<
9393

9494
const { eventGroupId, parentStepIdempotencyKey } = context
9595

96-
attachOnFinishReleaseEvents(events, flow, { logOnError })
96+
if (!parentStepIdempotencyKey) {
97+
attachOnFinishReleaseEvents(events, flow, { logOnError })
98+
}
9799

98100
const flowMetadata = {
99101
eventGroupId,

packages/modules/cache-redis/src/services/redis-cache.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class RedisCacheService implements ICacheService {
6666
return JSON.parse(cached)
6767
}
6868
} catch (err) {
69-
await this.redis.del(cacheKey)
69+
await this.redis.unlink(cacheKey)
7070
}
7171
return null
7272
}
@@ -92,7 +92,7 @@ class RedisCacheService implements ICacheService {
9292
if (keys.length > 0) {
9393
const deletePipeline = this.redis.pipeline()
9494
for (const key of keys) {
95-
deletePipeline.del(key)
95+
deletePipeline.unlink(key)
9696
}
9797

9898
await deletePipeline.exec()

packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts

+9-8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const redisMock = {
2727
lrange: () => jest.fn(),
2828
disconnect: () => jest.fn(),
2929
expire: () => jest.fn(),
30+
unlink: () => jest.fn(),
3031
} as unknown as Redis
3132

3233
const simpleModuleOptions = { redisUrl: "test-url" }
@@ -63,7 +64,7 @@ describe("RedisEventBusService", () => {
6364
{
6465
connection: expect.any(Object),
6566
prefix: "RedisEventBusService",
66-
autorun: false
67+
autorun: false,
6768
}
6869
)
6970
})
@@ -269,15 +270,15 @@ describe("RedisEventBusService", () => {
269270
},
270271
]
271272

272-
redis.del = jest.fn()
273+
redis.unlink = jest.fn()
273274

274275
await eventBus.emit(events, options)
275276

276277
// Expect 1 event to have been send
277278
// Expect 2 pushes to redis as there are 2 groups of events to push
278279
expect(queue.addBulk).toHaveBeenCalledTimes(1)
279280
expect(redis.rpush).toHaveBeenCalledTimes(2)
280-
expect(redis.del).not.toHaveBeenCalled()
281+
expect(redis.unlink).not.toHaveBeenCalled()
281282

282283
const [testGroup1Event] = (eventBus as any).buildEvents(
283284
[events[0]],
@@ -314,12 +315,12 @@ describe("RedisEventBusService", () => {
314315

315316
expect(queue.addBulk).toHaveBeenCalledTimes(1)
316317
expect(queue.addBulk).toHaveBeenCalledWith([testGroup1Event])
317-
expect(redis.del).toHaveBeenCalledTimes(1)
318-
expect(redis.del).toHaveBeenCalledWith("staging:test-group-1")
318+
expect(redis.unlink).toHaveBeenCalledTimes(1)
319+
expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-1")
319320

320321
queue = (eventBus as any).queue_
321322
queue.addBulk = jest.fn()
322-
redis.del = jest.fn()
323+
redis.unlink = jest.fn()
323324

324325
await eventBus.releaseGroupedEvents("test-group-2")
325326

@@ -328,8 +329,8 @@ describe("RedisEventBusService", () => {
328329
testGroup2Event,
329330
testGroup2Event2,
330331
])
331-
expect(redis.del).toHaveBeenCalledTimes(1)
332-
expect(redis.del).toHaveBeenCalledWith("staging:test-group-2")
332+
expect(redis.unlink).toHaveBeenCalledTimes(1)
333+
expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-2")
333334
})
334335
})
335336
})

packages/modules/event-bus-redis/src/services/event-bus-redis.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
223223
return
224224
}
225225

226-
await this.eventBusRedisConnection_.del(`staging:${eventGroupId}`)
226+
await this.eventBusRedisConnection_.unlink(`staging:${eventGroupId}`)
227227
}
228228

229229
/**

packages/modules/providers/locking-redis/src/services/redis-lock.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ export class RedisLockingProvider implements ILockingProvider {
258258
const currentOwner = currentOwners?.[idx]?.[1]
259259

260260
if (currentOwner === ownerId) {
261-
deletePipeline.del(key)
261+
deletePipeline.unlink(key)
262262
}
263263
})
264264

packages/modules/workflow-engine-redis/integration-tests/utils/database.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async function deleteKeysByPattern(pattern) {
3333
for await (const keys of stream) {
3434
if (keys.length) {
3535
const pipeline = redis.pipeline()
36-
keys.forEach((key) => pipeline.del(key))
36+
keys.forEach((key) => pipeline.unlink(key))
3737
await pipeline.exec()
3838
}
3939
}

0 commit comments

Comments
 (0)