Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(slot-reservations): support SlotReservationsFull event #926

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions codex/contracts/market.nim
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ method subscribeSlotFreed*(market: OnChainMarket,
let subscription = await market.contract.subscribe(SlotFreed, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription)

method subscribeSlotReservationsFull*(
market: OnChainMarket,
callback: OnSlotReservationsFull): Future[MarketSubscription] {.async.} =

proc onEvent(event: SlotReservationsFull) {.upraises:[].} =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not relevant for this PR, but we should probably get rid of upraises and replace it with regular raises at some point.

callback(event.requestId, event.slotIndex)

convertEthersError:
let subscription = await market.contract.subscribe(SlotReservationsFull, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription)

method subscribeFulfillment(market: OnChainMarket,
callback: OnFulfillment):
Future[MarketSubscription] {.async.} =
Expand Down
10 changes: 10 additions & 0 deletions codex/market.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type
OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].}
OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].}
OnSlotFreed* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSlotReservationsFull* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises: [].}
OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises:[].}
Expand All @@ -42,6 +43,9 @@ type
SlotFreed* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
slotIndex*: UInt256
SlotReservationsFull* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
slotIndex*: UInt256
RequestFulfilled* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
RequestCancelled* = object of MarketplaceEvent
Expand Down Expand Up @@ -203,6 +207,12 @@ method subscribeSlotFreed*(market: Market,
Future[Subscription] {.base, async.} =
raiseAssert("not implemented")

method subscribeSlotReservationsFull*(
market: Market,
callback: OnSlotReservationsFull): Future[Subscription] {.base, async.} =

raiseAssert("not implemented")

method subscribeRequestCancelled*(market: Market,
callback: OnRequestCancelled):
Future[Subscription] {.base, async.} =
Expand Down
18 changes: 18 additions & 0 deletions codex/sales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,23 @@ proc subscribeSlotFreed(sales: Sales) {.async.} =
except CatchableError as e:
error "Unable to subscribe to slot freed events", msg = e.msg

proc subscribeSlotReservationsFull(sales: Sales) {.async.} =
let context = sales.context
let market = context.market
let queue = context.slotQueue

proc onSlotReservationsFull(requestId: RequestId, slotIndex: UInt256) =
trace "reservations for slot full, removing from slot queue", requestId, slotIndex
queue.delete(requestId, slotIndex.truncate(uint16))

try:
let sub = await market.subscribeSlotReservationsFull(onSlotReservationsFull)
sales.subscriptions.add(sub)
except CancelledError as error:
raise error
except CatchableError as e:
error "Unable to subscribe to slot filled events", msg = e.msg

proc startSlotQueue(sales: Sales) {.async.} =
let slotQueue = sales.context.slotQueue
let reservations = sales.context.reservations
Expand All @@ -488,6 +505,7 @@ proc subscribe(sales: Sales) {.async.} =
await sales.subscribeSlotFilled()
await sales.subscribeSlotFreed()
await sales.subscribeCancellation()
await sales.subscribeSlotReservationsFull()

proc unsubscribe(sales: Sales) {.async.} =
for sub in sales.subscriptions:
Expand Down
25 changes: 25 additions & 0 deletions tests/codex/helpers/mockmarket.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type
onFulfillment: seq[FulfillmentSubscription]
onSlotFilled: seq[SlotFilledSubscription]
onSlotFreed: seq[SlotFreedSubscription]
onSlotReservationsFull: seq[SlotReservationsFullSubscription]
onRequestCancelled: seq[RequestCancelledSubscription]
onRequestFailed: seq[RequestFailedSubscription]
onProofSubmitted: seq[ProofSubmittedSubscription]
Expand All @@ -72,6 +73,9 @@ type
SlotFreedSubscription* = ref object of Subscription
market: MockMarket
callback: OnSlotFreed
SlotReservationsFullSubscription* = ref object of Subscription
market: MockMarket
callback: OnSlotReservationsFull
RequestCancelledSubscription* = ref object of Subscription
market: MockMarket
requestId: ?RequestId
Expand Down Expand Up @@ -202,6 +206,15 @@ proc emitSlotFreed*(market: MockMarket,
for subscription in subscriptions:
subscription.callback(requestId, slotIndex)

proc emitSlotReservationsFull*(
market: MockMarket,
requestId: RequestId,
slotIndex: UInt256) =

var subscriptions = market.subscriptions.onSlotReservationsFull
for subscription in subscriptions:
subscription.callback(requestId, slotIndex)

proc emitRequestCancelled*(market: MockMarket, requestId: RequestId) =
var subscriptions = market.subscriptions.onRequestCancelled
for subscription in subscriptions:
Expand Down Expand Up @@ -389,6 +402,15 @@ method subscribeSlotFreed*(market: MockMarket,
market.subscriptions.onSlotFreed.add(subscription)
return subscription

method subscribeSlotReservationsFull*(
market: MockMarket,
callback: OnSlotReservationsFull): Future[Subscription] {.async.} =

let subscription =
SlotReservationsFullSubscription(market: market, callback: callback)
market.subscriptions.onSlotReservationsFull.add(subscription)
return subscription

method subscribeRequestCancelled*(market: MockMarket,
callback: OnRequestCancelled):
Future[Subscription] {.async.} =
Expand Down Expand Up @@ -481,3 +503,6 @@ method unsubscribe*(subscription: RequestFailedSubscription) {.async.} =

method unsubscribe*(subscription: ProofSubmittedSubscription) {.async.} =
subscription.market.subscriptions.onProofSubmitted.keepItIf(it != subscription)

method unsubscribe*(subscription: SlotReservationsFullSubscription) {.async.} =
subscription.market.subscriptions.onSlotReservationsFull.keepItIf(it != subscription)
6 changes: 6 additions & 0 deletions tests/codex/sales/testsales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ asyncchecksuite "Sales":
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))

test "removes slot index from slot queue once SlotReservationsFull emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitSlotReservationsFull(request1.id, 1.u256)
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))

test "adds slot index to slot queue once SlotFreed emitted":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
Expand Down
24 changes: 24 additions & 0 deletions tests/contracts/testMarket.nim
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,30 @@ ethersuite "On-Chain Market":
check receivedIdxs == @[slotIndex]
await subscription.unsubscribe()

test "supports slot reservations full subscriptions":
let account2 = ethProvider.getSigner(accounts[2])
let account3 = ethProvider.getSigner(accounts[3])

await market.requestStorage(request)

var receivedRequestIds: seq[RequestId] = @[]
var receivedIdxs: seq[UInt256] = @[]
proc onSlotReservationsFull(requestId: RequestId, idx: UInt256) =
receivedRequestIds.add(requestId)
receivedIdxs.add(idx)
let subscription =
await market.subscribeSlotReservationsFull(onSlotReservationsFull)

await market.reserveSlot(request.id, slotIndex)
switchAccount(account2)
await market.reserveSlot(request.id, slotIndex)
switchAccount(account3)
await market.reserveSlot(request.id, slotIndex)

check receivedRequestIds == @[request.id]
check receivedIdxs == @[slotIndex]
await subscription.unsubscribe()

test "support fulfillment subscriptions":
await market.requestStorage(request)
var receivedIds: seq[RequestId]
Expand Down