Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pagination cursors #55

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 126 additions & 24 deletions lib/matrix_client/chat_history.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,36 @@ defmodule M51.MatrixClient.ChatHistory do
def after_(sup_pid, room_id, anchor, limit) do
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)

case parse_anchor(anchor) do
{:ok, event_id} ->
case parse_anchor(anchor, true) do
{:ok, :msgid, event_id} ->
case M51.MatrixClient.Client.get_event_context(
client,
room_id,
event_id,
limit * 2
) do
{:ok, events} -> {:ok, process_events(sup_pid, room_id, events["events_after"])}
{:error, message} -> {:error, Kernel.inspect(message)}
{:ok, events} ->
{:ok,
process_events(sup_pid, room_id, events["events_after"], Map.get(events, "end"), nil)}

{:error, message} ->
{:error, Kernel.inspect(message)}
end

{:ok, :cursor, cursor} ->
case M51.MatrixClient.Client.get_events_from_cursor(
client,
room_id,
"f",
cursor,
limit
) do
{:ok, events} ->
{:ok,
process_events(sup_pid, room_id, events["chunk"], Map.get(events, "end"), nil)}

{:error, message} ->
{:error, Kernel.inspect(message)}
end

{:error, message} ->
Expand All @@ -42,9 +62,9 @@ defmodule M51.MatrixClient.ChatHistory do
def around(sup_pid, room_id, anchor, limit) do
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)

case parse_anchor(anchor) do
{:ok, event_id} ->
case M51.MatrixClient.Client.get_event_context(client, room_id, event_id, limit) do
case parse_anchor(anchor, false) do
{:ok, :msgid, event_id} ->
case M51.MatrixClient.Client.get_event_context(client, room_id, event_id, limit - 1) do
{:ok, events} ->
# TODO: if there aren't enough events after (resp. before), allow more
# events before (resp. after) than half the limit.
Expand All @@ -53,9 +73,16 @@ defmodule M51.MatrixClient.ChatHistory do

events_before = events["events_before"] |> Enum.slice(0, nb_before) |> Enum.reverse()
events_after = events["events_after"] |> Enum.slice(0, nb_after)
events = Enum.concat([events_before, [events["event"]], events_after])
events_list = Enum.concat([events_before, [events["event"]], events_after])

{:ok, process_events(sup_pid, room_id, events)}
{:ok,
process_events(
sup_pid,
room_id,
events_list,
Map.get(events, "end"),
Map.get(events, "start")
)}

{:error, message} ->
{:error, Kernel.inspect(message)}
Expand All @@ -69,16 +96,45 @@ defmodule M51.MatrixClient.ChatHistory do
def before(sup_pid, room_id, anchor, limit) do
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)

case parse_anchor(anchor) do
{:ok, event_id} ->
case parse_anchor(anchor, true) do
{:ok, :msgid, event_id} ->
case M51.MatrixClient.Client.get_event_context(
client,
room_id,
event_id,
limit * 2
) do
{:ok, events} ->
{:ok, process_events(sup_pid, room_id, Enum.reverse(events["events_before"]))}
{:ok,
process_events(
sup_pid,
room_id,
Enum.reverse(events["events_before"]),
Map.get(events, "start"),
nil
)}

{:error, message} ->
{:error, Kernel.inspect(message)}
end

{:ok, :cursor, cursor} ->
case M51.MatrixClient.Client.get_events_from_cursor(
client,
room_id,
"b",
cursor,
limit
) do
{:ok, events} ->
{:ok,
process_events(
sup_pid,
room_id,
Enum.reverse(events["chunk"]),
Map.get(events, "end"),
nil
)}

{:error, message} ->
{:error, Kernel.inspect(message)}
Expand All @@ -98,28 +154,41 @@ defmodule M51.MatrixClient.ChatHistory do
limit
) do
{:ok, events} ->
{:ok, process_events(sup_pid, room_id, Enum.reverse(events["chunk"]))}
{:ok,
process_events(
sup_pid,
room_id,
Enum.reverse(events["chunk"]),
Map.get(events, "end"),
nil
)}

{:error, message} ->
{:error, Kernel.inspect(message)}
end
end

