Skip to content

Commit 3475eb8

Browse files
authored
Fix dynamic shutdown/restart (#4821)
1 parent 77fa92f commit 3475eb8

5 files changed

Lines changed: 176 additions & 15 deletions

File tree

src/core/clock.ml

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -261,19 +261,21 @@ let sync c = _sync (Unifier.deref c)
261261
let pending_clocks = WeakQueue.create ()
262262
let clocks = Queue.create ()
263263

264-
let rec _cleanup ~clock { outputs } =
265-
Queue.iter outputs (fun (a, o) -> try o#sleep a with _ -> ());
264+
let rec has_stopped ~clear_controller ~clock ~c x =
265+
Queue.iter x.outputs (fun (a, o) -> try o#sleep a with _ -> ());
266266
Queue.iter clock.sub_clocks stop;
267-
Queue.filter_out clocks (fun c -> Unifier.deref c == clock)
267+
Queue.filter_out clocks (fun c -> Unifier.deref c == clock);
268+
if clear_controller then Unifier.set clock.controller `None;
269+
Atomic.set clock.state (`Stopped x.sync);
270+
WeakQueue.push pending_clocks c;
271+
x.log#important "Clock stopped"
268272

269273
and stop c =
270274
let clock = Unifier.deref c in
271275
match Atomic.get clock.state with
272276
| `Stopped _ | `Stopping _ -> ()
273277
| `Started ({ sync = `Passive } as x) ->
274-
_cleanup ~clock x;
275-
x.log#important "Clock stopped";
276-
Atomic.set clock.state (`Stopped `Passive)
278+
has_stopped ~clear_controller:false ~clock ~c x
277279
| `Started x ->
278280
x.log#important "Clock stopping";
279281
Atomic.set clock.state (`Stopping x)
@@ -554,7 +556,7 @@ and _tick ~clock x =
554556
_after_tick ~self_sync x;
555557
check_stopped ()
556558

