Skip to content

Commit e00b272

Browse files
greg-rychlewskiGreg Rychlewskiwarmwaffles
authored
Add set_update_hook/2 (#260)
* Adds `set_update_hook`. * Document new update hook and caveats. * Adds tests around `set_update_hook`. --------- Co-authored-by: Greg Rychlewski <[email protected]> Co-authored-by: Matthew Johnston <[email protected]>
1 parent 68f829c commit e00b272

File tree

4 files changed

+171
-1
lines changed

4 files changed

+171
-1
lines changed

c_src/sqlite3_nif.c

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616
#define MAX_PATHNAME 512
1717

1818
static ErlNifResourceType* connection_type = NULL;
19-
static ErlNifResourceType* statement_type = NULL;
19+
static ErlNifResourceType* statement_type = NULL;
2020
static sqlite3_mem_methods default_alloc_methods = {0};
2121

2222
typedef struct connection
2323
{
2424
sqlite3* db;
2525
ErlNifMutex* mutex;
26+
ErlNifPid update_hook_pid;
2627
} connection_t;
2728

2829
typedef struct statement
@@ -999,6 +1000,73 @@ exqlite_enable_load_extension(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
9991000
return make_atom(env, "ok");
10001001
}
10011002

1003+
//
1004+
// Data Change Notifications
1005+
//
1006+
1007+
void
1008+
update_callback(void* arg, int sqlite_operation_type, char const* sqlite_database, char const* sqlite_table, sqlite3_int64 sqlite_rowid)
1009+
{
1010+
connection_t* conn = (connection_t*)arg;
1011+
1012+
if (conn == NULL) {
1013+
return;
1014+
}
1015+
1016+
ErlNifEnv* msg_env = enif_alloc_env();
1017+
ERL_NIF_TERM change_type;
1018+
1019+
switch (sqlite_operation_type) {
1020+
case SQLITE_INSERT:
1021+
change_type = make_atom(msg_env, "insert");
1022+
break;
1023+
case SQLITE_DELETE:
1024+
change_type = make_atom(msg_env, "delete");
1025+
break;
1026+
case SQLITE_UPDATE:
1027+
change_type = make_atom(msg_env, "update");
1028+
break;
1029+
default:
1030+
return;
1031+
}
1032+
ERL_NIF_TERM rowid = enif_make_int64(msg_env, sqlite_rowid);
1033+
ERL_NIF_TERM database = make_binary(msg_env, sqlite_database, strlen(sqlite_database));
1034+
ERL_NIF_TERM table = make_binary(msg_env, sqlite_table, strlen(sqlite_table));
1035+
ERL_NIF_TERM msg = enif_make_tuple4(msg_env, change_type, database, table, rowid);
1036+
1037+
if (!enif_send(NULL, &conn->update_hook_pid, msg_env, msg)) {
1038+
sqlite3_update_hook(conn->db, NULL, NULL);
1039+
}
1040+
1041+
enif_free_env(msg_env);
1042+
}
1043+
1044+
static ERL_NIF_TERM
1045+
exqlite_set_update_hook(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
1046+
{
1047+
assert(env);
1048+
connection_t* conn = NULL;
1049+
1050+
if (argc != 2) {
1051+
return enif_make_badarg(env);
1052+
}
1053+
1054+
if (!enif_get_resource(env, argv[0], connection_type, (void**)&conn)) {
1055+
return make_error_tuple(env, "invalid_connection");
1056+
}
1057+
1058+
if (!enif_get_local_pid(env, argv[1], &conn->update_hook_pid)) {
1059+
return make_error_tuple(env, "invalid_pid");
1060+
}
1061+
1062+
// Passing the connection as the third argument causes it to be
1063+
// passed as the first argument to update_callback. This allows us
1064+
// to extract the hook pid and reset the hook if the pid is not alive.
1065+
sqlite3_update_hook(conn->db, update_callback, conn);
1066+
1067+
return make_atom(env, "ok");
1068+
}
1069+
10021070
//
10031071
// Most of our nif functions are going to be IO bounded
10041072
//
@@ -1019,6 +1087,7 @@ static ErlNifFunc nif_funcs[] = {
10191087
{"deserialize", 3, exqlite_deserialize, ERL_NIF_DIRTY_JOB_IO_BOUND},
10201088
{"release", 2, exqlite_release, ERL_NIF_DIRTY_JOB_IO_BOUND},
10211089
{"enable_load_extension", 2, exqlite_enable_load_extension, ERL_NIF_DIRTY_JOB_IO_BOUND},
1090+
{"set_update_hook", 2, exqlite_set_update_hook, ERL_NIF_DIRTY_JOB_IO_BOUND},
10221091
};
10231092

10241093
ERL_NIF_INIT(Elixir.Exqlite.Sqlite3NIF, nif_funcs, on_load, NULL, NULL, on_unload)

lib/exqlite/sqlite3.ex

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,36 @@ defmodule Exqlite.Sqlite3 do
203203
end
204204
end
205205

206+
@doc """
207+
Send data change notifications to a process.
208+
209+
Each time an insert, update, or delete is performed on the connection provided
210+
as the first argument, a message will be sent to the pid provided as the second argument.
211+
212+
The message is of the form: `{action, db_name, table, row_id}`, where:
213+
214+
* `action` is one of `:insert`, `:update` or `:delete`
215+
* `db_name` is a string representing the database name where the change took place
216+
* `table` is a string representing the table name where the change took place
217+
* `row_id` is an integer representing the unique row id assigned by SQLite
218+
219+
## Restrictions
220+
221+
* There are some conditions where the update hook will not be invoked by SQLite.
222+
See the documentation for [more details](https://www.sqlite.org/c3ref/update_hook.html)
223+
* Only one pid can listen to the changes on a given database connection at a time.
224+
If this function is called multiple times for the same connection, only the last pid will
225+
receive the notifications
226+
* Updates only happen for the connection that is opened. For example, there
227+
are two connections A and B. When an update happens on connection B, the
228+
hook set for connection A will not receive the update, but the hook for
229+
connection B will receive the update.
230+
"""
231+
@spec set_update_hook(db(), pid()) :: :ok | {:error, reason()}
232+
def set_update_hook(conn, pid) do
233+
Sqlite3NIF.set_update_hook(conn, pid)
234+
end
235+
206236
defp convert(%Date{} = val), do: Date.to_iso8601(val)
207237
defp convert(%Time{} = val), do: Time.to_iso8601(val)
208238
defp convert(%NaiveDateTime{} = val), do: NaiveDateTime.to_iso8601(val)

lib/exqlite/sqlite3_nif.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,8 @@ defmodule Exqlite.Sqlite3NIF do
6464
@spec enable_load_extension(db(), integer()) :: :ok | {:error, reason()}
6565
def enable_load_extension(_conn, _flag), do: :erlang.nif_error(:not_loaded)
6666

67+
@spec set_update_hook(db(), pid()) :: :ok | {:error, reason()}
68+
def set_update_hook(_conn, _pid), do: :erlang.nif_error(:not_loaded)
69+
6770
# add statement inspection tooling https://sqlite.org/c3ref/expanded_sql.html
6871
end

test/exqlite/sqlite3_test.exs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,4 +411,72 @@ defmodule Exqlite.Sqlite3Test do
411411
assert {:row, [1, "hello"]} = Sqlite3.step(conn, statement)
412412
end
413413
end
414+
415+
describe "set_update_hook/2" do
416+
defmodule ChangeListener do
417+
use GenServer
418+
419+
def start_link({parent, name}),
420+
do: GenServer.start_link(__MODULE__, {parent, name})
421+
422+
def init({parent, name}), do: {:ok, {parent, name}}
423+
424+
def handle_info({_action, _db, _table, _row_id} = change, {parent, name}) do
425+
send(parent, {change, name})
426+
{:noreply, {parent, name}}
427+
end
428+
end
429+
430+
setup do
431+
{:ok, path} = Temp.path()
432+
{:ok, conn} = Sqlite3.open(path)
433+
:ok = Sqlite3.execute(conn, "create table test(num integer)")
434+
435+
on_exit(fn ->
436+
Sqlite3.close(conn)
437+
File.rm(path)
438+
end)
439+
440+
[conn: conn, path: path]
441+
end
442+
443+
test "can listen to data change notifications", context do
444+
{:ok, listener_pid} = ChangeListener.start_link({self(), :listener})
445+
Sqlite3.set_update_hook(context.conn, listener_pid)
446+
447+
:ok = Sqlite3.execute(context.conn, "insert into test(num) values (10)")
448+
:ok = Sqlite3.execute(context.conn, "insert into test(num) values (11)")
449+
:ok = Sqlite3.execute(context.conn, "update test set num = 1000")
450+
:ok = Sqlite3.execute(context.conn, "delete from test where num = 1000")
451+
452+
assert_receive {{:insert, "main", "test", 1}, _}, 1000
453+
assert_receive {{:insert, "main", "test", 2}, _}, 1000
454+
assert_receive {{:update, "main", "test", 1}, _}, 1000
455+
assert_receive {{:update, "main", "test", 2}, _}, 1000
456+
assert_receive {{:delete, "main", "test", 1}, _}, 1000
457+
assert_receive {{:delete, "main", "test", 2}, _}, 1000
458+
end
459+
460+
test "only one pid can listen at a time", context do
461+
{:ok, listener1_pid} = ChangeListener.start_link({self(), :listener1})
462+
{:ok, listener2_pid} = ChangeListener.start_link({self(), :listener2})
463+
464+
Sqlite3.set_update_hook(context.conn, listener1_pid)
465+
:ok = Sqlite3.execute(context.conn, "insert into test(num) values (10)")
466+
assert_receive {{:insert, "main", "test", 1}, :listener1}, 1000
467+
468+
Sqlite3.set_update_hook(context.conn, listener2_pid)
469+
:ok = Sqlite3.execute(context.conn, "insert into test(num) values (10)")
470+
assert_receive {{:insert, "main", "test", 2}, :listener2}, 1000
471+
refute_receive {{:insert, "main", "test", 2}, :listener1}, 1000
472+
end
473+
474+
test "notifications don't cross connections", context do
475+
{:ok, listener_pid} = ChangeListener.start_link({self(), :listener})
476+
{:ok, new_conn} = Sqlite3.open(context.path)
477+
Sqlite3.set_update_hook(new_conn, listener_pid)
478+
:ok = Sqlite3.execute(context.conn, "insert into test(num) values (10)")
479+
refute_receive {{:insert, "main", "test", 1}, _}, 1000
480+
end
481+
end
414482
end

0 commit comments

Comments
 (0)