Skip to content

Commit 456893c

Browse files
committed
Add support for pagination cursors
Not tested live yet
1 parent 7d583b2 commit 456893c

File tree

7 files changed

+284
-32
lines changed

7 files changed

+284
-32
lines changed

lib/irc_conn/reader.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ defmodule M51.IrcConn.Reader do
3535
defp loop_serve(supervisor, sock) do
3636
case :gen_tcp.recv(sock, 0) do
3737
{:ok, line} ->
38-
Logger.debug("IRC C->S #{Regex.replace(~r/[\r\n]/, line, "")}")
38+
Logger.info("IRC C->S #{Regex.replace(~r/[\r\n]/, line, "")}")
3939
{:ok, command} = M51.Irc.Command.parse(line)
4040
Registry.send({M51.Registry, {supervisor, :irc_handler}}, command)
4141
loop_serve(supervisor, sock)

lib/irc_conn/writer.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ defmodule M51.IrcConn.Writer do
5555
case arg do
5656
{:line, line} ->
5757
{_supervisor, sock} = state
58-
Logger.debug("IRC S->C #{Regex.replace(~r/[\r\n]/, line, "")}")
58+
Logger.info("IRC S->C #{Regex.replace(~r/[\r\n]/, line, "")}")
5959
:gen_tcp.send(sock, line)
6060

6161
{:close} ->

lib/matrix/raw_client.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ defmodule M51.Matrix.RawClient do
2424

2525
def get(client, path, headers \\ [], options \\ []) do
2626
headers = [Authorization: "Bearer " <> client.access_token] ++ headers
27-
options = options |> Keyword.put_new(:timeout, 20000)
27+
options = options |> Keyword.put_new(:timeout, 60000)
2828

2929
url = client.base_url <> path
3030

lib/matrix_client/chat_history.ex

Lines changed: 126 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,36 @@ defmodule M51.MatrixClient.ChatHistory do
2222
def after_(sup_pid, room_id, anchor, limit) do
2323
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)
2424

25-
case parse_anchor(anchor) do
26-
{:ok, event_id} ->
25+
case parse_anchor(anchor, true) do
26+
{:ok, :msgid, event_id} ->
2727
case M51.MatrixClient.Client.get_event_context(
2828
client,
2929
room_id,
3030
event_id,
3131
limit * 2
3232
) do
33-
{:ok, events} -> {:ok, process_events(sup_pid, room_id, events["events_after"])}
34-
{:error, message} -> {:error, Kernel.inspect(message)}
33+
{:ok, events} ->
34+
{:ok,
35+
process_events(sup_pid, room_id, events["events_after"], Map.get(events, "end"), nil)}
36+
37+
{:error, message} ->
38+
{:error, Kernel.inspect(message)}
39+
end
40+
41+
{:ok, :cursor, cursor} ->
42+
case M51.MatrixClient.Client.get_events_from_cursor(
43+
client,
44+
room_id,
45+
"f",
46+
cursor,
47+
limit
48+
) do
49+
{:ok, events} ->
50+
{:ok,
51+
process_events(sup_pid, room_id, events["chunk"], Map.get(events, "end"), nil)}
52+
53+
{:error, message} ->
54+
{:error, Kernel.inspect(message)}
3555
end
3656

3757
{:error, message} ->
@@ -42,9 +62,9 @@ defmodule M51.MatrixClient.ChatHistory do
4262
def around(sup_pid, room_id, anchor, limit) do
4363
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)
4464

45-
case parse_anchor(anchor) do
46-
{:ok, event_id} ->
47-
case M51.MatrixClient.Client.get_event_context(client, room_id, event_id, limit) do
65+
case parse_anchor(anchor, false) do
66+
{:ok, :msgid, event_id} ->
67+
case M51.MatrixClient.Client.get_event_context(client, room_id, event_id, limit - 1) do
4868
{:ok, events} ->
4969
# TODO: if there aren't enough events after (resp. before), allow more
5070
# events before (resp. after) than half the limit.
@@ -53,9 +73,16 @@ defmodule M51.MatrixClient.ChatHistory do
5373