defp parse_anchor(anchor) do
defp parse_anchor(anchor, allow_cursor) do
case String.split(anchor, "=", parts: 2) do
["msgid", msgid] ->
{:ok, msgid}
{:ok, :msgid, msgid}

["cursor", cursor] when allow_cursor ->
{:ok, :cursor, cursor}

["cursor", _] ->
{:error, "Invalid anchor: '#{anchor}', it should start with 'msgid='."}

["timestamp", _] ->
{:error,
"CHATHISTORY with timestamps is not supported. See https://github.com/progval/matrix2051/issues/1"}

_ ->
{:error, "Invalid anchor: '#{anchor}', it should start with 'msgid='."}
{:error, "Invalid anchor: '#{anchor}', it should start with 'msgid=' or 'cursor='."}
end
end

defp process_events(sup_pid, room_id, events) do
defp process_events(sup_pid, room_id, events, next, prev) do
pid = self()
write = fn cmd -> send(pid, {:command, cmd}) end

Expand Down Expand Up @@ -154,12 +223,45 @@ defmodule M51.MatrixClient.ChatHistory do
|> Task.await()

# Collect all commands
Stream.unfold(nil, fn _ ->
receive do
{:command, cmd} -> {cmd, nil}
{:finished_processing} -> nil
end
end)
|> Enum.to_list()
batch_content =
Stream.unfold(nil, fn _ ->
receive do
{:command, cmd} -> {cmd, nil}
{:finished_processing} -> nil
end
end)
|> Enum.to_list()

# Prepend cursors, if any
case {next, prev} do
{nil, nil} ->
batch_content

{next, nil} ->
cursors = %M51.Irc.Command{
command: "CHATHISTORY",
params: ["CURSORS", room_id, next]
}

[cursors | batch_content]

{nil, prev} ->
# what do we do here?
# https://github.com/ircv3/ircv3-specifications/pull/525/files#r1214764104
cursors = %M51.Irc.Command{
command: "CHATHISTORY",
params: ["CURSORS", room_id, "*", prev]
}

[cursors | batch_content]

{next, prev} ->
cursors = %M51.Irc.Command{
command: "CHATHISTORY",
params: ["CURSORS", room_id, next, prev]
}

[cursors | batch_content]
end
end
end
40 changes: 40 additions & 0 deletions lib/matrix_client/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,36 @@ defmodule M51.MatrixClient.Client do
{:reply, reply, state}
end

@impl true
def handle_call({:get_events_from_cursor, channel, dir, cursor, limit}, _from, state) do
%M51.MatrixClient.Client{
state: :connected,
irc_pid: irc_pid,
raw_client: raw_client
} = state

matrix_state = M51.IrcConn.Supervisor.matrix_state(irc_pid)

reply =
case M51.MatrixClient.State.room_from_irc_channel(matrix_state, channel) do
nil ->
{:error, {:room_not_found, channel}}

{room_id, _room} ->
path =
"/_matrix/client/v3/rooms/#{urlquote(room_id)}/messages?" <>
URI.encode_query(%{"limit" => limit, "dir" => dir, "from" => cursor})

case M51.Matrix.RawClient.get(raw_client, path) do
{:ok, events} -> {:ok, events}
{:error, error} -> {:error, error}
{:error, nil, error} -> {:error, error}
end
end

{:reply, reply, state}
end

@impl true
def handle_call({:get_latest_events, channel, limit}, _from, state) do
%M51.MatrixClient.Client{
Expand Down Expand Up @@ -605,6 +635,16 @@ defmodule M51.MatrixClient.Client do
GenServer.call(pid, {:get_event_context, channel, event_id, limit}, @timeout)
end

@doc """
Returns a page of events just before/after those returned by a previous call
to get_event_context/4 or get_events_from_cursor/5

https://matrix.org/docs/spec/client_server/r0.6.1#id131
"""
def get_events_from_cursor(pid, channel, dir, cursor, limit) do
GenServer.call(pid, {:get_events_from_cursor, channel, dir, cursor, limit}, @timeout)
end

@doc """
Returns latest events of a room

Expand Down
Loading