Skip to content

Commit

Permalink
Merge branch 'refactor/networker-pipes'
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeDoctorDE committed Jun 21, 2024
2 parents 9fcf246 + bb0edfd commit 7838a86
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 44 deletions.
1 change: 1 addition & 0 deletions packages/networker/networker/lib/networker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ library networker;

export 'src/connection.dart';
export 'src/plugin/channel.dart';
export 'src/plugin/echo.dart';
export 'src/plugin/plugin.dart';
export 'src/plugin/json.dart';
export 'src/plugin/room.dart';
Expand Down
1 change: 1 addition & 0 deletions packages/networker/networker/lib/src/connection.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:meta/meta.dart';

Expand Down
33 changes: 26 additions & 7 deletions packages/networker/networker/lib/src/server.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
part of 'connection.dart';

abstract class ConnectionInfo {}
abstract class ConnectionInfo {
void close();
void sendMessage(Uint8List data);
bool get isClosed;
}

abstract class NetworkerServer<T extends ConnectionInfo> extends NetworkerBase {
final Set<Channel> _connections = {};
final Map<Channel, T> _connections = {};
final StreamController<Channel> _connectController =
StreamController.broadcast();
final StreamController<Channel> _disconnectController =
Expand All @@ -12,21 +16,36 @@ abstract class NetworkerServer<T extends ConnectionInfo> extends NetworkerBase {
Stream<Channel> get clientConnect => _connectController.stream;
Stream<Channel> get clientDisconnect => _disconnectController.stream;

List<Channel> get connections => List.unmodifiable(_connections);
List<Channel> get clientConnections => _connections.keys.toList();

T? getConnectionInfo();
T? getConnectionInfo(Channel channel) => _connections[channel];

@protected
bool addClientConnection(Channel id) {
if (!_connections.add(id)) return false;
bool addClientConnection(Channel id, T info) {
if (_connections.containsKey(id)) return false;
_connections[id] = info;
_connectController.add(id);
return true;
}

@protected
bool removeConnection(Channel id) {
if (!_connections.remove(id)) return false;
if (_connections.remove(id) == null) return false;
_disconnectController.add(id);
return true;
}

void _sendMessage(Uint8List data, Channel channel) =>
getConnectionInfo(channel)?.sendMessage(data);

@override
void sendMessage(Uint8List data, [Channel channel = kAnyChannel]) {
if (channel == kAnyChannel) {
for (final id in _connections.keys) {
_sendMessage(data, id);
}
} else {
_sendMessage(data, channel);
}
}
}
5 changes: 1 addition & 4 deletions packages/networker/networker_rtc/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ environment:
dependencies:
flutter_webrtc: ^0.10.6
networker:
git:
url: https://github.com/LinwoodDev/dart_pkgs.git
path: packages/networker/networker
ref: 6617b9bbfb481ef59b1a6921f1676bb90d73ecf9
path: ../networker

dev_dependencies:
lints: ^4.0.0
Expand Down
3 changes: 2 additions & 1 deletion packages/networker/networker_s5/lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class NetworkerS5 extends NetworkerClient {
bool get isClosed => subscription == null;

@override
Future<void> sendMessage(RawData data) async {
Future<void> sendMessage(Uint8List data,
[Channel channel = kAnyChannel]) async {
super.sendMessage(data);
final msg = await SignedStreamMessage.create(
kp: kp,
Expand Down
5 changes: 1 addition & 4 deletions packages/networker/networker_s5/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ environment:
sdk: '>=2.17.1 <3.0.0'
dependencies:
networker:
git:
url: https://github.com/LinwoodDev/dart_pkgs.git
path: packages/networker/networker
ref: 6617b9bbfb481ef59b1a6921f1676bb90d73ecf9
path: ../networker
s5: ^0.2.0
hive: ^2.2.3
http: ^1.2.0
Expand Down
18 changes: 10 additions & 8 deletions packages/networker/networker_socket/lib/client.dart
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
import 'dart:typed_data';

import 'package:networker/networker.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

class NetworkerSocketClient extends NetworkerClient {
final WebSocketChannel channel;
final WebSocketChannel webSocketChannel;
@override
final Uri address;

NetworkerSocketClient(Uri address, {Iterable<String>? protocols})
: this.fromChannel(
address, WebSocketChannel.connect(address, protocols: protocols));

NetworkerSocketClient.fromChannel(this.address, this.channel);
NetworkerSocketClient.fromChannel(this.address, this.webSocketChannel);

@override
void init() {
channel.stream.listen((event) {
webSocketChannel.stream.listen((event) {
onMessage(event);
});
}

@override
void close() {
channel.sink.close();
webSocketChannel.sink.close();
}

@override
bool get isClosed => channel.closeReason == null;
bool get isClosed => webSocketChannel.closeReason == null;

@override
Future<void> sendMessage(RawData data) {
Future<void> sendMessage(Uint8List data, [Channel channel = kAnyChannel]) {
super.sendMessage(data);
channel.sink.add(data);
return channel.sink.done;
webSocketChannel.sink.add(data);
return webSocketChannel.sink.done;
}
}
27 changes: 11 additions & 16 deletions packages/networker/networker_socket/lib/server.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import 'dart:async';
import 'dart:io';
import 'dart:typed_data';

import 'package:networker/networker.dart';

class NetworkerSocketServerConnection extends NetworkerConnection {
final WebSocket socket;
@override
class NetworkerSocketInfo extends ConnectionInfo {
final Uri address;
final WebSocket socket;

NetworkerSocketServerConnection(this.socket, this.address);
NetworkerSocketInfo(this.address, this.socket);

@override
void close() {
Expand All @@ -19,15 +19,15 @@ class NetworkerSocketServerConnection extends NetworkerConnection {
bool get isClosed => socket.closeReason != null;

@override
FutureOr<void> init() {}
void sendMessage(Uint8List data) {
socket.add(data);
}
}

class NetworkerSocketServer
extends NetworkerServer<NetworkerSocketServerConnection> {
class NetworkerSocketServer extends NetworkerServer<NetworkerSocketInfo> {
final HttpServer server;
bool _isClosed = false;
bool Function(HttpRequest event)? filterConnections;

NetworkerSocketServer(this.server, [this.filterConnections]);

@override
Expand All @@ -51,20 +51,15 @@ class NetworkerSocketServer
await for (var request in server.where(filterConnections ?? (e) => true)) {
try {
final socket = await WebSocketTransformer.upgrade(request);
addConnection(socket.hashCode,
NetworkerSocketServerConnection(socket, request.requestedUri));
addClientConnection(
socket.hashCode, NetworkerSocketInfo(request.requestedUri, socket));
socket.listen((event) {
onMessage(socket.hashCode, event);
onMessage(event, socket.hashCode);
}, onDone: () {
removeConnection(socket.hashCode);
});
} catch (_) {}
}
_isClosed = true;
}

@override
void sendMessage(ConnectionId id, RawData data) {
getConnection(id)?.socket.add(data);
}
}
5 changes: 1 addition & 4 deletions packages/networker/networker_socket/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ environment:
sdk: '>=2.17.1 <3.0.0'
dependencies:
networker:
git:
url: https://github.com/LinwoodDev/dart_pkgs.git
path: packages/networker/networker
ref: 6617b9bbfb481ef59b1a6921f1676bb90d73ecf9
path: ../networker
web_socket_channel: ^3.0.0

dev_dependencies:
Expand Down

0 comments on commit 7838a86

Please sign in to comment.