5474
events_before = events["events_before"] |> Enum.slice(0, nb_before) |> Enum.reverse()
5575
events_after = events["events_after"] |> Enum.slice(0, nb_after)
56-
events = Enum.concat([events_before, [events["event"]], events_after])
76+
events_list = Enum.concat([events_before, [events["event"]], events_after])
5777

58-
{:ok, process_events(sup_pid, room_id, events)}
78+
{:ok,
79+
process_events(
80+
sup_pid,
81+
room_id,
82+
events_list,
83+
Map.get(events, "end"),
84+
Map.get(events, "start")
85+
)}
5986

6087
{:error, message} ->
6188
{:error, Kernel.inspect(message)}
@@ -69,16 +96,45 @@ defmodule M51.MatrixClient.ChatHistory do
6996
def before(sup_pid, room_id, anchor, limit) do
7097
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)
7198

72-
case parse_anchor(anchor) do
73-
{:ok, event_id} ->
99+
case parse_anchor(anchor, true) do
100+
{:ok, :msgid, event_id} ->
74101
case M51.MatrixClient.Client.get_event_context(
75102
client,
76103
room_id,
77104
event_id,
78105
limit * 2
79106
) do
80107
{:ok, events} ->
81-
{:ok, process_events(sup_pid, room_id, Enum.reverse(events["events_before"]))}
108+
{:ok,
109+
process_events(
110+
sup_pid,
111+
room_id,
112+
Enum.reverse(events["events_before"]),
113+
Map.get(events, "start"),
114+
nil
115+
)}
116+
117+
{:error, message} ->
118+
{:error, Kernel.inspect(message)}
119+
end
120+
121+
{:ok, :cursor, cursor} ->
122+
case M51.MatrixClient.Client.get_events_from_cursor(
123+
client,
124+
room_id,
125+
"b",
126+
cursor,
127+
limit
128+
) do
129+
{:ok, events} ->
130+
{:ok,
131+
process_events(
132+
sup_pid,
133+
room_id,
134+
Enum.reverse(events["chunk"]),
135+
Map.get(events, "end"),
136+
nil
137+
)}
82138

83139
{:error, message} ->
84140
{:error, Kernel.inspect(message)}
@@ -98,28 +154,41 @@ defmodule M51.MatrixClient.ChatHistory do
98154
limit
99155
) do
100156
{:ok, events} ->
101-
{:ok, process_events(sup_pid, room_id, Enum.reverse(events["chunk"]))}
157+
{:ok,
158+
process_events(
159+
sup_pid,
160+
room_id,
161+
Enum.reverse(events["chunk"]),
162+
Map.get(events, "end"),
163+
nil
164+
)}
102165

103166
{:error, message} ->
104167
{:error, Kernel.inspect(message)}
105168
end
106169
end
107170

108-
defp parse_anchor(anchor) do
171+
defp parse_anchor(anchor, allow_cursor) do
109172
case String.split(anchor, "=", parts: 2) do
110173
["msgid", msgid] ->
111-
{:ok, msgid}
174+
{:ok, :msgid, msgid}
175+
176+
["cursor", cursor] when allow_cursor ->
177+
{:ok, :cursor, cursor}
178+
179+
["cursor", _] ->
180+
{:error, "Invalid anchor: '#{anchor}', it should start with 'msgid='."}
112181

113182
["timestamp", _] ->
114183
{:error,
115184
"CHATHISTORY with timestamps is not supported. See https://github.com/progval/matrix2051/issues/1"}
116185

117186
_ ->
118-
{:error, "Invalid anchor: '#{anchor}', it should start with 'msgid='."}
187+
{:error, "Invalid anchor: '#{anchor}', it should start with 'msgid=' or 'cursor='."}
119188
end
120189
end
121190

122-
defp process_events(sup_pid, room_id, events) do
191+
defp process_events(sup_pid, room_id, events, next, prev) do
123192
pid = self()
124193
write = fn cmd -> send(pid, {:command, cmd}) end
125194

@@ -154,12 +223,45 @@ defmodule M51.MatrixClient.ChatHistory do
154223
|> Task.await()
155224