557-
and _clock_thread ~clock x =
559+
and _clock_thread ~clock ~c x =
558560
let has_sources_to_process () =
559561
0 < Queue.length clock.pending_activations
560562
|| 0 < Queue.length x.outputs
@@ -579,8 +581,7 @@ and _clock_thread ~clock x =
579581
[] reasons
580582
in
581583
x.log#important "Clock thread has stopped: %s." (String.concat ", " reasons);
582-
_cleanup ~clock x;
583-
Atomic.set clock.state (`Stopped x.sync)
584+
has_stopped ~clear_controller:true ~clock ~c x
584585
in
585586
let run () =
586587
try
@@ -613,7 +614,7 @@ and _can_start ?(force = false) clock =
613614
`True sync
614615
| _ -> `False
615616

616-
and _start ?force ~sync clock =
617+
and _start ?force ~sync ~c clock =
617618
_set_id clock (_id clock);
618619
let id = _id clock in
619620
let sources =
@@ -675,7 +676,7 @@ and _start ?force ~sync clock =
675676
Queue.iter clock.sub_clocks (fun c -> start ?force c);
676677
Atomic.set clock.state (`Started x);
677678
if sync <> `Passive then (
678-
let th = _clock_thread ~clock x in
679+
let th = _clock_thread ~clock ~c x in
679680
match _controller with
680681
| `None ->
681682
let controller =
@@ -689,7 +690,7 @@ and _start ?force ~sync clock =
689690
and start ?force c =
690691
let clock = Unifier.deref c in
691692
match _can_start ?force clock with
692-
| `True sync -> _start ?force ~sync clock
693+
| `True sync -> _start ?force ~sync ~c clock
693694
| `False -> ()
694695

695696
let add_pending_clock =
@@ -700,7 +701,7 @@ let add_pending_clock =
700701
let clock = Unifier.deref c in
701702
match _can_start clock with
702703
| `True sync when sync <> `Passive ->
703-
_start ~sync clock;
704+
_start ~sync ~c clock;
704705
Queue.push clocks c
705706
| _ -> ()
706707
in
@@ -747,7 +748,7 @@ let start_pending () =
747748
match _can_start clock with
748749
| `True `Passive -> ()
749750
| `True sync ->
750-
_start ~sync clock;
751+
_start ~sync ~c clock;
751752
Queue.push clocks c
752753
| `False -> WeakQueue.push pending_clocks c)
753754
| _ -> ())

src/core/outputs/output.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class virtual output ~output_kind ?clock ?(name = "") ~infallible
172172
(* Output that frame if it has some data *)
173173
if Frame.position data > 0 then self#send_frame data;
174174
if Frame.is_partial data then (
175-
if not self#fallible then (
175+
if self#is_ready && not self#fallible then (
176176
self#log#critical "Infallible source produced a partial frame!";
177177
assert false);
178178
self#log#info "Source ended (no more tracks) stopping output...";

tests/regression/GH4819-bis.liq

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
log.level := 4
2+
log.file.path := "/tmp/bla.log"
3+
log.stdout := true
4+
5+
s = sine()
6+
7+
recordings = ref([])
8+
9+
def shutdown_recordings() =
10+
log(
11+
"Stopping recordings"
12+
)
13+
14+
list.iter(
15+
fun (recording) ->
16+
begin
17+
log(
18+
"Shutting down #{recording.id()}"
19+
)
20+
recording.shutdown()
21+
end,
22+
recordings()
23+
)
24+
25+
recordings := []
26+
end
27+
28+
def start_recording(path) =
29+
log(
30+
"Starting recording"
31+
)
32+
33+
log(
34+
"Recording file: #{path}"
35+
)
36+
37+
recording = output.file(%mp3, append=true, {path}, s)
38+
39+
recordings := [recording, ...recordings()]
40+
end
41+
42+
tmp = file.temp(cleanup=true, "bla", "mp3")
43+
44+
start_recording(tmp)
45+
46+
tmp2 = file.temp(cleanup=true, "bla", "mp3")
47+
48+
start_recording(tmp2)
49+
50+
thread.run(
51+
delay=1.,
52+
fun () ->
53+
begin
54+
shutdown_recordings()
55+
tmp = file.temp(cleanup=true, "bla", "mp3")
56+
start_recording(tmp)
57+
tmp2 = file.temp(cleanup=true, "bla", "mp3")
58+
start_recording(tmp2)
59+
let [o] = recordings()
60+
o.on_start(synchronous=true, test.pass)
61+
end
62+
)

tests/regression/GH4819.liq

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
log.level := 4
2+
log.file.path := "/tmp/bla.log"
3+
log.stdout := true
4+
5+
s = sine()
6+
7+
s = mksafe(buffer(s))
8+
9+
recordings = ref([])
10+
11+
def shutdown_recordings() =
12+
log(
13+
"Stopping recordings"
14+
)
15+
16+
list.iter(
17+
fun (recording) ->
18+
begin
19+
log(
20+
"Shutting down #{recording.id()}"
21+
)
22+
recording.shutdown()
23+
end,
24+
recordings()
25+
)
26+
27+
recordings := []
28+
end
29+
30+
def start_recording(path) =
31+
log(
32+
"Starting recording"
33+
)
34+
35+
log(
36+
"Recording file: #{path}"
37+
)
38+
39+
recording = output.file(%mp3, append=true, {path}, s)
40+
41+
recordings := [recording, ...recordings()]
42+
end
43+
44+
tmp = file.temp(cleanup=true, "bla", "mp3")
45+
46+
start_recording(tmp)
47+
48+
tmp2 = file.temp(cleanup=true, "bla", "mp3")
49+
50+
start_recording(tmp2)
51+
52+
thread.run(
53+
delay=1.,
54+
fun () ->
55+
begin
56+
shutdown_recordings()
57+
tmp = file.temp(cleanup=true, "bla", "mp3")
58+
start_recording(tmp)
59+
tmp2 = file.temp(cleanup=true, "bla", "mp3")
60+
start_recording(tmp2)
61+
let [o] = recordings()
62+
o.on_start(synchronous=true, test.pass)
63+
end
64+
)

tests/regression/dune.inc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -927,6 +927,38 @@
927927
(:run_test ../run_test.exe))
928928
(action (run %{run_test} GH4748.liq liquidsoap %{test_liq} GH4748.liq)))
929929

930+
(rule
931+
(alias GH4819-bis)
932+
(package liquidsoap)
933+
(deps
934+
GH4819-bis.liq
935+
../media/all_media_files
936+
../../src/bin/liquidsoap.exe
937+
../streams/file1.png
938+
../streams/file1.mp3
939+
./theora-test.mp4
940+
(package liquidsoap)
941+
(source_tree ../../src/libs)
942+
(:test_liq ../test.liq)
943+
(:run_test ../run_test.exe))
944+
(action (run %{run_test} GH4819-bis.liq liquidsoap %{test_liq} GH4819-bis.liq)))
945+
946+
(rule
947+
(alias GH4819)
948+
(package liquidsoap)
949+
(deps
950+
GH4819.liq
951+
../media/all_media_files
952+
../../src/bin/liquidsoap.exe
953+
../streams/file1.png
954+
../streams/file1.mp3
955+
./theora-test.mp4
956+
(package liquidsoap)
957+
(source_tree ../../src/libs)
958+
(:test_liq ../test.liq)
959+
(:run_test ../run_test.exe))
960+
(action (run %{run_test} GH4819.liq liquidsoap %{test_liq} GH4819.liq)))
961+
930962
(rule
931963
(alias LS268)
932964
(package liquidsoap)
@@ -1499,6 +1531,8 @@
14991531
(alias GH4711)
15001532
(alias GH4745)
15011533
(alias GH4748)
1534+
(alias GH4819)
1535+
(alias GH4819-bis)
15021536
(alias LS268)
15031537
(alias LS354-1)
15041538
(alias LS354-2)

0 commit comments

Comments
 (0)