Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ability to emit/watch incoming and outgoing payload #328

Merged
merged 6 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions lib/services/connection/call_manager/call_history.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import 'dart:async';

import 'package:flutter_deriv_api/services/connection/call_manager/call_history_entry.dart';
import 'package:flutter_deriv_api/services/interfaces/call_history_provider.dart';

/// Provides storage for messages sent/received via the web socket connection
class CallHistory {
class CallHistory implements CallHistoryProvider {
/// It initializes [CallHistory] instance.
CallHistory() {
_callHistoryBroadcaster = StreamController<NetworkPayload>.broadcast();
}

late final StreamController<NetworkPayload> _callHistoryBroadcaster;

/// Messages that were sent to the remote endpoint
final List<CallHistoryEntry> outgoing = <CallHistoryEntry>[];

Expand Down Expand Up @@ -29,7 +39,15 @@ class CallHistory {
incoming.add(
CallHistoryEntry(timeStamp: timestamp, method: method, message: message),
);

if (!method.contains('ping')) {
_callHistoryBroadcaster.add(
NetworkPayload(
method: method,
body: message,
direction: 'RECEIVED',
timeStamp: timestamp),
);
}
_trimHistory(incoming);
}

Expand All @@ -42,6 +60,15 @@ class CallHistory {
outgoing.add(
CallHistoryEntry(timeStamp: timestamp, method: method, message: message),
);
if (!method.contains('ping')) {
_callHistoryBroadcaster.add(
NetworkPayload(
method: method,
body: message,
direction: 'SENT',
timeStamp: timestamp),
);
}

_trimHistory(outgoing);
}
Expand All @@ -52,4 +79,7 @@ class CallHistory {
callHistory.removeRange(0, callHistory.length - limit);
}
}

@override
Stream<NetworkPayload> get stream => _callHistoryBroadcaster.stream;
}
29 changes: 29 additions & 0 deletions lib/services/interfaces/call_history_provider.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/// This interface provides stream of network payload that is going out(SENT)
/// and coming in(RECEIVED) to the application.
abstract class CallHistoryProvider {
/// Stream of network payload that is going out(SENT) and coming in(RECEIVED)
Stream<NetworkPayload> get stream;
}

/// Network payload that is going out and coming in from the web socket.
class NetworkPayload {
/// Initializes [NetworkPayload] instance.
NetworkPayload({
required this.method,
required this.body,
required this.direction,
required this.timeStamp,
});

/// name of the api.
final String method;

/// content of the api.
final Object body;

/// direction of the api i.e SENT or RECEIVED.
final String direction;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't be better if we made this one enum?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds better.


/// time of the api.
final int timeStamp;
}
9 changes: 6 additions & 3 deletions lib/state/connection/connection_cubit.dart
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ConnectionCubit extends Cubit<ConnectionState> {

final String _key = '${UniqueKey()}';

late final BaseAPI? _api;
late final BaseAPI _api;

/// Enables debug mode.
///
Expand Down Expand Up @@ -86,6 +86,9 @@ class ConnectionCubit extends Cubit<ConnectionState> {
/// Stream subscription for connectivity.
StreamSubscription<ConnectivityResult>? connectivitySubscription;

/// Getter for [BaseAPI] implementation class. By default, it will be [BinaryAPI].
BaseAPI get api => _api;

/// Reconnect to Websocket.
Future<void> reconnect({
ConnectionInformation? connectionInformation,
Expand All @@ -109,7 +112,7 @@ class ConnectionCubit extends Cubit<ConnectionState> {
emit(const ConnectionConnectingState());

try {
await _api!.disconnect().timeout(_pingTimeout);
await _api.disconnect().timeout(_pingTimeout);
} on Exception catch (e) {
dev.log('$runtimeType disconnect exception: $e', error: e);

Expand All @@ -118,7 +121,7 @@ class ConnectionCubit extends Cubit<ConnectionState> {
return;
}

await _api!.connect(
await _api.connect(
_connectionInformation,
printResponse: enableDebug && printResponse,
onOpen: (String key) {
Expand Down
Loading