156225
# Collect all commands
157-
Stream.unfold(nil, fn _ ->
158-
receive do
159-
{:command, cmd} -> {cmd, nil}
160-
{:finished_processing} -> nil
161-
end
162-
end)
163-
|> Enum.to_list()
226+
batch_content =
227+
Stream.unfold(nil, fn _ ->
228+
receive do
229+
{:command, cmd} -> {cmd, nil}
230+
{:finished_processing} -> nil
231+
end
232+
end)
233+
|> Enum.to_list()
234+
235+
# Prepend cursors, if any
236+
case {next, prev} do
237+
{nil, nil} ->
238+
batch_content
239+
240+
{next, nil} ->
241+
cursors = %M51.Irc.Command{
242+
command: "CHATHISTORY",
243+
params: ["CURSORS", room_id, next]
244+
}
245+
246+
[cursors | batch_content]
247+
248+
{nil, prev} ->
249+
# what do we do here?
250+
# https://github.com/ircv3/ircv3-specifications/pull/525/files#r1214764104
251+
cursors = %M51.Irc.Command{
252+
command: "CHATHISTORY",
253+
params: ["CURSORS", room_id, "*", prev]
254+
}
255+
256+
[cursors | batch_content]
257+
258+
{next, prev} ->
259+
cursors = %M51.Irc.Command{
260+
command: "CHATHISTORY",
261+
params: ["CURSORS", room_id, next, prev]
262+
}
263+
264+
[cursors | batch_content]
265+
end
164266
end
165267
end

lib/matrix_client/client.ex

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ defmodule M51.MatrixClient.Client do
3838

3939
# timeout used for all requests sent to a homeserver.
4040
# It should be slightly larger than M51.Matrix.RawClient's timeout,
41-
@timeout 25000
41+
@timeout 65000
4242

4343
def start_link(opts) do
4444
{sup_pid, _extra_args} = opts
@@ -325,6 +325,36 @@ defmodule M51.MatrixClient.Client do
325325
{:reply, reply, state}
326326
end
327327

328+
@impl true
329+
def handle_call({:get_events_from_cursor, channel, dir, cursor, limit}, _from, state) do
330+
%M51.MatrixClient.Client{
331+
state: :connected,
332+
irc_pid: irc_pid,
333+
raw_client: raw_client
334+
} = state
335+
336+
matrix_state = M51.IrcConn.Supervisor.matrix_state(irc_pid)
337+
338+
reply =
339+
case M51.MatrixClient.State.room_from_irc_channel(matrix_state, channel) do
340+
nil ->
341+
{:error, {:room_not_found, channel}}
342+
343+
{room_id, _room} ->
344+
path =
345+
"/_matrix/client/v3/rooms/#{urlquote(room_id)}/messages?" <>
346+
URI.encode_query(%{"limit" => limit, "dir" => dir, "from" => cursor})
347+
348+
case M51.Matrix.RawClient.get(raw_client, path) do
349+
{:ok, events} -> {:ok, events}
350+
{:error, error} -> {:error, error}
351+
{:error, nil, error} -> {:error, error}
352+
end
353+
end
354+
355+
{:reply, reply, state}
356+
end
357+
328358
@impl true
329359
def handle_call({:get_latest_events, channel, limit}, _from, state) do
330360
%M51.MatrixClient.Client{
@@ -348,6 +378,7 @@ defmodule M51.MatrixClient.Client do
348378
case M51.Matrix.RawClient.get(raw_client, path) do
349379
{:ok, events} -> {:ok, events}
350380
{:error, error} -> {:error, error}
381+
{:error, nil, error} -> {:error, error}
351382
end
352383
end
353384

@@ -559,6 +590,16 @@ defmodule M51.MatrixClient.Client do
559590
GenServer.call(pid, {:get_event_context, channel, event_id, limit}, @timeout)
560591
end
561592

593+
@doc """
594+
Returns a page of events just before/after those returned by a previous call
595+
to get_event_context/4 or get_events_from_cursor/5
596+
597+
https://matrix.org/docs/spec/client_server/r0.6.1#id131
598+
"""
599+
def get_events_from_cursor(pid, channel, dir, cursor, limit) do
600+
GenServer.call(pid, {:get_events_from_cursor, channel, dir, cursor, limit}, @timeout)
601+
end
602+
562603
@doc """
563604
Returns latest events of a room
564605

0 commit comments

Comments
 (0)