Skip to content

Commit

Permalink
abstract: make fibers cancellable
Browse files Browse the repository at this point in the history
We are going to finish all client (non-system) fibers in the process
of Tarantool shutdown. First we cancel them and then wait for their
finishing. So if the client fibers are not finished Tarantool shutdown
is never finished too.

Currently some of queue fibers can not be finished and we got hang on
integration testing of PR [1]. Let's fix it.

In `queue_state.lua` the only error that can aries on `box.ctl.wait_rw` or
`box.ctl.wait_ro` is `FiberIsCancelled` so we can just drop pcall.

In `utubettl.lua` we just need to fix a typo. Just as already done for
fifottl in  e355387 ("fix fifottl_fiber_iteration error handling")

[1] PR with client fibers finishing on shutdown.
tarantool/tarantool#9604
  • Loading branch information
nshy committed Jan 29, 2024
1 parent e99ce45 commit 99fed57
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 19 deletions.
2 changes: 1 addition & 1 deletion queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ local function utubettl_fiber(self)
if box.info.ro == false then
local stat, err = pcall(utubettl_fiber_iteration, self, processed)

if not stat and not err.code == box.error.READONLY then
if not stat and not (err.code == box.error.READONLY) then
log.error("error catched: %s", tostring(err))
log.error("exiting fiber '%s'", fiber.name())
return 1
Expand Down
32 changes: 14 additions & 18 deletions queue/abstract/queue_state.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,21 @@ local function create_state_fiber(on_state_change_cb)
fiber.self():name('queue_state_fiber')
while true do
if current == queue_state.states.WAITING then
local rc = pcall(box.ctl.wait_rw)
if rc then
current = queue_state.states.STARTUP
log.info('Queue state changed: STARTUP')
-- Wait for maximum upstream lag * 2.
fiber.sleep(max_lag() * 2)
on_state_change_cb(current)
current = queue_state.states.RUNNING
log.info('Queue state changed: RUNNING')
end
box.ctl.wait_rw()
current = queue_state.states.STARTUP
log.info('Queue state changed: STARTUP')
-- Wait for maximum upstream lag * 2.
fiber.sleep(max_lag() * 2)
on_state_change_cb(current)
current = queue_state.states.RUNNING
log.info('Queue state changed: RUNNING')
elseif current == queue_state.states.RUNNING then
local rc = pcall(box.ctl.wait_ro)
if rc then
current = queue_state.states.ENDING
on_state_change_cb(current)
log.info('Queue state changed: ENDING')
current = queue_state.states.WAITING
log.info('Queue state changed: WAITING')
end
box.ctl.wait_ro()
current = queue_state.states.ENDING
on_state_change_cb(current)
log.info('Queue state changed: ENDING')
current = queue_state.states.WAITING
log.info('Queue state changed: WAITING')
end
end
end)
Expand Down

0 comments on commit 99fed57

Please sign in to comment.