Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snake sync #666

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
457 changes: 230 additions & 227 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ license = "Apache-2.0"
# See also `rust-toolchain.toml`
readme = "README.md"
repository = "https://github.com/girlbossceo/conduwuit"
rust-version = "1.83.0"
rust-version = "1.84.0"
version = "0.5.0"

[workspace.metadata.crane]
Expand Down Expand Up @@ -513,16 +513,16 @@ version = "0.2"
# https://github.com/girlbossceo/tracing/commit/b348dca742af641c47bc390261f60711c2af573c
[patch.crates-io.tracing-subscriber]
git = "https://github.com/girlbossceo/tracing"
rev = "ccc4fbd8238c2d5ba354e61ec17ac610af11401d"
rev = "05825066a6d0e9ad6b80dcf29457eb179ff4768c"
[patch.crates-io.tracing]
git = "https://github.com/girlbossceo/tracing"
rev = "ccc4fbd8238c2d5ba354e61ec17ac610af11401d"
rev = "05825066a6d0e9ad6b80dcf29457eb179ff4768c"
[patch.crates-io.tracing-core]
git = "https://github.com/girlbossceo/tracing"
rev = "ccc4fbd8238c2d5ba354e61ec17ac610af11401d"
rev = "05825066a6d0e9ad6b80dcf29457eb179ff4768c"
[patch.crates-io.tracing-log]
git = "https://github.com/girlbossceo/tracing"
rev = "ccc4fbd8238c2d5ba354e61ec17ac610af11401d"
rev = "05825066a6d0e9ad6b80dcf29457eb179ff4768c"

# adds a tab completion callback: https://github.com/girlbossceo/rustyline-async/commit/de26100b0db03e419a3d8e1dd26895d170d1fe50
# adds event for CTRL+\: https://github.com/girlbossceo/rustyline-async/commit/67d8c49aeac03a5ef4e818f663eaa94dd7bf339b
Expand Down
30 changes: 15 additions & 15 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
file = ./rust-toolchain.toml;

# See also `rust-toolchain.toml`
sha256 = "sha256-s1RPtyvDGJaX/BisLT+ifVfuhDT1nZkZ1NcK8sbwELM=";
sha256 = "sha256-lMLAupxng4Fd9F1oDw8gx+qA0RuF7ou7xhNU8wgs0PU=";
};

mkScope = pkgs: pkgs.lib.makeScope pkgs.newScope (self: {
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# If you're having trouble making the relevant changes, bug a maintainer.

[toolchain]
channel = "1.83.0"
channel = "1.84.0"
profile = "minimal"
components = [
# For rust-analyzer
Expand Down
51 changes: 48 additions & 3 deletions src/api/client/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
mod v3;
mod v4;
mod v5;

use conduwuit::{
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
utils::{
stream::{BroadbandExt, ReadyExt, TryIgnore},
IterStream,
},
PduCount,
};
use futures::{pin_mut, StreamExt};
use ruma::{RoomId, UserId};
use ruma::{
directory::RoomTypeFilter,
events::TimelineEventType::{
self, Beacon, CallInvite, PollStart, RoomEncrypted, RoomMessage, Sticker,
},
RoomId, UserId,
};

pub(crate) use self::{v3::sync_events_route, v4::sync_events_v4_route};
pub(crate) use self::{
v3::sync_events_route, v4::sync_events_v4_route, v5::sync_events_v5_route,
};
use crate::{service::Services, Error, PduEvent, Result};

pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
&[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker];

async fn load_timeline(
services: &Services,
sender_user: &UserId,
Expand Down Expand Up @@ -69,3 +84,33 @@ async fn share_encrypted_room(
})
.await
}

pub(crate) async fn filter_rooms<'a>(
services: &Services,
rooms: &[&'a RoomId],
filter: &[RoomTypeFilter],
negate: bool,
) -> Vec<&'a RoomId> {
rooms
.iter()
.stream()
.filter_map(|r| async move {
let room_type = services.rooms.state_accessor.get_room_type(r).await;

if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
return None;
}

let room_type_filter = RoomTypeFilter::from(room_type.ok());

let include = if negate {
!filter.contains(&room_type_filter)
} else {
filter.is_empty() || filter.contains(&room_type_filter)
};

include.then_some(r)
})
.collect()
.await
}
94 changes: 35 additions & 59 deletions src/api/client/sync/v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,23 @@ use ruma::{
DeviceLists, UnreadNotificationsCount,
},
},
directory::RoomTypeFilter,
events::{
room::member::{MembershipState, RoomMemberEventContent},
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
TimelineEventType::{self, *},
TimelineEventType::*,
},
serde::Raw,
uint, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, UInt,
uint, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt,
};
use service::{rooms::read_receipt::pack_receipts, Services};
use service::rooms::read_receipt::pack_receipts;

use super::{load_timeline, share_encrypted_room};
use crate::{client::ignored_filter, Ruma};

const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";
use crate::{
client::{filter_rooms, ignored_filter, sync::v5::TodoRooms, DEFAULT_BUMP_TYPES},
Ruma,
};

const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
&[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker];
pub(crate) const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";

