Skip to content

Commit 6beeb8a

Browse files
committed
Harness some harbor values against concurrent access.
1 parent 3475eb8 commit 6beeb8a

5 files changed

Lines changed: 109 additions & 34 deletions

File tree

src/core/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
compand
7272
compress
7373
compress_exp
74+
concurrent_hashtbl
7475
content
7576
content_audio
7677
content_pcm_base

src/core/harbor/harbor.ml

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
229229
method virtual get_mime_type : string option
230230
end
231231

232-
type sources = (string, source) Hashtbl.t
232+
type sources = (string, source) Concurrent_hashtbl.t
233233
type http_verb = [ `Get | `Post | `Put | `Delete | `Head | `Options ]
234234
type source_type = [ `Put | `Post | `Source | `Xaudiocast | `Shout ]
235235

@@ -319,11 +319,13 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
319319
fds : Unix.file_descr list;
320320
}
321321

322-
let opened_ports : (int, open_port) Hashtbl.t = Hashtbl.create 1
323-
let find_handler = Hashtbl.find opened_ports
322+
let opened_ports : (int, open_port) Concurrent_hashtbl.t =
323+
Concurrent_hashtbl.create 1
324+
325+
let find_handler = Concurrent_hashtbl.find opened_ports
324326

325327
let find_source mount port =
326-
Hashtbl.find (find_handler port).handler.sources mount
328+
Concurrent_hashtbl.find (find_handler port).handler.sources mount
327329

328330
exception Assoc of string
329331

@@ -717,19 +719,25 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
717719
in
718720
let* () = exec_http_auth_check ~args ~login:s#login h headers in
719721
let* () =
720-
if
721-
not
722-
(List.mem
723-
(Option.value ~default:"" s#get_mime_type)
724-
conf_icy_metadata#get)
725-
then (
726-
log#info
727-
"Returned 405 for '%s': Source format does not support ICY \
728-
metadata update"
729-
uri;
730-
simple_reply
731-
(http_error_page 405 "Method Not Allowed" "Method Not Allowed"))
732-
else Duppy.Monad.return ()
722+
match s#get_mime_type with
723+
| None ->
724+
log#critical
725+
"No mime-type found for source at %s, this should not \
726+
happen!"
727+
uri;
728+
simple_reply
729+
(http_error_page 500 "Internal Server Error"
730+
"Internal Server Error")
731+
| Some f when List.mem f conf_icy_metadata#get ->
732+
Duppy.Monad.return ()
733+
| Some f ->
734+
log#info
735+
"Returned 405 for '%s': Source format %s does not support \
736+
ICY metadata update"
737+
uri f;
738+
simple_reply
739+
(http_error_page 405 "Method Not Allowed"
740+
"Method Not Allowed")
733741
in
734742
Hashtbl.remove args "mount";
735743
Hashtbl.remove args "mode";
@@ -1134,15 +1142,17 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
11341142
creates the handlers when they are missing. *)
11351143
let get_handler ~pos ~transport ~icy port =
11361144
try
1137-
let { handler; fds; transport = t } = Hashtbl.find opened_ports port in
1145+
let { handler; fds; transport = t } =
1146+
Concurrent_hashtbl.find opened_ports port
1147+
in
11381148
if transport#name <> t#name then
11391149
Lang.raise_error ~pos
11401150
~message:"Port is already opened with a different transport" "http";
11411151
(* If we have only one socket and icy=true,
11421152
* we need to open a second one. *)
11431153
if List.length fds = 1 && icy then (
11441154
let fds = open_port ~transport ~icy (port + 1) :: fds in
1145-
Hashtbl.replace opened_ports port { handler; fds; transport })
1155+
Concurrent_hashtbl.replace opened_ports port { handler; fds; transport })
11461156
else ();
11471157
handler
11481158
with Not_found ->
@@ -1152,31 +1162,33 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
11521162
let fds =
11531163
if icy then open_port ~transport ~icy (port + 1) :: fds else fds
11541164
in
1155-
let handler = { sources = Hashtbl.create 1; http = Atomic.make [] } in
1156-
Hashtbl.replace opened_ports port { handler; fds; transport };
1165+
let handler =
1166+
{ sources = Concurrent_hashtbl.create 1; http = Atomic.make [] }
1167+
in
1168+
Concurrent_hashtbl.replace opened_ports port { handler; fds; transport };
11571169
handler
11581170

11591171
(* Add sources... This is tied up to sources lifecycle so
11601172
no need to prevent early start *)
11611173
let add_source ~pos ~transport ~port ~mountpoint ~icy source =
11621174
let sources =
11631175
let handler = get_handler ~pos ~transport ~icy port in
1164-
if Hashtbl.mem handler.sources mountpoint then
1176+
if Concurrent_hashtbl.mem handler.sources mountpoint then
11651177
Lang.raise_error ~pos ~message:"Mountpoint is already taken!" "http"
11661178
else ();
11671179
handler.sources
11681180
in
11691181
log#important "Adding mountpoint '%s' on port %i" mountpoint port;
1170-
Hashtbl.replace sources mountpoint source
1182+
Concurrent_hashtbl.replace sources mountpoint source
11711183

11721184
(* Remove source. *)
11731185
let remove_source ~port ~mountpoint () =
1174-
let { handler; fds; _ } = Hashtbl.find opened_ports port in
1175-
assert (Hashtbl.mem handler.sources mountpoint);
1186+
let { handler; fds; _ } = Concurrent_hashtbl.find opened_ports port in
1187+
assert (Concurrent_hashtbl.mem handler.sources mountpoint);
11761188
log#important "Removing mountpoint '%s' on port %i" mountpoint port;
1177-
Hashtbl.remove handler.sources mountpoint;
1189+
Concurrent_hashtbl.remove handler.sources mountpoint;
11781190
if
1179-
Hashtbl.length handler.sources = 0
1191+
Concurrent_hashtbl.length handler.sources = 0
11801192
&& List.length (Atomic.get handler.http) = 0
11811193
then (
11821194
log#important "Nothing more on port %i: closing sockets." port;
@@ -1185,7 +1197,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
11851197
Unix.close in_s
11861198
in
11871199
List.iter f fds;
1188-
Hashtbl.remove opened_ports port)
1200+
Concurrent_hashtbl.remove opened_ports port)
11891201
else ()
11901202

11911203
(* Add http_handler... *)
@@ -1202,7 +1214,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
12021214
(* Remove http_handler. *)
12031215
let remove_http_handler ~port ~verb ~uri () =
12041216
let exec () =
1205-
let { handler; fds; _ } = Hashtbl.find opened_ports port in
1217+
let { handler; fds; _ } = Concurrent_hashtbl.find opened_ports port in
12061218
let suri = Lang.descr_of_regexp uri in
12071219
let handlers, removed =
12081220
List.partition
@@ -1214,7 +1226,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
12141226
(string_of_verb verb) suri port;
12151227
Atomic.set handler.http handlers;
12161228
if
1217-
Hashtbl.length handler.sources = 0
1229+
Concurrent_hashtbl.length handler.sources = 0
12181230
&& List.length (Atomic.get handler.http) = 0
12191231
then (
12201232
log#info "Nothing more on port %i: closing sockets." port;
@@ -1223,7 +1235,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
12231235
Unix.close in_s
12241236
in
12251237
List.iter f fds;
1226-
Hashtbl.remove opened_ports port)
1238+
Concurrent_hashtbl.remove opened_ports port)
12271239
else ()
12281240
in
12291241
Server.on_start exec

src/core/sources/harbor_input.ml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class http_input_server ~pos ~transport ~dumpfile ~logfile ~bufferize ~max ~icy
4646
val mutable relay_read = fun _ _ _ -> assert false
4747

4848
val mutable create_decoder = fun _ -> assert false
49-
val mutable mime_type = None
49+
val mime_type = Atomic.make None
5050
val mutable dump = None
5151
val mutable logf = None
5252
val mutable on_connect = []
@@ -89,7 +89,7 @@ class http_input_server ~pos ~transport ~dumpfile ~logfile ~bufferize ~max ~icy
8989
(try Frame.Metadata.find "title" m with _ -> "?");
9090
Generator.add_metadata self#buffer m
9191

92-
method get_mime_type = mime_type
92+
method get_mime_type = Atomic.get mime_type
9393

9494
method feed =
9595
self#log#important "Decoding...";
@@ -187,7 +187,7 @@ class http_input_server ~pos ~transport ~dumpfile ~logfile ~bufferize ~max ~icy
187187
(decoder args, buffer)
188188
in
189189
create_decoder <- decoder;
190-
mime_type <- Some mime
190+
Atomic.set mime_type (Some mime)
191191
| None -> raise Harbor.Unknown_codec
192192

193193
method relay stype (headers : (string * string) list) ?(read = Harbor.read)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
(*****************************************************************************
2+
3+
Liquidsoap, a programmable stream generator.
4+
Copyright 2003-2026 Savonet team
5+
6+
This program is free software; you can redistribute it and/or modify
7+
it under the terms of the GNU General Public License as published by
8+
the Free Software Foundation; either version 2 of the License, or
9+
(at your option) any later version.
10+
11+
This program is distributed in the hope that it will be useful,
12+
but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
GNU General Public License for more details, fully stated in the COPYING
15+
file at the root of the liquidsoap distribution.
16+
17+
You should have received a copy of the GNU General Public License
18+
along with this program; if not, write to the Free Software
19+
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20+
21+
*****************************************************************************)
22+
23+
type ('a, 'b) t = { m : Mutex.t; h : ('a, 'b) Hashtbl.t }
24+
25+
let create n = { m = Mutex.create (); h = Hashtbl.create n }
26+
let replace { m; h } k v = Mutex_utils.mutexify m (Hashtbl.replace h k) v
27+
let mem { m; h } k = Mutex_utils.mutexify m (Hashtbl.mem h) k
28+
let find { m; h } k = Mutex_utils.mutexify m (Hashtbl.find h) k
29+
let find_opt { m; h } k = Mutex_utils.mutexify m (Hashtbl.find_opt h) k
30+
let remove { m; h } k = Mutex_utils.mutexify m (Hashtbl.remove h) k
31+
let length { m; h } = Mutex_utils.mutexify m Hashtbl.length h
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
(*****************************************************************************
2+
3+
Liquidsoap, a programmable stream generator.
4+
Copyright 2003-2026 Savonet team
5+
6+
This program is free software; you can redistribute it and/or modify
7+
it under the terms of the GNU General Public License as published by
8+
the Free Software Foundation; either version 2 of the License, or
9+
(at your option) any later version.
10+
11+
This program is distributed in the hope that it will be useful,
12+
but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
GNU General Public License for more details, fully stated in the COPYING
15+
file at the root of the liquidsoap distribution.
16+
17+
You should have received a copy of the GNU General Public License
18+
along with this program; if not, write to the Free Software
19+
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20+
21+
*****************************************************************************)
22+
23+
type ('a, 'b) t
24+
25+
val create : int -> ('a, 'b) t
26+
val replace : ('a, 'b) t -> 'a -> 'b -> unit
27+
val mem : ('a, 'b) t -> 'a -> bool
28+
val find : ('a, 'b) t -> 'a -> 'b
29+
val find_opt : ('a, 'b) t -> 'a -> 'b option
30+
val remove : ('a, 'b) t -> 'a -> unit
31+
val length : ('a, 'b) t -> int

0 commit comments

Comments
 (0)