Skip to content

Commit 448dbcb

Browse files
feat(medusa): Rollout index engine behind feature flag (medusajs#11431)
**What** - Add index engine feature flag - apply it to the `store/products` end point as well as `admin/products` - Query builder various fixes - search capabilities on full data of every entities. The `q` search will be applied to all involved joined table for selection/where clauses Co-authored-by: Carlos R. L. Rodrigues <[email protected]>
1 parent 3b69f5a commit 448dbcb

File tree

27 files changed

+881
-135
lines changed

27 files changed

+881
-135
lines changed

.changeset/silver-baboons-drop.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@medusajs/index": patch
3+
"@medusajs/medusa": patch
4+
"@medusajs/utils": patch
5+
"@medusajs/modules-sdk": patch
6+
---
7+
8+
chore(medusa): index engine feature flag

integration-tests/modules/__tests__/index/query.index.ts

+123-12
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import {
99

1010
jest.setTimeout(120000)
1111

12+
// NOTE: In this tests, both API are used to query, we use object pattern and string pattern
13+
1214
process.env.ENABLE_INDEX_MODULE = "true"
1315

1416
medusaIntegrationTestRunner({
@@ -23,12 +25,9 @@ medusaIntegrationTestRunner({
2325
process.env.ENABLE_INDEX_MODULE = "false"
2426
})
2527

26-
beforeEach(async () => {
27-
await createAdminUser(dbConnection, adminHeaders, appContainer)
28-
})
29-
3028
describe("Index engine - Query.index", () => {
31-
it("should use query.index to query the index module and hydrate the data", async () => {
29+
beforeEach(async () => {
30+
await createAdminUser(dbConnection, adminHeaders, appContainer)
3231
const shippingProfile = (
3332
await api.post(
3433
`/admin/shipping-profiles`,
@@ -40,6 +39,7 @@ medusaIntegrationTestRunner({
4039
const payload = [
4140
{
4241
title: "Test Product",
42+
status: "published",
4343
description: "test-product-description",
4444
shipping_profile_id: shippingProfile.id,
4545
options: [{ title: "Denominations", values: ["100"] }],
@@ -66,6 +66,7 @@ medusaIntegrationTestRunner({
6666
{
6767
title: "Extra product",
6868
description: "extra description",
69+
status: "published",
6970
shipping_profile_id: shippingProfile.id,
7071
options: [{ title: "Colors", values: ["Red"] }],
7172
variants: new Array(2).fill(0).map((_, i) => ({
@@ -88,13 +89,16 @@ medusaIntegrationTestRunner({
8889
},
8990
]
9091

91-
for (const data of payload) {
92-
await api.post("/admin/products", data, adminHeaders).catch((err) => {
92+
await api
93+
.post("/admin/products/batch", { create: payload }, adminHeaders)
94+
.catch((err) => {
9395
console.log(err)
9496
})
95-
}
96-
await setTimeout(5000)
9797

98+
await setTimeout(2000)
99+
})
100+
101+
it("should use query.index to query the index module and hydrate the data", async () => {
98102
const query = appContainer.resolve(
99103
ContainerRegistrationKeys.QUERY
100104
) as RemoteQueryFunction
@@ -105,7 +109,7 @@ medusaIntegrationTestRunner({
105109
"id",
106110
"description",
107111
"status",
108-
112+
"title",
109113
"variants.sku",
110114
"variants.barcode",
111115
"variants.material",
@@ -120,17 +124,25 @@ medusaIntegrationTestRunner({
120124
"variants.prices.amount": { $gt: 30 },
121125
},
122126
pagination: {
127+
take: 10,
128+
skip: 0,
123129
order: {
124130
"variants.prices.amount": "DESC",
125131
},
126132
},
127133
})
128134

135+
expect(resultset.metadata).toEqual({
136+
count: 2,
137+
skip: 0,
138+
take: 10,
139+
})
129140
expect(resultset.data).toEqual([
130141
{
131142
id: expect.any(String),
132143
description: "extra description",
133-
status: "draft",
144+
title: "Extra product",
145+
status: "published",
134146
variants: [
135147
{
136148
sku: "extra-variant-0",
@@ -194,7 +206,8 @@ medusaIntegrationTestRunner({
194206
{
195207
id: expect.any(String),
196208
description: "test-product-description",
197-
status: "draft",
209+
title: "Test Product",
210+
status: "published",
198211
variants: [
199212
{
200213
sku: "test-variant-1",
@@ -234,6 +247,104 @@ medusaIntegrationTestRunner({
234247
},
235248
])
236249
})
250+
251+
it("should use query.index to query the index module sorting by price desc", async () => {
252+
const query = appContainer.resolve(
253+
ContainerRegistrationKeys.QUERY
254+
) as RemoteQueryFunction
255+
256+
const resultset = await query.index({
257+
entity: "product",
258+
fields: [
259+
"id",
260+
"variants.prices.amount",
261+
"variants.prices.currency_code",
262+
],
263+
filters: {
264+
"variants.prices.currency_code": "USD",
265+
},
266+
pagination: {
267+
take: 1,
268+
skip: 0,
269+
order: {
270+
"variants.prices.amount": "DESC",
271+
},
272+
},
273+
})
274+
275+
// Limiting to 1 on purpose to keep it simple and check the correct order is maintained
276+
expect(resultset.data).toEqual([
277+
{
278+
id: expect.any(String),
279+
variants: expect.arrayContaining([
280+
expect.objectContaining({
281+
prices: expect.arrayContaining([
282+
{
283+
amount: 20,
284+
currency_code: "CAD",
285+
id: expect.any(String),
286+
},
287+
{
288+
amount: 80,
289+
currency_code: "USD",
290+
id: expect.any(String),
291+
},
292+
]),
293+
}),
294+
]),
295+
},
296+
])
297+
298+
const resultset2 = await query.index({
299+
entity: "product",
300+
fields: [
301+
"id",
302+
"variants.prices.amount",
303+
"variants.prices.currency_code",
304+
],
305+
filters: {
306+
variants: {
307+
prices: {
308+
currency_code: "USD",
309+
},
310+
},
311+
},
312+
pagination: {
313+
take: 1,
314+
skip: 0,
315+
order: {
316+
variants: {
317+
prices: {
318+
amount: "ASC",
319+
},
320+
},
321+
},
322+
},
323+
})
324+
325+
// Limiting to 1 on purpose to keep it simple and check the correct order is maintained
326+
expect(resultset2.data).toEqual([
327+
{
328+
id: expect.any(String),
329+
variants: [
330+
expect.objectContaining({
331+
prices: expect.arrayContaining([
332+
{
333+
amount: 30,
334+
currency_code: "USD",
335+
id: expect.any(String),
336+
},
337+
{
338+
amount: 50,
339+
currency_code: "EUR",
340+
id: expect.any(String),
341+
},
342+
]),
343+
}),
344+
],
345+
},
346+
])
347+
})
237348
})
238349
},
239350
})

packages/core/modules-sdk/src/remote-query/query.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
MedusaError,
1515
isObject,
1616
remoteQueryObjectFromString,
17+
unflattenObjectKeys,
1718
} from "@medusajs/utils"
1819
import { RemoteQuery } from "./remote-query"
1920
import { toRemoteQuery } from "./to-remote-query"
@@ -211,7 +212,9 @@ export class Query {
211212
: ({} as any)
212213
const pagination = queryOptions.pagination as any
213214
if (pagination?.order) {
214-
pagination.order = { [mainEntity]: pagination.order }
215+
pagination.order = {
216+
[mainEntity]: unflattenObjectKeys(pagination?.order),
217+
}
215218
}
216219

217220
const indexResponse = (await this.#indexModule.query({

packages/core/modules-sdk/src/remote-query/remote-query.ts

+111
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import { isPresent, isString, toPascalCase } from "@medusajs/utils"
1818
import { MedusaModule } from "../medusa-module"
1919

2020
const BASE_PREFIX = ""
21+
const MAX_BATCH_SIZE = 4000
22+
const MAX_CONCURRENT_REQUESTS = 10
2123
export class RemoteQuery {
2224
private remoteJoiner: RemoteJoiner
2325
private modulesMap: Map<string, LoadedModule> = new Map()
@@ -182,6 +184,102 @@ export class RemoteQuery {
182184
}
183185
}
184186

187+
private async fetchRemoteDataBatched(args: {
188+
serviceName: string
189+
keyField: string
190+
service: any
191+
methodName: string
192+
filters: any
193+
options: any
194+
ids: (unknown | unknown[])[]
195+
}): Promise<any[]> {
196+
const {
197+
serviceName,
198+
keyField,
199+
service,
200+
methodName,
201+
filters,
202+
options,
203+
ids,
204+
} = args
205+
206+
const getBatch = function* (
207+
idArray: (unknown | unknown[])[],
208+
batchSize: number
209+
) {
210+
for (let i = 0; i < idArray.length; i += batchSize) {
211+
yield idArray.slice(i, i + batchSize)
212+
}
213+
}
214+
215+
const idsToFetch = getBatch(ids, MAX_BATCH_SIZE)
216+
const results: any[] = []
217+
let running = 0
218+
const fetchPromises: Promise<void>[] = []
219+
220+
const processBatch = async (batch: (unknown | unknown[])[]) => {
221+
running++
222+
const batchFilters = { ...filters, [keyField]: batch }
223+
let result
224+
225+
try {
226+
if (RemoteQuery.traceFetchRemoteData) {
227+
result = await RemoteQuery.traceFetchRemoteData(
228+
async () => service[methodName](batchFilters, options),
229+
serviceName,
230+
methodName,
231+
options
232+
)
233+
} else {
234+
result = await service[methodName](batchFilters, options)
235+
}
236+
results.push(result)
237+
} finally {
238+
running--
239+
processAllBatches()
240+
}
241+
}
242+
243+
let batchesDone: (value: void) => void = () => {}
244+
const awaitBatches = new Promise((ok) => {
245+
batchesDone = ok
246+
})
247+
const processAllBatches = async () => {
248+
let isDone = false
249+
while (running < MAX_CONCURRENT_REQUESTS) {
250+
const nextBatch = idsToFetch.next()
251+
if (nextBatch.done) {
252+
isDone = true
253+
break
254+
}
255+
256+
const batch = nextBatch.value
257+
fetchPromises.push(processBatch(batch))
258+
}
259+
260+
if (isDone) {
261+
await Promise.all(fetchPromises)
262+
batchesDone()
263+
}
264+
}
265+
266+
processAllBatches()
267+
await awaitBatches
268+
269+
const flattenedResults = results.reduce((acc, result) => {
270+
if (
271+
Array.isArray(result) &&
272+
result.length === 2 &&
273+
Array.isArray(result[0])
274+
) {
275+
return acc.concat(result[0])
276+
}
277+
return acc.concat(result)
278+
}, [])
279+
280+
return flattenedResults
281+
}
282+
185283
public async remoteFetchData(
186284
expand: RemoteExpandProperty,
187285
keyField: string,
@@ -267,6 +365,19 @@ export class RemoteQuery {
267365
options.take = null
268366
}
269367

368+
if (ids && ids.length >= MAX_BATCH_SIZE && !hasPagination) {
369+
const data = await this.fetchRemoteDataBatched({
370+
serviceName: serviceConfig.serviceName,
371+
keyField,
372+
service,
373+
methodName,
374+
filters,
375+
options,
376+
ids,
377+
})
378+
return { data }
379+
}
380+
270381
let result: any
271382
if (RemoteQuery.traceFetchRemoteData) {
272383
result = await RemoteQuery.traceFetchRemoteData(

0 commit comments

Comments
 (0)