/// POST `/_matrix/client/unstable/org.matrix.msc3575/sync`
///
Expand Down Expand Up @@ -121,13 +120,19 @@ pub(crate) async fn sync_events_v4_route(
.collect()
.await;

let all_rooms = all_joined_rooms
let all_invited_rooms: Vec<&RoomId> = all_invited_rooms.iter().map(AsRef::as_ref).collect();
let all_knocked_rooms: Vec<&RoomId> = all_knocked_rooms.iter().map(AsRef::as_ref).collect();

let all_rooms: Vec<&RoomId> = all_joined_rooms
.iter()
.chain(all_invited_rooms.iter())
.chain(all_knocked_rooms.iter())
.map(Clone::clone)
.map(AsRef::as_ref)
.chain(all_invited_rooms.iter().map(AsRef::as_ref))
.chain(all_knocked_rooms.iter().map(AsRef::as_ref))
.collect();

let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref).collect();
let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref).collect();

if body.extensions.to_device.enabled.unwrap_or(false) {
services
.users
Expand Down Expand Up @@ -180,6 +185,7 @@ pub(crate) async fn sync_events_v4_route(
);

for room_id in &all_joined_rooms {
let room_id: &&RoomId = room_id;
let Ok(current_shortstatehash) =
services.rooms.state.get_room_shortstatehash(room_id).await
else {
Expand Down Expand Up @@ -332,7 +338,7 @@ pub(crate) async fn sync_events_v4_route(
}

let mut lists = BTreeMap::new();
let mut todo_rooms = BTreeMap::new(); // and required state
let mut todo_rooms: TodoRooms = BTreeMap::new(); // and required state

for (list_id, list) in &body.lists {
let active_rooms = match list.filters.clone().and_then(|f| f.is_invite) {
Expand All @@ -353,7 +359,7 @@ pub(crate) async fn sync_events_v4_route(
| None => active_rooms,
};

let mut new_known_rooms = BTreeSet::new();
let mut new_known_rooms: BTreeSet<OwnedRoomId> = BTreeSet::new();

let ranges = list.ranges.clone();
lists.insert(list_id.clone(), sync_events::v4::SyncList {
Expand All @@ -375,9 +381,9 @@ pub(crate) async fn sync_events_v4_route(
Vec::new()
};

new_known_rooms.extend(room_ids.iter().cloned());
new_known_rooms.extend(room_ids.clone().into_iter().map(ToOwned::to_owned));
for room_id in &room_ids {
let todo_room = todo_rooms.entry(room_id.clone()).or_insert((
let todo_room = todo_rooms.entry((*room_id).to_owned()).or_insert((
BTreeSet::new(),
0_usize,
u64::MAX,
Expand All @@ -399,7 +405,7 @@ pub(crate) async fn sync_events_v4_route(
todo_room.2 = todo_room.2.min(
known_rooms
.get(list_id.as_str())
.and_then(|k| k.get(room_id))
.and_then(|k| k.get(*room_id))
.copied()
.unwrap_or(0),
);
Expand All @@ -408,7 +414,7 @@ pub(crate) async fn sync_events_v4_route(
op: SlidingOp::Sync,
range: Some(r),
index: None,
room_ids,
room_ids: room_ids.into_iter().map(ToOwned::to_owned).collect(),
room_id: None,
}
})
Expand All @@ -418,8 +424,8 @@ pub(crate) async fn sync_events_v4_route(

if let Some(conn_id) = &body.conn_id {
services.sync.update_sync_known_rooms(
sender_user.clone(),
sender_device.clone(),
sender_user,
&sender_device,
conn_id.clone(),
list_id.clone(),
new_known_rooms,
Expand Down Expand Up @@ -464,8 +470,8 @@ pub(crate) async fn sync_events_v4_route(

if let Some(conn_id) = &body.conn_id {
services.sync.update_sync_known_rooms(
sender_user.clone(),
sender_device.clone(),
sender_user,
&sender_device,
conn_id.clone(),
"subscriptions".to_owned(),
known_subscription_rooms,
Expand All @@ -489,7 +495,8 @@ pub(crate) async fn sync_events_v4_route(
let mut timestamp: Option<_> = None;
let mut invite_state = None;
let (timeline_pdus, limited);
if all_invited_rooms.contains(room_id) {
let new_room_id: &RoomId = (*room_id).as_ref();
if all_invited_rooms.contains(&new_room_id) {
// TODO: figure out a timestamp we can use for remote invites
invite_state = services
.rooms
Expand Down Expand Up @@ -519,7 +526,7 @@ pub(crate) async fn sync_events_v4_route(
}

account_data.rooms.insert(
room_id.clone(),
room_id.to_owned(),
services
.account_data
.changes_since(Some(room_id), sender_user, *roomsince)
Expand Down Expand Up @@ -749,10 +756,9 @@ pub(crate) async fn sync_events_v4_route(
});
}

if rooms
.iter()
.all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
{
if rooms.iter().all(|(id, r)| {
r.timeline.is_empty() && r.required_state.is_empty() && !receipts.rooms.contains_key(id)
}) {
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
Expand Down Expand Up @@ -798,33 +804,3 @@ pub(crate) async fn sync_events_v4_route(
delta_token: None,
})
}

async fn filter_rooms(
services: &Services,
rooms: &[OwnedRoomId],
filter: &[RoomTypeFilter],
negate: bool,
) -> Vec<OwnedRoomId> {
rooms
.iter()
.stream()
.filter_map(|r| async move {
let room_type = services.rooms.state_accessor.get_room_type(r).await;

if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
return None;
}

let room_type_filter = RoomTypeFilter::from(room_type.ok());

let include = if negate {
!filter.contains(&room_type_filter)
} else {
filter.is_empty() || filter.contains(&room_type_filter)
};

include.then_some(r.to_owned())
})
.collect()
.await
}
Loading
Loading