From 783f1f83d3c92ab6a352639e299b738b334b082c Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Thu, 17 Oct 2024 16:00:47 +0800 Subject: [PATCH 01/17] WIP adding isolate for WS connection --- .../connection/api_manager/binary_api.dart | 194 ++++++++++++++++++ 1 file changed, 194 insertions(+) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 64a7041c70..0128361409 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:developer' as dev; import 'dart:io'; +import 'dart:isolate'; import 'package:flutter/widgets.dart'; import 'package:flutter_system_proxy/flutter_system_proxy.dart'; @@ -258,3 +259,196 @@ class BinaryAPI extends BaseAPI { } } } + +/// This class is for handling Binary API connection and calling Binary APIs. +class IsolateWrappingAPI extends BaseAPI { + /// Initializes [BinaryAPI] instance. + IsolateWrappingAPI({ + String? key, + bool enableDebug = false, + this.proxyAwareConnection = false, + }) : super(key: key ?? '${UniqueKey()}', enableDebug: enableDebug) { + Isolate.spawn(_isolateTask, _isolateIncomingPort.sendPort); + + _isolateIncomingPort.listen((dynamic message) { + if (message is SendPort) { + _isolateSendPort = message; + } + + // Check for other messages coming out from Isolate. + }); + } + + static const Duration _disconnectTimeOut = Duration(seconds: 5); + static const Duration _websocketConnectTimeOut = Duration(seconds: 10); + + final ReceivePort _isolateIncomingPort = ReceivePort(); + SendPort? _isolateSendPort; + int _eventId = 0; + + int get _getEventId => _eventId++; + + /// A flag to indicate if the connection is proxy aware. + final bool proxyAwareConnection; + + /// Represents the active websocket connection. + /// + /// This is used to send and receive data from the websocket server. + IOWebSocketChannel? _webSocketChannel; + + /// Stream subscription to API data. + StreamSubscription?>? _webSocketListener; + + /// Call manager instance. + CallManager? _callManager; + + /// Subscription manager instance. + SubscriptionManager? _subscriptionManager; + + /// Gets API call history. + CallHistory? get callHistory => _callManager?.callHistory; + + /// Gets API subscription history. + CallHistory? get subscriptionHistory => _subscriptionManager?.callHistory; + + @override + Future connect( + ConnectionInformation? connectionInformation, { + ConnectionCallback? onOpen, + ConnectionCallback? onDone, + ConnectionCallback? onError, + bool printResponse = false, + }) async { + _isolateSendPort?.send(_WSConnectConfig( + connectionInformation: connectionInformation, + onOpen: onOpen, + onError: onError, + onClosed: onDone, + printResponse: printResponse, + )); + } + + @override + void addToChannel(Map request) => _isolateSendPort + ?.send(_AddToChannelEvent(request: request, eventId: _getEventId)); + + @override + Future call({ + required Request request, + List nullableKeys = const [], + }) async { + final event = _CallEvent(request: request, eventId: _getEventId); + _isolateSendPort?.send(event); + return event.completer.future; + } + + @override + Stream? subscribe({ + required Request request, + int cacheSize = 0, + RequestCompareFunction? comparePredicate, + }) => + (_subscriptionManager ??= SubscriptionManager(this))( + request: request, + cacheSize: cacheSize, + comparePredicate: comparePredicate, + ); + + @override + Future unsubscribe({required String subscriptionId}) => + (_subscriptionManager ??= SubscriptionManager(this)).unsubscribe( + subscriptionId: subscriptionId, + ); + + @override + Future unsubscribeAll({ + required ForgetStreamType method, + }) => + (_subscriptionManager ??= SubscriptionManager(this)) + .unsubscribeAll(method: method); + + @override + Future disconnect() async { + try { + await _webSocketListener?.cancel(); + + await _webSocketChannel?.sink.close().timeout( + _disconnectTimeOut, + onTimeout: () => throw TimeoutException('Could not close sink.'), + ); + // ignore: avoid_catches_without_on_clauses + } catch (e) { + _logDebugInfo('disconnect error.', error: e); + } finally { + _webSocketListener = null; + _webSocketChannel = null; + } + } + + void _logDebugInfo(String message, {Object? error}) { + if (enableDebug) { + dev.log('$runtimeType $key $message', error: error); + } + } +} + +void _isolateTask(SendPort sendPort) { + final ReceivePort receivePort = ReceivePort(); + + final BinaryAPI binaryAPI = BinaryAPI(); + + sendPort.send(receivePort.sendPort); + + receivePort.listen((dynamic message) async { + if (message is _WSConnectConfig) { + await binaryAPI.connect(message.connectionInformation); + // Connect WS + } + + switch (message) { + case _AddToChannelEvent(): + binaryAPI.addToChannel(message.request); + break; + case _CallEvent(): + final response = await binaryAPI.call(request: message.request); + message.completer.complete(response); + break; + } + }); +} + +class _WSConnectConfig { + _WSConnectConfig({ + this.onOpen, + this.onError, + this.onClosed, + this.connectionInformation, + this.printResponse = false, + }); + + final ConnectionInformation? connectionInformation; + + final ConnectionCallback? onOpen; + final ConnectionCallback? onError; + final ConnectionCallback? onClosed; + final bool printResponse; +} + +sealed class _IsolateEvent { + _IsolateEvent({required this.eventId}); + + final int eventId; +} + +class _AddToChannelEvent extends _IsolateEvent { + _AddToChannelEvent({required this.request, required super.eventId}); + + final Map request; +} + +class _CallEvent extends _IsolateEvent { + _CallEvent({required this.request, required super.eventId}); + + final Request request; + final Completer completer = Completer(); +} From 9f3c8a8dca2a47570984ad16fb92311413d91dca Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Thu, 17 Oct 2024 16:28:22 +0800 Subject: [PATCH 02/17] add SubEvent --- .../connection/api_manager/binary_api.dart | 36 +++++++++++++++---- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 0128361409..089dc177d1 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -6,6 +6,7 @@ import 'dart:isolate'; import 'package:flutter/widgets.dart'; import 'package:flutter_system_proxy/flutter_system_proxy.dart'; +import 'package:rxdart/rxdart.dart'; import 'package:web_socket_channel/io.dart'; import 'package:flutter_deriv_api/api/models/enums.dart'; @@ -280,7 +281,6 @@ class IsolateWrappingAPI extends BaseAPI { } static const Duration _disconnectTimeOut = Duration(seconds: 5); - static const Duration _websocketConnectTimeOut = Duration(seconds: 10); final ReceivePort _isolateIncomingPort = ReceivePort(); SendPort? _isolateSendPort; @@ -347,12 +347,15 @@ class IsolateWrappingAPI extends BaseAPI { required Request request, int cacheSize = 0, RequestCompareFunction? comparePredicate, - }) => - (_subscriptionManager ??= SubscriptionManager(this))( - request: request, - cacheSize: cacheSize, - comparePredicate: comparePredicate, - ); + }) { + final BehaviorSubject stream = BehaviorSubject(); + _isolateSendPort?.send(_SubEvent( + request: request, + eventId: _getEventId, + stream: stream, + )); + return stream; + } @override Future unsubscribe({required String subscriptionId}) => @@ -413,6 +416,13 @@ void _isolateTask(SendPort sendPort) { final response = await binaryAPI.call(request: message.request); message.completer.complete(response); break; + + case _SubEvent(): + final stream = binaryAPI.subscribe(request: message.request); + stream?.listen((event) { + message.stream.add(event); + }); + break; } }); } @@ -452,3 +462,15 @@ class _CallEvent extends _IsolateEvent { final Request request; final Completer completer = Completer(); } + +class _SubEvent extends _IsolateEvent { + _SubEvent({ + required this.request, + required super.eventId, + required this.stream, + }); + + final Request request; + + final BehaviorSubject stream; +} From d00e9fa00df7012f1f5a951dacb79f226a437693 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Thu, 17 Oct 2024 17:32:54 +0800 Subject: [PATCH 03/17] some changes --- .../connection/api_manager/binary_api.dart | 122 ++++++++++-------- lib/state/connection/connection_cubit.dart | 4 +- 2 files changed, 67 insertions(+), 59 deletions(-) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 089dc177d1..54996d7dbb 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -5,6 +5,7 @@ import 'dart:io'; import 'dart:isolate'; import 'package:flutter/widgets.dart'; +import 'package:flutter_deriv_api/api/response/forget_response_result.dart'; import 'package:flutter_system_proxy/flutter_system_proxy.dart'; import 'package:rxdart/rxdart.dart'; import 'package:web_socket_channel/io.dart'; @@ -280,8 +281,6 @@ class IsolateWrappingAPI extends BaseAPI { }); } - static const Duration _disconnectTimeOut = Duration(seconds: 5); - final ReceivePort _isolateIncomingPort = ReceivePort(); SendPort? _isolateSendPort; int _eventId = 0; @@ -291,14 +290,6 @@ class IsolateWrappingAPI extends BaseAPI { /// A flag to indicate if the connection is proxy aware. final bool proxyAwareConnection; - /// Represents the active websocket connection. - /// - /// This is used to send and receive data from the websocket server. - IOWebSocketChannel? _webSocketChannel; - - /// Stream subscription to API data. - StreamSubscription?>? _webSocketListener; - /// Call manager instance. CallManager? _callManager; @@ -358,44 +349,30 @@ class IsolateWrappingAPI extends BaseAPI { } @override - Future unsubscribe({required String subscriptionId}) => - (_subscriptionManager ??= SubscriptionManager(this)).unsubscribe( - subscriptionId: subscriptionId, - ); + Future unsubscribe({required String subscriptionId}) { + final event = + _UnSubEvent(subscriptionId: subscriptionId, eventId: _getEventId); + _isolateSendPort?.send(event); + return event.completer.future; + } @override Future unsubscribeAll({ required ForgetStreamType method, - }) => - (_subscriptionManager ??= SubscriptionManager(this)) - .unsubscribeAll(method: method); + }) { + final event = _UnSubAllEvent(streamType: method, eventId: _getEventId); + _isolateSendPort?.send(event); + return event.completer.future; + } @override Future disconnect() async { - try { - await _webSocketListener?.cancel(); - - await _webSocketChannel?.sink.close().timeout( - _disconnectTimeOut, - onTimeout: () => throw TimeoutException('Could not close sink.'), - ); - // ignore: avoid_catches_without_on_clauses - } catch (e) { - _logDebugInfo('disconnect error.', error: e); - } finally { - _webSocketListener = null; - _webSocketChannel = null; - } - } - - void _logDebugInfo(String message, {Object? error}) { - if (enableDebug) { - dev.log('$runtimeType $key $message', error: error); - } + _isolateSendPort?.send(_DisconnectEvent(eventId: _getEventId)); } } void _isolateTask(SendPort sendPort) { + print('##### Isolate spawned'); final ReceivePort receivePort = ReceivePort(); final BinaryAPI binaryAPI = BinaryAPI(); @@ -408,21 +385,37 @@ void _isolateTask(SendPort sendPort) { // Connect WS } - switch (message) { - case _AddToChannelEvent(): - binaryAPI.addToChannel(message.request); - break; - case _CallEvent(): - final response = await binaryAPI.call(request: message.request); - message.completer.complete(response); - break; - - case _SubEvent(): - final stream = binaryAPI.subscribe(request: message.request); - stream?.listen((event) { - message.stream.add(event); - }); - break; + if (message is _IsolateEvent) { + switch (message) { + case _AddToChannelEvent(): + binaryAPI.addToChannel(message.request); + break; + case _CallEvent(): + final response = await binaryAPI.call(request: message.request); + message.completer.complete(response); + break; + + case _SubEvent(): + final stream = binaryAPI.subscribe(request: message.request); + stream?.listen((event) { + message.stream.add(event); + }); + break; + case _UnSubEvent(): + final response = await binaryAPI.unsubscribe( + subscriptionId: message.subscriptionId, + ); + message.completer.complete(response); + break; + case _UnSubAllEvent(): + final response = + await binaryAPI.unsubscribeAll(method: message.streamType); + message.completer.complete(response); + break; + case _DisconnectEvent(): + await binaryAPI.disconnect(); + break; + } } }); } @@ -464,13 +457,28 @@ class _CallEvent extends _IsolateEvent { } class _SubEvent extends _IsolateEvent { - _SubEvent({ - required this.request, - required super.eventId, - required this.stream, - }); + _SubEvent( + {required this.request, required super.eventId, required this.stream}); final Request request; final BehaviorSubject stream; } + +class _UnSubEvent extends _IsolateEvent { + _UnSubEvent({required this.subscriptionId, required super.eventId}); + + final String subscriptionId; + final Completer completer = Completer(); +} + +class _UnSubAllEvent extends _IsolateEvent { + _UnSubAllEvent({required this.streamType, required super.eventId}); + + final ForgetStreamType streamType; + final Completer completer = Completer(); +} + +class _DisconnectEvent extends _IsolateEvent { + _DisconnectEvent({required super.eventId}); +} diff --git a/lib/state/connection/connection_cubit.dart b/lib/state/connection/connection_cubit.dart index 49a8daaae9..19812fc2ea 100644 --- a/lib/state/connection/connection_cubit.dart +++ b/lib/state/connection/connection_cubit.dart @@ -30,7 +30,7 @@ class ConnectionCubit extends Cubit { APIInitializer().initialize( api: api ?? - BinaryAPI( + IsolateWrappingAPI( key: _key, proxyAwareConnection: proxyAwareConnection, enableDebug: enableDebug, @@ -141,7 +141,7 @@ class ConnectionCubit extends Cubit { }, ); - if (_api is BinaryAPI) { + if (_api is IsolateWrappingAPI) { _setupConnectivityListener(); } } From ba2a0db4f47d4dca75f23f834de6865234c05d65 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Fri, 18 Oct 2024 12:03:28 +0800 Subject: [PATCH 04/17] handle connection callbacks in isolate --- .../connection/api_manager/binary_api.dart | 96 +++++++++++++++---- 1 file changed, 78 insertions(+), 18 deletions(-) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 54996d7dbb..0caebeefc5 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -3,9 +3,10 @@ import 'dart:convert'; import 'dart:developer' as dev; import 'dart:io'; import 'dart:isolate'; +import 'dart:ui' as ui; +import 'package:flutter/services.dart'; import 'package:flutter/widgets.dart'; -import 'package:flutter_deriv_api/api/response/forget_response_result.dart'; import 'package:flutter_system_proxy/flutter_system_proxy.dart'; import 'package:rxdart/rxdart.dart'; import 'package:web_socket_channel/io.dart'; @@ -270,17 +271,44 @@ class IsolateWrappingAPI extends BaseAPI { bool enableDebug = false, this.proxyAwareConnection = false, }) : super(key: key ?? '${UniqueKey()}', enableDebug: enableDebug) { - Isolate.spawn(_isolateTask, _isolateIncomingPort.sendPort); + _canStartCompleter = Completer(); + + Isolate.spawn( + _isolateTask, + IsolateConfig( + sendPort: _isolateIncomingPort.sendPort, + rootIsolateToken: ServicesBinding?.rootIsolateToken), + ); _isolateIncomingPort.listen((dynamic message) { if (message is SendPort) { _isolateSendPort = message; + print('Isolate send port is ready ${DateTime.now()}'); + if (!_canStartCompleter.isCompleted) { + _canStartCompleter.complete(); + } + } + + if (message is ConnectionEventReply) { + switch (message.callback) { + case ConnectionCallbacks.onOpen: + _onOpen?.call(message.key); + break; + case ConnectionCallbacks.onDone: + _onDone?.call(message.key); + break; + case ConnectionCallbacks.onError: + _onError?.call(message.key); + break; + } } // Check for other messages coming out from Isolate. }); } + late final Completer _canStartCompleter; + final ReceivePort _isolateIncomingPort = ReceivePort(); SendPort? _isolateSendPort; int _eventId = 0; @@ -302,6 +330,10 @@ class IsolateWrappingAPI extends BaseAPI { /// Gets API subscription history. CallHistory? get subscriptionHistory => _subscriptionManager?.callHistory; + ConnectionCallback? _onOpen; + ConnectionCallback? _onDone; + ConnectionCallback? _onError; + @override Future connect( ConnectionInformation? connectionInformation, { @@ -310,13 +342,14 @@ class IsolateWrappingAPI extends BaseAPI { ConnectionCallback? onError, bool printResponse = false, }) async { + await _canStartCompleter.future; + print('Sending Connect event to Ioslate ${DateTime.now()}'); _isolateSendPort?.send(_WSConnectConfig( connectionInformation: connectionInformation, - onOpen: onOpen, - onError: onError, - onClosed: onDone, - printResponse: printResponse, )); + _onOpen = onOpen; + _onDone = onDone; + _onError = onError; } @override @@ -371,18 +404,38 @@ class IsolateWrappingAPI extends BaseAPI { } } -void _isolateTask(SendPort sendPort) { - print('##### Isolate spawned'); +class IsolateConfig { + IsolateConfig({required this.sendPort, required this.rootIsolateToken}); + + final SendPort sendPort; + final ui.RootIsolateToken? rootIsolateToken; +} + +void _isolateTask(IsolateConfig isolateConfig) { + if (isolateConfig.rootIsolateToken != null) { + ui.PlatformDispatcher.instance + .registerBackgroundIsolate(isolateConfig.rootIsolateToken!); + BackgroundIsolateBinaryMessenger.ensureInitialized( + isolateConfig.rootIsolateToken!); + } + final sendPort = isolateConfig.sendPort; + final ReceivePort receivePort = ReceivePort(); final BinaryAPI binaryAPI = BinaryAPI(); sendPort.send(receivePort.sendPort); - - receivePort.listen((dynamic message) async { + receivePort.listen((message) async { if (message is _WSConnectConfig) { - await binaryAPI.connect(message.connectionInformation); - // Connect WS + await binaryAPI.connect( + message.connectionInformation, + onOpen: (key) => sendPort.send(ConnectionEventReply( + key: key, callback: ConnectionCallbacks.onOpen)), + onDone: (key) => sendPort.send(ConnectionEventReply( + key: key, callback: ConnectionCallbacks.onDone)), + onError: (key) => sendPort.send(ConnectionEventReply( + key: key, callback: ConnectionCallbacks.onError)), + ); } if (message is _IsolateEvent) { @@ -422,18 +475,12 @@ void _isolateTask(SendPort sendPort) { class _WSConnectConfig { _WSConnectConfig({ - this.onOpen, - this.onError, - this.onClosed, this.connectionInformation, this.printResponse = false, }); final ConnectionInformation? connectionInformation; - final ConnectionCallback? onOpen; - final ConnectionCallback? onError; - final ConnectionCallback? onClosed; final bool printResponse; } @@ -482,3 +529,16 @@ class _UnSubAllEvent extends _IsolateEvent { class _DisconnectEvent extends _IsolateEvent { _DisconnectEvent({required super.eventId}); } + +enum ConnectionCallbacks { + onOpen, + onDone, + onError, +} + +class ConnectionEventReply { + ConnectionEventReply({required this.key, required this.callback}); + + final String key; + final ConnectionCallbacks callback; +} From 783ef87c2a3be447babfacfd46af5e7f870aa3bc Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Fri, 18 Oct 2024 15:52:57 +0800 Subject: [PATCH 05/17] add IsolateResponse class --- .../connection/api_manager/binary_api.dart | 72 ++++++++++++++----- 1 file changed, 55 insertions(+), 17 deletions(-) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 0caebeefc5..f78947fc31 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -276,8 +276,9 @@ class IsolateWrappingAPI extends BaseAPI { Isolate.spawn( _isolateTask, IsolateConfig( - sendPort: _isolateIncomingPort.sendPort, - rootIsolateToken: ServicesBinding?.rootIsolateToken), + sendPort: _isolateIncomingPort.sendPort, + rootIsolateToken: ServicesBinding.rootIsolateToken, + ), ); _isolateIncomingPort.listen((dynamic message) { @@ -292,21 +293,36 @@ class IsolateWrappingAPI extends BaseAPI { if (message is ConnectionEventReply) { switch (message.callback) { case ConnectionCallbacks.onOpen: + print('##### WS OnOpen'); _onOpen?.call(message.key); break; case ConnectionCallbacks.onDone: + print('##### WS OnDone'); _onDone?.call(message.key); break; case ConnectionCallbacks.onError: + print('##### WS OnError'); _onError?.call(message.key); break; } } + if (message is IsolateResponse) { + print('##### On Message $message'); + final Completer? completer = _pendingEvents[message.eventId]; + if (completer != null) { + completer.complete(message.response); + _pendingEvents.remove(message.eventId); + } + } + // Check for other messages coming out from Isolate. }); } + final Map> _pendingEvents = + >{}; + late final Completer _canStartCompleter; final ReceivePort _isolateIncomingPort = ReceivePort(); @@ -352,6 +368,15 @@ class IsolateWrappingAPI extends BaseAPI { _onError = onError; } + Future _callEvent(_IsolateEvent event) { + final Completer responseCompleter = Completer(); + _pendingEvents[event.eventId] = responseCompleter; + + _isolateSendPort?.send(event); + + return responseCompleter.future; + } + @override void addToChannel(Map request) => _isolateSendPort ?.send(_AddToChannelEvent(request: request, eventId: _getEventId)); @@ -362,8 +387,7 @@ class IsolateWrappingAPI extends BaseAPI { List nullableKeys = const [], }) async { final event = _CallEvent(request: request, eventId: _getEventId); - _isolateSendPort?.send(event); - return event.completer.future; + return _callEvent(event); } @override @@ -385,8 +409,7 @@ class IsolateWrappingAPI extends BaseAPI { Future unsubscribe({required String subscriptionId}) { final event = _UnSubEvent(subscriptionId: subscriptionId, eventId: _getEventId); - _isolateSendPort?.send(event); - return event.completer.future; + return _callEvent(event); } @override @@ -394,8 +417,7 @@ class IsolateWrappingAPI extends BaseAPI { required ForgetStreamType method, }) { final event = _UnSubAllEvent(streamType: method, eventId: _getEventId); - _isolateSendPort?.send(event); - return event.completer.future; + return _callEvent(event); } @override @@ -445,7 +467,9 @@ void _isolateTask(IsolateConfig isolateConfig) { break; case _CallEvent(): final response = await binaryAPI.call(request: message.request); - message.completer.complete(response); + sendPort.send( + IsolateResponse(response: response, eventId: message.eventId), + ); break; case _SubEvent(): @@ -458,15 +482,19 @@ void _isolateTask(IsolateConfig isolateConfig) { final response = await binaryAPI.unsubscribe( subscriptionId: message.subscriptionId, ); - message.completer.complete(response); + sendPort.send( + IsolateResponse(response: response, eventId: message.eventId), + ); break; case _UnSubAllEvent(): final response = await binaryAPI.unsubscribeAll(method: message.streamType); - message.completer.complete(response); + sendPort.send( + IsolateResponse(response: response, eventId: message.eventId)); break; case _DisconnectEvent(): - await binaryAPI.disconnect(); + final response = await binaryAPI.disconnect(); + break; } } @@ -500,12 +528,14 @@ class _CallEvent extends _IsolateEvent { _CallEvent({required this.request, required super.eventId}); final Request request; - final Completer completer = Completer(); } class _SubEvent extends _IsolateEvent { - _SubEvent( - {required this.request, required super.eventId, required this.stream}); + _SubEvent({ + required this.request, + required super.eventId, + required this.stream, + }); final Request request; @@ -516,14 +546,12 @@ class _UnSubEvent extends _IsolateEvent { _UnSubEvent({required this.subscriptionId, required super.eventId}); final String subscriptionId; - final Completer completer = Completer(); } class _UnSubAllEvent extends _IsolateEvent { _UnSubAllEvent({required this.streamType, required super.eventId}); final ForgetStreamType streamType; - final Completer completer = Completer(); } class _DisconnectEvent extends _IsolateEvent { @@ -542,3 +570,13 @@ class ConnectionEventReply { final String key; final ConnectionCallbacks callback; } + +class IsolateResponse { + IsolateResponse({required this.response, required this.eventId}); + + final T response; + final int eventId; + + @override + String toString() => 'ResponseEvent: [$eventId]: $response'; +} From b8762a13745585b63b87b76521b579edd27b39ae Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Fri, 18 Oct 2024 16:14:01 +0800 Subject: [PATCH 06/17] Remove stream from Sub event - Because it's not a sendable item --- .../connection/api_manager/binary_api.dart | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index f78947fc31..9e57cc2a78 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -278,6 +278,7 @@ class IsolateWrappingAPI extends BaseAPI { IsolateConfig( sendPort: _isolateIncomingPort.sendPort, rootIsolateToken: ServicesBinding.rootIsolateToken, + apiInstanceKey: super.key, ), ); @@ -400,7 +401,6 @@ class IsolateWrappingAPI extends BaseAPI { _isolateSendPort?.send(_SubEvent( request: request, eventId: _getEventId, - stream: stream, )); return stream; } @@ -427,10 +427,16 @@ class IsolateWrappingAPI extends BaseAPI { } class IsolateConfig { - IsolateConfig({required this.sendPort, required this.rootIsolateToken}); + IsolateConfig({ + required this.sendPort, + required this.rootIsolateToken, + required this.apiInstanceKey, + }); final SendPort sendPort; final ui.RootIsolateToken? rootIsolateToken; + + final String apiInstanceKey; } void _isolateTask(IsolateConfig isolateConfig) { @@ -444,7 +450,7 @@ void _isolateTask(IsolateConfig isolateConfig) { final ReceivePort receivePort = ReceivePort(); - final BinaryAPI binaryAPI = BinaryAPI(); + final BinaryAPI binaryAPI = BinaryAPI(key: isolateConfig.apiInstanceKey); sendPort.send(receivePort.sendPort); receivePort.listen((message) async { @@ -474,9 +480,9 @@ void _isolateTask(IsolateConfig isolateConfig) { case _SubEvent(): final stream = binaryAPI.subscribe(request: message.request); - stream?.listen((event) { - message.stream.add(event); - }); + // stream?.listen((event) { + // message.stream.add(event); + // }); break; case _UnSubEvent(): final response = await binaryAPI.unsubscribe( @@ -534,12 +540,9 @@ class _SubEvent extends _IsolateEvent { _SubEvent({ required this.request, required super.eventId, - required this.stream, }); final Request request; - - final BehaviorSubject stream; } class _UnSubEvent extends _IsolateEvent { From b2f76bc24b017c7bf465becda52426d096216cf5 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Fri, 18 Oct 2024 16:34:00 +0800 Subject: [PATCH 07/17] handle subscription calls --- .../connection/api_manager/binary_api.dart | 56 ++++++++++++------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 9e57cc2a78..ae30ab1c92 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -285,7 +285,6 @@ class IsolateWrappingAPI extends BaseAPI { _isolateIncomingPort.listen((dynamic message) { if (message is SendPort) { _isolateSendPort = message; - print('Isolate send port is ready ${DateTime.now()}'); if (!_canStartCompleter.isCompleted) { _canStartCompleter.complete(); } @@ -294,26 +293,26 @@ class IsolateWrappingAPI extends BaseAPI { if (message is ConnectionEventReply) { switch (message.callback) { case ConnectionCallbacks.onOpen: - print('##### WS OnOpen'); _onOpen?.call(message.key); break; case ConnectionCallbacks.onDone: - print('##### WS OnDone'); _onDone?.call(message.key); break; case ConnectionCallbacks.onError: - print('##### WS OnError'); _onError?.call(message.key); break; } } if (message is IsolateResponse) { - print('##### On Message $message'); - final Completer? completer = _pendingEvents[message.eventId]; - if (completer != null) { - completer.complete(message.response); - _pendingEvents.remove(message.eventId); + if (message.isSubscription) { + _pendingSubscriptions[message.eventId]?.add(message.response); + } else { + final Completer? completer = _pendingEvents[message.eventId]; + if (completer != null) { + completer.complete(message.response); + _pendingEvents.remove(message.eventId); + } } } @@ -324,6 +323,9 @@ class IsolateWrappingAPI extends BaseAPI { final Map> _pendingEvents = >{}; + final Map> _pendingSubscriptions = + >{}; + late final Completer _canStartCompleter; final ReceivePort _isolateIncomingPort = ReceivePort(); @@ -397,12 +399,16 @@ class IsolateWrappingAPI extends BaseAPI { int cacheSize = 0, RequestCompareFunction? comparePredicate, }) { - final BehaviorSubject stream = BehaviorSubject(); - _isolateSendPort?.send(_SubEvent( + final StreamController responseStream = + StreamController.broadcast(); + final subEvent = _SubEvent( request: request, eventId: _getEventId, - )); - return stream; + ); + _pendingSubscriptions[subEvent.eventId] = responseStream; + + _isolateSendPort?.send(subEvent); + return responseStream.stream; } @override @@ -480,9 +486,15 @@ void _isolateTask(IsolateConfig isolateConfig) { case _SubEvent(): final stream = binaryAPI.subscribe(request: message.request); - // stream?.listen((event) { - // message.stream.add(event); - // }); + stream?.listen((event) { + sendPort.send( + IsolateResponse( + response: event, + eventId: message.eventId, + isSubscription: true, + ), + ); + }); break; case _UnSubEvent(): final response = await binaryAPI.unsubscribe( @@ -519,9 +531,10 @@ class _WSConnectConfig { } sealed class _IsolateEvent { - _IsolateEvent({required this.eventId}); + _IsolateEvent({required this.eventId, this.isSubscription = false}); final int eventId; + final bool isSubscription; } class _AddToChannelEvent extends _IsolateEvent { @@ -540,7 +553,7 @@ class _SubEvent extends _IsolateEvent { _SubEvent({ required this.request, required super.eventId, - }); + }) : super(isSubscription: true); final Request request; } @@ -575,10 +588,15 @@ class ConnectionEventReply { } class IsolateResponse { - IsolateResponse({required this.response, required this.eventId}); + IsolateResponse({ + required this.response, + required this.eventId, + this.isSubscription = false, + }); final T response; final int eventId; + final bool isSubscription; @override String toString() => 'ResponseEvent: [$eventId]: $response'; From 5eaa606d94b89436ea16cb5dd3407beb10deaf72 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Mon, 21 Oct 2024 10:41:35 +0800 Subject: [PATCH 08/17] organize isolate files --- .../connection/api_manager/binary_api.dart | 172 +----------------- .../api_manager/isolate_events.dart | 84 +++++++++ .../connection/api_manager/isolate_task.dart | 88 +++++++++ 3 files changed, 175 insertions(+), 169 deletions(-) create mode 100644 lib/services/connection/api_manager/isolate_events.dart create mode 100644 lib/services/connection/api_manager/isolate_task.dart diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index ae30ab1c92..c70210fd34 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -25,6 +25,9 @@ import 'package:flutter_deriv_api/services/connection/call_manager/call_manager. import 'package:flutter_deriv_api/services/connection/call_manager/exceptions/call_manager_exception.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/subscription_manager.dart'; +part 'isolate_events.dart'; +part 'isolate_task.dart'; + /// This class is for handling Binary API connection and calling Binary APIs. class BinaryAPI extends BaseAPI { /// Initializes [BinaryAPI] instance. @@ -432,172 +435,3 @@ class IsolateWrappingAPI extends BaseAPI { } } -class IsolateConfig { - IsolateConfig({ - required this.sendPort, - required this.rootIsolateToken, - required this.apiInstanceKey, - }); - - final SendPort sendPort; - final ui.RootIsolateToken? rootIsolateToken; - - final String apiInstanceKey; -} - -void _isolateTask(IsolateConfig isolateConfig) { - if (isolateConfig.rootIsolateToken != null) { - ui.PlatformDispatcher.instance - .registerBackgroundIsolate(isolateConfig.rootIsolateToken!); - BackgroundIsolateBinaryMessenger.ensureInitialized( - isolateConfig.rootIsolateToken!); - } - final sendPort = isolateConfig.sendPort; - - final ReceivePort receivePort = ReceivePort(); - - final BinaryAPI binaryAPI = BinaryAPI(key: isolateConfig.apiInstanceKey); - - sendPort.send(receivePort.sendPort); - receivePort.listen((message) async { - if (message is _WSConnectConfig) { - await binaryAPI.connect( - message.connectionInformation, - onOpen: (key) => sendPort.send(ConnectionEventReply( - key: key, callback: ConnectionCallbacks.onOpen)), - onDone: (key) => sendPort.send(ConnectionEventReply( - key: key, callback: ConnectionCallbacks.onDone)), - onError: (key) => sendPort.send(ConnectionEventReply( - key: key, callback: ConnectionCallbacks.onError)), - ); - } - - if (message is _IsolateEvent) { - switch (message) { - case _AddToChannelEvent(): - binaryAPI.addToChannel(message.request); - break; - case _CallEvent(): - final response = await binaryAPI.call(request: message.request); - sendPort.send( - IsolateResponse(response: response, eventId: message.eventId), - ); - break; - - case _SubEvent(): - final stream = binaryAPI.subscribe(request: message.request); - stream?.listen((event) { - sendPort.send( - IsolateResponse( - response: event, - eventId: message.eventId, - isSubscription: true, - ), - ); - }); - break; - case _UnSubEvent(): - final response = await binaryAPI.unsubscribe( - subscriptionId: message.subscriptionId, - ); - sendPort.send( - IsolateResponse(response: response, eventId: message.eventId), - ); - break; - case _UnSubAllEvent(): - final response = - await binaryAPI.unsubscribeAll(method: message.streamType); - sendPort.send( - IsolateResponse(response: response, eventId: message.eventId)); - break; - case _DisconnectEvent(): - final response = await binaryAPI.disconnect(); - - break; - } - } - }); -} - -class _WSConnectConfig { - _WSConnectConfig({ - this.connectionInformation, - this.printResponse = false, - }); - - final ConnectionInformation? connectionInformation; - - final bool printResponse; -} - -sealed class _IsolateEvent { - _IsolateEvent({required this.eventId, this.isSubscription = false}); - - final int eventId; - final bool isSubscription; -} - -class _AddToChannelEvent extends _IsolateEvent { - _AddToChannelEvent({required this.request, required super.eventId}); - - final Map request; -} - -class _CallEvent extends _IsolateEvent { - _CallEvent({required this.request, required super.eventId}); - - final Request request; -} - -class _SubEvent extends _IsolateEvent { - _SubEvent({ - required this.request, - required super.eventId, - }) : super(isSubscription: true); - - final Request request; -} - -class _UnSubEvent extends _IsolateEvent { - _UnSubEvent({required this.subscriptionId, required super.eventId}); - - final String subscriptionId; -} - -class _UnSubAllEvent extends _IsolateEvent { - _UnSubAllEvent({required this.streamType, required super.eventId}); - - final ForgetStreamType streamType; -} - -class _DisconnectEvent extends _IsolateEvent { - _DisconnectEvent({required super.eventId}); -} - -enum ConnectionCallbacks { - onOpen, - onDone, - onError, -} - -class ConnectionEventReply { - ConnectionEventReply({required this.key, required this.callback}); - - final String key; - final ConnectionCallbacks callback; -} - -class IsolateResponse { - IsolateResponse({ - required this.response, - required this.eventId, - this.isSubscription = false, - }); - - final T response; - final int eventId; - final bool isSubscription; - - @override - String toString() => 'ResponseEvent: [$eventId]: $response'; -} diff --git a/lib/services/connection/api_manager/isolate_events.dart b/lib/services/connection/api_manager/isolate_events.dart new file mode 100644 index 0000000000..449687e676 --- /dev/null +++ b/lib/services/connection/api_manager/isolate_events.dart @@ -0,0 +1,84 @@ +part of 'binary_api.dart'; + +class _WSConnectConfig { + _WSConnectConfig({ + this.connectionInformation, + this.printResponse = false, + }); + + final ConnectionInformation? connectionInformation; + + final bool printResponse; +} + +sealed class _IsolateEvent { + _IsolateEvent({required this.eventId, this.isSubscription = false}); + + final int eventId; + final bool isSubscription; +} + +class _AddToChannelEvent extends _IsolateEvent { + _AddToChannelEvent({required this.request, required super.eventId}); + + final Map request; +} + +class _CallEvent extends _IsolateEvent { + _CallEvent({required this.request, required super.eventId}); + + final Request request; +} + +class _SubEvent extends _IsolateEvent { + _SubEvent({ + required this.request, + required super.eventId, + }) : super(isSubscription: true); + + final Request request; +} + +class _UnSubEvent extends _IsolateEvent { + _UnSubEvent({required this.subscriptionId, required super.eventId}); + + final String subscriptionId; +} + +class _UnSubAllEvent extends _IsolateEvent { + _UnSubAllEvent({required this.streamType, required super.eventId}); + + final ForgetStreamType streamType; +} + +class _DisconnectEvent extends _IsolateEvent { + _DisconnectEvent({required super.eventId}); +} + +enum ConnectionCallbacks { + onOpen, + onDone, + onError, +} + +class ConnectionEventReply { + ConnectionEventReply({required this.key, required this.callback}); + + final String key; + final ConnectionCallbacks callback; +} + +class IsolateResponse { + IsolateResponse({ + required this.response, + required this.eventId, + this.isSubscription = false, + }); + + final T response; + final int eventId; + final bool isSubscription; + + @override + String toString() => 'ResponseEvent: [$eventId]: $response'; +} diff --git a/lib/services/connection/api_manager/isolate_task.dart b/lib/services/connection/api_manager/isolate_task.dart new file mode 100644 index 0000000000..b653c9797c --- /dev/null +++ b/lib/services/connection/api_manager/isolate_task.dart @@ -0,0 +1,88 @@ +part of 'binary_api.dart'; + +class IsolateConfig { + IsolateConfig({ + required this.sendPort, + required this.rootIsolateToken, + required this.apiInstanceKey, + }); + + final SendPort sendPort; + final ui.RootIsolateToken? rootIsolateToken; + + final String apiInstanceKey; +} + +void _isolateTask(IsolateConfig isolateConfig) { + if (isolateConfig.rootIsolateToken != null) { + ui.PlatformDispatcher.instance + .registerBackgroundIsolate(isolateConfig.rootIsolateToken!); + BackgroundIsolateBinaryMessenger.ensureInitialized( + isolateConfig.rootIsolateToken!); + } + final sendPort = isolateConfig.sendPort; + + final ReceivePort receivePort = ReceivePort(); + + final BinaryAPI binaryAPI = BinaryAPI(key: isolateConfig.apiInstanceKey); + + sendPort.send(receivePort.sendPort); + receivePort.listen((message) async { + if (message is _WSConnectConfig) { + await binaryAPI.connect( + message.connectionInformation, + onOpen: (key) => sendPort.send(ConnectionEventReply( + key: key, callback: ConnectionCallbacks.onOpen)), + onDone: (key) => sendPort.send(ConnectionEventReply( + key: key, callback: ConnectionCallbacks.onDone)), + onError: (key) => sendPort.send(ConnectionEventReply( + key: key, callback: ConnectionCallbacks.onError)), + ); + } + + if (message is _IsolateEvent) { + switch (message) { + case _AddToChannelEvent(): + binaryAPI.addToChannel(message.request); + break; + case _CallEvent(): + final response = await binaryAPI.call(request: message.request); + sendPort.send( + IsolateResponse(response: response, eventId: message.eventId), + ); + break; + + case _SubEvent(): + final stream = binaryAPI.subscribe(request: message.request); + stream?.listen((event) { + sendPort.send( + IsolateResponse( + response: event, + eventId: message.eventId, + isSubscription: true, + ), + ); + }); + break; + case _UnSubEvent(): + final response = await binaryAPI.unsubscribe( + subscriptionId: message.subscriptionId, + ); + sendPort.send( + IsolateResponse(response: response, eventId: message.eventId), + ); + break; + case _UnSubAllEvent(): + final response = + await binaryAPI.unsubscribeAll(method: message.streamType); + sendPort.send( + IsolateResponse(response: response, eventId: message.eventId)); + break; + case _DisconnectEvent(): + final response = await binaryAPI.disconnect(); + + break; + } + } + }); +} From bd8baaf38ed9c9c2efdc9ed178be06c0d467fb3e Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Mon, 21 Oct 2024 10:51:50 +0800 Subject: [PATCH 09/17] WIP: add CustomIsolateEvent class --- .../api_manager/isolate_events.dart | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/lib/services/connection/api_manager/isolate_events.dart b/lib/services/connection/api_manager/isolate_events.dart index 449687e676..75639c507f 100644 --- a/lib/services/connection/api_manager/isolate_events.dart +++ b/lib/services/connection/api_manager/isolate_events.dart @@ -82,3 +82,49 @@ class IsolateResponse { @override String toString() => 'ResponseEvent: [$eventId]: $response'; } + +/// To communicate event and get the response for custom events that in addition +/// to get the BE JSON the deserialization of the JSON response to dart models +/// also happens inside the isolate. +abstract class CustomIsolateEvent extends _IsolateEvent { + CustomIsolateEvent({ + required super.eventId, + required this.event, + required this.request, + this.data, + }); + + final CustomEvent event; + + final Request request; + + final T? data; +} + +enum CustomEvent { + ping, + activeSymbols, + assetIndex, + balance, + buy, + accountList, + accountClosure, + cancel, + cashierPayment, + changeEmail, + changePassword, + confirmEmail, + contractUpdateHistory, + contractUpdate, + contractsFor, + getAccountStatus, + getAccountTypes, + getAvailableAccounts, + getFinancialAssessment, + getLimits, + getSelfExclusion, + getSettings, + identityVerification, + jTokenCreate, + kycAuthStatus, +} From 0820f4f954d78c53b26378066012c09f2c77ee58 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Mon, 21 Oct 2024 13:46:59 +0800 Subject: [PATCH 10/17] send active_symbols custom event and retrieve the result --- .../active_symbols_response_result.dart | 19 ++----- .../connection/api_manager/binary_api.dart | 57 ++++++++++++++++++- .../api_manager/isolate_events.dart | 14 ++++- .../connection/api_manager/isolate_task.dart | 49 ++++++++++++++++ 4 files changed, 123 insertions(+), 16 deletions(-) diff --git a/lib/api/response/active_symbols_response_result.dart b/lib/api/response/active_symbols_response_result.dart index c21e198e5e..c37270393c 100644 --- a/lib/api/response/active_symbols_response_result.dart +++ b/lib/api/response/active_symbols_response_result.dart @@ -8,6 +8,7 @@ import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; /// Active symbols response model class. abstract class ActiveSymbolsResponseModel { @@ -56,7 +57,8 @@ class ActiveSymbolsResponse extends ActiveSymbolsResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Gets the list of active symbols. /// @@ -64,19 +66,8 @@ class ActiveSymbolsResponse extends ActiveSymbolsResponseModel { /// Throws an [BaseAPIException] if API response contains an error static Future fetchActiveSymbols( ActiveSymbolsRequest request, - ) async { - final ActiveSymbolsReceive response = await _api.call( - request: request, - ); - - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return ActiveSymbolsResponse.fromJson(response.activeSymbols); - } + ) async => + _api.fetchActiveSymbols(request); /// Creates a copy of instance with given parameters. ActiveSymbolsResponse copyWith({ diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index c70210fd34..aa874f8f21 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -7,6 +7,11 @@ import 'dart:ui' as ui; import 'package:flutter/services.dart'; import 'package:flutter/widgets.dart'; +import 'package:flutter_deriv_api/api/exceptions/base_api_exception.dart'; +import 'package:flutter_deriv_api/api/models/base_exception_model.dart'; +import 'package:flutter_deriv_api/api/response/active_symbols_response_result.dart'; +import 'package:flutter_deriv_api/basic_api/generated/active_symbols_receive.dart'; +import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart'; import 'package:flutter_system_proxy/flutter_system_proxy.dart'; import 'package:rxdart/rxdart.dart'; import 'package:web_socket_channel/io.dart'; @@ -26,6 +31,7 @@ import 'package:flutter_deriv_api/services/connection/call_manager/exceptions/ca import 'package:flutter_deriv_api/services/connection/call_manager/subscription_manager.dart'; part 'isolate_events.dart'; + part 'isolate_task.dart'; /// This class is for handling Binary API connection and calling Binary APIs. @@ -319,6 +325,40 @@ class IsolateWrappingAPI extends BaseAPI { } } + if (message is CustomIsolateEvent) { + switch (message.event) { + case CustomEvent.ping: + case CustomEvent.activeSymbols: + final ActiveSymbolsResponse activeSymbolsResponse = + message.data as ActiveSymbolsResponse; + _pendingEvents[message.eventId]?.complete(activeSymbolsResponse); + + case CustomEvent.assetIndex: + case CustomEvent.balance: + case CustomEvent.buy: + case CustomEvent.accountList: + case CustomEvent.accountClosure: + case CustomEvent.cancel: + case CustomEvent.cashierPayment: + case CustomEvent.changeEmail: + case CustomEvent.changePassword: + case CustomEvent.confirmEmail: + case CustomEvent.contractUpdateHistory: + case CustomEvent.contractUpdate: + case CustomEvent.contractsFor: + case CustomEvent.getAccountStatus: + case CustomEvent.getAccountTypes: + case CustomEvent.getAvailableAccounts: + case CustomEvent.getFinancialAssessment: + case CustomEvent.getLimits: + case CustomEvent.getSelfExclusion: + case CustomEvent.getSettings: + case CustomEvent.identityVerification: + case CustomEvent.jTokenCreate: + case CustomEvent.kycAuthStatus: + } + } + // Check for other messages coming out from Isolate. }); } @@ -396,6 +436,22 @@ class IsolateWrappingAPI extends BaseAPI { return _callEvent(event); } + /// Gets the list of active symbols. + /// + /// For parameters information refer to [ActiveSymbolsRequest]. + /// Throws an [BaseAPIException] if API response contains an error + Future fetchActiveSymbols( + ActiveSymbolsRequest request, + ) async { + final event = CustomIsolateEvent( + request: request, + eventId: _getEventId, + event: CustomEvent.activeSymbols, + ); + + return _callEvent(event); + } + @override Stream? subscribe({ required Request request, @@ -434,4 +490,3 @@ class IsolateWrappingAPI extends BaseAPI { _isolateSendPort?.send(_DisconnectEvent(eventId: _getEventId)); } } - diff --git a/lib/services/connection/api_manager/isolate_events.dart b/lib/services/connection/api_manager/isolate_events.dart index 75639c507f..fa95999f6d 100644 --- a/lib/services/connection/api_manager/isolate_events.dart +++ b/lib/services/connection/api_manager/isolate_events.dart @@ -86,7 +86,7 @@ class IsolateResponse { /// To communicate event and get the response for custom events that in addition /// to get the BE JSON the deserialization of the JSON response to dart models /// also happens inside the isolate. -abstract class CustomIsolateEvent extends _IsolateEvent { +class CustomIsolateEvent extends _IsolateEvent { CustomIsolateEvent({ required super.eventId, required this.event, @@ -99,6 +99,18 @@ abstract class CustomIsolateEvent extends _IsolateEvent { final Request request; final T? data; + + CustomIsolateEvent copyWith({ + CustomEvent? event, + Request? request, + T? data, + }) => + CustomIsolateEvent( + eventId: eventId, + event: event ?? this.event, + request: request ?? this.request, + data: data ?? this.data, + ); } enum CustomEvent { diff --git a/lib/services/connection/api_manager/isolate_task.dart b/lib/services/connection/api_manager/isolate_task.dart index b653c9797c..8ee20410bf 100644 --- a/lib/services/connection/api_manager/isolate_task.dart +++ b/lib/services/connection/api_manager/isolate_task.dart @@ -82,7 +82,56 @@ void _isolateTask(IsolateConfig isolateConfig) { final response = await binaryAPI.disconnect(); break; + case CustomIsolateEvent(): + _handleCustomEvent(message, binaryAPI, sendPort); } } }); } + +void _handleCustomEvent( + CustomIsolateEvent message, + BinaryAPI api, + SendPort sendPort, +) async { + switch (message.event) { + case CustomEvent.ping: + case CustomEvent.activeSymbols: + final ActiveSymbolsReceive response = await api.call( + request: message.request, + ); + + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + final asResponse = ActiveSymbolsResponse.fromJson(response.activeSymbols); + sendPort.send(message.copyWith(data: asResponse)); + + case CustomEvent.assetIndex: + case CustomEvent.balance: + case CustomEvent.buy: + case CustomEvent.accountList: + case CustomEvent.accountClosure: + case CustomEvent.cancel: + case CustomEvent.cashierPayment: + case CustomEvent.changeEmail: + case CustomEvent.changePassword: + case CustomEvent.confirmEmail: + case CustomEvent.contractUpdateHistory: + case CustomEvent.contractUpdate: + case CustomEvent.contractsFor: + case CustomEvent.getAccountStatus: + case CustomEvent.getAccountTypes: + case CustomEvent.getAvailableAccounts: + case CustomEvent.getFinancialAssessment: + case CustomEvent.getLimits: + case CustomEvent.getSelfExclusion: + case CustomEvent.getSettings: + case CustomEvent.identityVerification: + case CustomEvent.jTokenCreate: + case CustomEvent.kycAuthStatus: + } +} From c63da9b8eb4d3eb49dcb56fef8782e6be4a11502 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Mon, 21 Oct 2024 14:24:37 +0800 Subject: [PATCH 11/17] support tick stream subscription in Isolate API --- lib/api/manually/tick.dart | 1 + lib/api/response/ticks_response_result.dart | 25 ++------- .../connection/api_manager/binary_api.dart | 21 ++++++++ .../api_manager/isolate_events.dart | 7 +++ .../connection/api_manager/isolate_task.dart | 54 ++++++++++++++----- 5 files changed, 76 insertions(+), 32 deletions(-) diff --git a/lib/api/manually/tick.dart b/lib/api/manually/tick.dart index f6aa7d3c29..0cd96c8caa 100644 --- a/lib/api/manually/tick.dart +++ b/lib/api/manually/tick.dart @@ -6,6 +6,7 @@ import 'package:flutter_deriv_api/basic_api/generated/api.dart'; import 'package:flutter_deriv_api/basic_api/response.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/base_call_manager.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; diff --git a/lib/api/response/ticks_response_result.dart b/lib/api/response/ticks_response_result.dart index cfd5319702..9b845a32ad 100644 --- a/lib/api/response/ticks_response_result.dart +++ b/lib/api/response/ticks_response_result.dart @@ -12,6 +12,7 @@ import 'package:flutter_deriv_api/basic_api/generated/ticks_send.dart'; import 'package:flutter_deriv_api/basic_api/response.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/base_call_manager.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; @@ -64,33 +65,17 @@ class TicksResponse extends TicksResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Subscribes to a tick for given [TickRequest] /// /// Throws [BaseAPIException] if API response contains an error - static Stream subscribeTick( + static Stream subscribeTick( TicksRequest tickRequest, { RequestCompareFunction? comparePredicate, }) => - _api - .subscribe(request: tickRequest, comparePredicate: comparePredicate)! - .map( - (Response response) { - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return response is TicksReceive - ? TicksResponse.fromJson( - response.tick, - response.subscription, - ) - : null; - }, - ); + _api.subscribeTick(tickRequest); /// Unsubscribes all ticks. /// diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index aa874f8f21..5368e7dd4f 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -8,10 +8,13 @@ import 'package:flutter/services.dart'; import 'package:flutter/widgets.dart'; import 'package:flutter_deriv_api/api/exceptions/base_api_exception.dart'; +import 'package:flutter_deriv_api/api/manually/tick.dart'; import 'package:flutter_deriv_api/api/models/base_exception_model.dart'; import 'package:flutter_deriv_api/api/response/active_symbols_response_result.dart'; +import 'package:flutter_deriv_api/api/response/ticks_response_result.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_receive.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart'; +import 'package:flutter_deriv_api/basic_api/generated/api.dart'; import 'package:flutter_system_proxy/flutter_system_proxy.dart'; import 'package:rxdart/rxdart.dart'; import 'package:web_socket_channel/io.dart'; @@ -356,6 +359,9 @@ class IsolateWrappingAPI extends BaseAPI { case CustomEvent.identityVerification: case CustomEvent.jTokenCreate: case CustomEvent.kycAuthStatus: + case CustomEvent.ticks: + _pendingSubscriptions[message.eventId]?.add(message.data); + case CustomEvent.proposalOpenContract: } } @@ -452,6 +458,21 @@ class IsolateWrappingAPI extends BaseAPI { return _callEvent(event); } + Stream subscribeTick(TicksRequest request) { + final event = CustomIsolateEvent( + request: request, + eventId: _getEventId, + event: CustomEvent.ticks, + ); + + final StreamController responseStream = + StreamController.broadcast(); + _pendingSubscriptions[event.eventId] = responseStream; + + _isolateSendPort?.send(event); + return responseStream.stream; + } + @override Stream? subscribe({ required Request request, diff --git a/lib/services/connection/api_manager/isolate_events.dart b/lib/services/connection/api_manager/isolate_events.dart index fa95999f6d..303aa8bfa7 100644 --- a/lib/services/connection/api_manager/isolate_events.dart +++ b/lib/services/connection/api_manager/isolate_events.dart @@ -92,24 +92,29 @@ class CustomIsolateEvent extends _IsolateEvent { required this.event, required this.request, this.data, + this.isSubscription = false, }); final CustomEvent event; final Request request; + final bool isSubscription; + final T? data; CustomIsolateEvent copyWith({ CustomEvent? event, Request? request, T? data, + bool? isSubscription, }) => CustomIsolateEvent( eventId: eventId, event: event ?? this.event, request: request ?? this.request, data: data ?? this.data, + isSubscription: isSubscription ?? this.isSubscription, ); } @@ -139,4 +144,6 @@ enum CustomEvent { identityVerification, jTokenCreate, kycAuthStatus, + ticks, + proposalOpenContract, } diff --git a/lib/services/connection/api_manager/isolate_task.dart b/lib/services/connection/api_manager/isolate_task.dart index 8ee20410bf..b7b59223e9 100644 --- a/lib/services/connection/api_manager/isolate_task.dart +++ b/lib/services/connection/api_manager/isolate_task.dart @@ -97,18 +97,8 @@ void _handleCustomEvent( switch (message.event) { case CustomEvent.ping: case CustomEvent.activeSymbols: - final ActiveSymbolsReceive response = await api.call( - request: message.request, - ); - - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - final asResponse = ActiveSymbolsResponse.fromJson(response.activeSymbols); - sendPort.send(message.copyWith(data: asResponse)); + await _fetchActiveSymbols(api, message, sendPort); + break; case CustomEvent.assetIndex: case CustomEvent.balance: @@ -133,5 +123,45 @@ void _handleCustomEvent( case CustomEvent.identityVerification: case CustomEvent.jTokenCreate: case CustomEvent.kycAuthStatus: + case CustomEvent.ticks: + api.subscribe(request: message.request)!.map( + (Response response) { + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + return response is TicksReceive + ? TicksResponse.fromJson( + response.tick, + response.subscription, + ) + : null; + }, + ).listen((TicksResponse? tick) { + sendPort.send(message.copyWith(data: tick, isSubscription: true)); + }); + + case CustomEvent.proposalOpenContract: } } + +Future _fetchActiveSymbols( + BinaryAPI api, + CustomIsolateEvent message, + SendPort sendPort, +) async { + final ActiveSymbolsReceive response = await api.call( + request: message.request, + ); + + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + final asResponse = ActiveSymbolsResponse.fromJson(response.activeSymbols); + sendPort.send(message.copyWith(data: asResponse)); +} From e0e2efb04d925a3c6a17dccf3f5f09273ec25e22 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Mon, 21 Oct 2024 15:22:22 +0800 Subject: [PATCH 12/17] support authorize call in Isolate --- .../response/authorize_response_result.dart | 19 +++++------------- .../connection/api_manager/binary_api.dart | 20 +++++++++++++++++++ .../api_manager/isolate_events.dart | 4 ++++ .../connection/api_manager/isolate_task.dart | 15 ++++++++++++++ 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/lib/api/response/authorize_response_result.dart b/lib/api/response/authorize_response_result.dart index a8e4e6e26f..c20f3652c6 100644 --- a/lib/api/response/authorize_response_result.dart +++ b/lib/api/response/authorize_response_result.dart @@ -9,6 +9,7 @@ import 'package:flutter_deriv_api/basic_api/generated/authorize_send.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; /// Authorize response model class. abstract class AuthorizeResponseModel { @@ -48,7 +49,8 @@ class AuthorizeResponse extends AuthorizeResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Authorizes current WebSocket session to act on behalf of the owner of a given token. /// @@ -56,19 +58,8 @@ class AuthorizeResponse extends AuthorizeResponseModel { /// Throws an [BaseAPIException] if API response contains an error. static Future authorizeMethodRaw( AuthorizeRequest request, - ) async { - final AuthorizeReceive response = await _api.call( - request: request, - ); - - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return response; - } + ) async => + _api.authorize(request); /// Authorizes current WebSocket session to act on behalf of the owner of a given token. /// diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 5368e7dd4f..261cd9f547 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -11,6 +11,7 @@ import 'package:flutter_deriv_api/api/exceptions/base_api_exception.dart'; import 'package:flutter_deriv_api/api/manually/tick.dart'; import 'package:flutter_deriv_api/api/models/base_exception_model.dart'; import 'package:flutter_deriv_api/api/response/active_symbols_response_result.dart'; +import 'package:flutter_deriv_api/api/response/authorize_response_result.dart'; import 'package:flutter_deriv_api/api/response/ticks_response_result.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_receive.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart'; @@ -320,6 +321,8 @@ class IsolateWrappingAPI extends BaseAPI { if (message.isSubscription) { _pendingSubscriptions[message.eventId]?.add(message.response); } else { + print( + '#### Retrieved response ${(message.response as Response).msgType}'); final Completer? completer = _pendingEvents[message.eventId]; if (completer != null) { completer.complete(message.response); @@ -362,6 +365,13 @@ class IsolateWrappingAPI extends BaseAPI { case CustomEvent.ticks: _pendingSubscriptions[message.eventId]?.add(message.data); case CustomEvent.proposalOpenContract: + case CustomEvent.authorize: + final AuthorizeReceive authorizeReceive = + message.data as AuthorizeReceive; + _pendingEvents[message.eventId]?.complete(authorizeReceive); + case CustomEvent.landingCompany: + case CustomEvent.statesList: + case CustomEvent.residenceList: } } @@ -510,4 +520,14 @@ class IsolateWrappingAPI extends BaseAPI { Future disconnect() async { _isolateSendPort?.send(_DisconnectEvent(eventId: _getEventId)); } + + Future authorize(AuthorizeRequest request) { + final event = CustomIsolateEvent( + request: request, + eventId: _getEventId, + event: CustomEvent.authorize, + ); + + return _callEvent(event); + } } diff --git a/lib/services/connection/api_manager/isolate_events.dart b/lib/services/connection/api_manager/isolate_events.dart index 303aa8bfa7..01ba614e50 100644 --- a/lib/services/connection/api_manager/isolate_events.dart +++ b/lib/services/connection/api_manager/isolate_events.dart @@ -120,7 +120,11 @@ class CustomIsolateEvent extends _IsolateEvent { enum CustomEvent { ping, + authorize, + landingCompany, activeSymbols, + statesList, + residenceList, assetIndex, balance, buy, diff --git a/lib/services/connection/api_manager/isolate_task.dart b/lib/services/connection/api_manager/isolate_task.dart index b7b59223e9..b37b8bd45c 100644 --- a/lib/services/connection/api_manager/isolate_task.dart +++ b/lib/services/connection/api_manager/isolate_task.dart @@ -144,6 +144,21 @@ void _handleCustomEvent( }); case CustomEvent.proposalOpenContract: + case CustomEvent.authorize: + final AuthorizeReceive response = await api.call( + request: message.request, + ); + + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + sendPort.send(message.copyWith(data: response)); + case CustomEvent.landingCompany: + case CustomEvent.statesList: + case CustomEvent.residenceList: } } From 963728a7ef3ccb46004462d12bc9f55097ac3def Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Wed, 23 Oct 2024 13:48:23 +0800 Subject: [PATCH 13/17] WIP adding landing company --- .../connection/api_manager/binary_api.dart | 1 + .../connection/api_manager/isolate_task.dart | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 261cd9f547..9320a2d54c 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -12,6 +12,7 @@ import 'package:flutter_deriv_api/api/manually/tick.dart'; import 'package:flutter_deriv_api/api/models/base_exception_model.dart'; import 'package:flutter_deriv_api/api/response/active_symbols_response_result.dart'; import 'package:flutter_deriv_api/api/response/authorize_response_result.dart'; +import 'package:flutter_deriv_api/api/response/landing_company_response_result.dart'; import 'package:flutter_deriv_api/api/response/ticks_response_result.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_receive.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart'; diff --git a/lib/services/connection/api_manager/isolate_task.dart b/lib/services/connection/api_manager/isolate_task.dart index b37b8bd45c..27624ec7d9 100644 --- a/lib/services/connection/api_manager/isolate_task.dart +++ b/lib/services/connection/api_manager/isolate_task.dart @@ -157,11 +157,27 @@ void _handleCustomEvent( sendPort.send(message.copyWith(data: response)); case CustomEvent.landingCompany: + await _fetchLandingCompnay(api, message, sendPort); case CustomEvent.statesList: case CustomEvent.residenceList: } } +Future _fetchLandingCompnay(BinaryAPI api, + CustomIsolateEvent message, SendPort sendPort) async { + final LandingCompanyReceive response = + await api.call(request: message.request); + + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + sendPort.send(message.copyWith( + data: LandingCompanyResponse.fromJson(response.landingCompany))); +} + Future _fetchActiveSymbols( BinaryAPI api, CustomIsolateEvent message, From 03e7bc97ba4f0267639865d341dccfc8bd920e20 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Thu, 24 Oct 2024 14:32:16 +0800 Subject: [PATCH 14/17] add tick history subscription --- .../ticks_history_response_result.dart | 45 ++-------------- .../connection/api_manager/binary_api.dart | 51 ++++++++++++++++++- .../api_manager/isolate_events.dart | 37 ++++++++++++++ .../connection/api_manager/isolate_task.dart | 51 +++++++++++++++++++ 4 files changed, 142 insertions(+), 42 deletions(-) diff --git a/lib/api/response/ticks_history_response_result.dart b/lib/api/response/ticks_history_response_result.dart index 9049bd5c1b..b73b5d5c07 100644 --- a/lib/api/response/ticks_history_response_result.dart +++ b/lib/api/response/ticks_history_response_result.dart @@ -15,6 +15,7 @@ import 'package:flutter_deriv_api/basic_api/generated/ticks_receive.dart'; import 'package:flutter_deriv_api/basic_api/response.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/base_call_manager.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; @@ -95,7 +96,8 @@ class TicksHistoryResponse extends TicksHistoryResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Gets the [TickHistory] for the given [symbol] in [request] /// @@ -124,46 +126,7 @@ class TicksHistoryResponse extends TicksHistoryResponseModel { bool subscribe = true, }) async { if (subscribe) { - final Stream? responseStream = - _api.subscribe(request: request, comparePredicate: comparePredicate); - final Response? firstResponse = await responseStream?.first; - - checkException( - response: firstResponse, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - if (firstResponse is TicksHistoryReceive) { - return TickHistorySubscription( - tickHistory: TicksHistoryResponse.fromJson( - firstResponse.candles, - firstResponse.history, - firstResponse.pipSize, - firstResponse.subscription), - tickStream: responseStream?.map( - (Response response) { - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return response is TicksReceive - ? Tick.fromJson( - response.tick!, - subscriptionJson: response.subscription, - ) - : response is OHLCResponse - ? OHLC.fromJson( - response.ohlc!, - subscriptionJson: response.subscription, - ) - : null; - }, - ), - ); - } - return null; + return _api.subscribeTickHistory(request); } else { return TickHistorySubscription( tickHistory: await fetchTickHistory(request), diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 9320a2d54c..b0eeba29ce 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -8,11 +8,16 @@ import 'package:flutter/services.dart'; import 'package:flutter/widgets.dart'; import 'package:flutter_deriv_api/api/exceptions/base_api_exception.dart'; -import 'package:flutter_deriv_api/api/manually/tick.dart'; +import 'package:flutter_deriv_api/api/manually/ohlc_response.dart'; +import 'package:flutter_deriv_api/api/manually/ohlc_response_result.dart'; +import 'package:flutter_deriv_api/api/manually/tick.dart' as man_tick; +import 'package:flutter_deriv_api/api/manually/tick_base.dart'; +import 'package:flutter_deriv_api/api/manually/tick_history_subscription.dart'; import 'package:flutter_deriv_api/api/models/base_exception_model.dart'; import 'package:flutter_deriv_api/api/response/active_symbols_response_result.dart'; import 'package:flutter_deriv_api/api/response/authorize_response_result.dart'; import 'package:flutter_deriv_api/api/response/landing_company_response_result.dart'; +import 'package:flutter_deriv_api/api/response/ticks_history_response_result.dart'; import 'package:flutter_deriv_api/api/response/ticks_response_result.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_receive.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart'; @@ -339,6 +344,7 @@ class IsolateWrappingAPI extends BaseAPI { final ActiveSymbolsResponse activeSymbolsResponse = message.data as ActiveSymbolsResponse; _pendingEvents[message.eventId]?.complete(activeSymbolsResponse); + _pendingEvents.remove(message.eventId); case CustomEvent.assetIndex: case CustomEvent.balance: @@ -370,9 +376,24 @@ class IsolateWrappingAPI extends BaseAPI { final AuthorizeReceive authorizeReceive = message.data as AuthorizeReceive; _pendingEvents[message.eventId]?.complete(authorizeReceive); + _pendingEvents.remove(message.eventId); case CustomEvent.landingCompany: case CustomEvent.statesList: case CustomEvent.residenceList: + case CustomEvent.ticksHistory: + final historyMessage = message as TicksHistoryEvent; + + if (historyMessage.tickHistory != null) { + final TicksHistoryResponse historyResponse = + historyMessage.tickHistory!; + _pendingEvents[historyMessage.eventId]?.complete( + TickHistorySubscription(tickHistory: historyResponse), + ); + _pendingEvents.remove(historyMessage.eventId); + } else if (historyMessage.tickStreamItem != null) { + _pendingSubscriptions[historyMessage.eventId] + ?.add(historyMessage.tickStreamItem); + } } } @@ -484,6 +505,34 @@ class IsolateWrappingAPI extends BaseAPI { return responseStream.stream; } + Future subscribeTickHistory( + TicksHistoryRequest request, + ) async { + final event = TicksHistoryEvent( + eventId: _getEventId, + event: CustomEvent.ticksHistory, + request: request, + ); + + final StreamController tickStreamController = + StreamController.broadcast(); + final completer = Completer(); + + _pendingSubscriptions[event.eventId] = tickStreamController; + _pendingEvents[event.eventId] = completer; + + _isolateSendPort?.send(event); + + final tickHistorySubscription = await completer.future; + + final response = tickHistorySubscription.copyWith( + tickHistorySubscription.tickHistory!, + tickStreamController.stream, + ); + + return response; + } + @override Stream? subscribe({ required Request request, diff --git a/lib/services/connection/api_manager/isolate_events.dart b/lib/services/connection/api_manager/isolate_events.dart index 01ba614e50..0bffe93f56 100644 --- a/lib/services/connection/api_manager/isolate_events.dart +++ b/lib/services/connection/api_manager/isolate_events.dart @@ -118,6 +118,42 @@ class CustomIsolateEvent extends _IsolateEvent { ); } +class TicksHistoryEvent extends CustomIsolateEvent { + TicksHistoryEvent({ + required super.eventId, + required super.event, + required super.request, + super.data, + super.isSubscription, + this.tickStreamItem, + this.tickHistory, + }); + + /// The history of tick + final TicksHistoryResponse? tickHistory; + + /// The stream of the tick + final TickBase? tickStreamItem; + + @override + TicksHistoryEvent copyWith({ + CustomEvent? event, + Request? request, + data, + bool? isSubscription, + TicksHistoryResponse? tickHistory, + TickBase? tickStreamItem, + }) => + TicksHistoryEvent( + eventId: eventId, + event: event ?? super.event, + request: request ?? super.request, + data: data ?? super.data, + tickHistory: tickHistory ?? this.tickHistory, + tickStreamItem: tickStreamItem ?? this.tickStreamItem, + ); +} + enum CustomEvent { ping, authorize, @@ -149,5 +185,6 @@ enum CustomEvent { jTokenCreate, kycAuthStatus, ticks, + ticksHistory, proposalOpenContract, } diff --git a/lib/services/connection/api_manager/isolate_task.dart b/lib/services/connection/api_manager/isolate_task.dart index 27624ec7d9..56ca6ebd08 100644 --- a/lib/services/connection/api_manager/isolate_task.dart +++ b/lib/services/connection/api_manager/isolate_task.dart @@ -94,6 +94,7 @@ void _handleCustomEvent( BinaryAPI api, SendPort sendPort, ) async { + print('@@@@@@12 ${message.event}'); switch (message.event) { case CustomEvent.ping: case CustomEvent.activeSymbols: @@ -142,6 +143,56 @@ void _handleCustomEvent( ).listen((TicksResponse? tick) { sendPort.send(message.copyWith(data: tick, isSubscription: true)); }); + break; + + case CustomEvent.ticksHistory: + final Stream? responseStream = + api.subscribe(request: message.request); + final Response? firstResponse = await responseStream?.first; + + checkException( + response: firstResponse, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + final tickHistory = message as TicksHistoryEvent; + + if (firstResponse is TicksHistoryReceive) { + + sendPort.send(tickHistory.copyWith( + tickHistory: TicksHistoryResponse.fromJson( + firstResponse.candles, + firstResponse.history, + firstResponse.pipSize, + firstResponse.subscription, + ), + )); + + responseStream?.map( + (Response response) { + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + return response is TicksReceive + ? man_tick.Tick.fromJson( + response.tick!, + subscriptionJson: response.subscription, + ) + : response is OHLCResponse + ? OHLC.fromJson( + response.ohlc!, + subscriptionJson: response.subscription, + ) + : null; + }, + ).listen((tick) => + sendPort.send(tickHistory.copyWith(tickStreamItem: tick))); + } + break; case CustomEvent.proposalOpenContract: case CustomEvent.authorize: From a38d042c95c32b175792bd09345a57bb553b2eb2 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Tue, 29 Oct 2024 15:53:47 +0800 Subject: [PATCH 15/17] add debugging prints --- .../connection/api_manager/binary_api.dart | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index b0eeba29ce..2b0aa01669 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -326,13 +326,13 @@ class IsolateWrappingAPI extends BaseAPI { if (message is IsolateResponse) { if (message.isSubscription) { _pendingSubscriptions[message.eventId]?.add(message.response); + print('####12 SUBSCRIPTION : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); } else { - print( - '#### Retrieved response ${(message.response as Response).msgType}'); final Completer? completer = _pendingEvents[message.eventId]; if (completer != null) { completer.complete(message.response); _pendingEvents.remove(message.eventId); + print('####12 FUTURE : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); } } } @@ -345,6 +345,8 @@ class IsolateWrappingAPI extends BaseAPI { message.data as ActiveSymbolsResponse; _pendingEvents[message.eventId]?.complete(activeSymbolsResponse); _pendingEvents.remove(message.eventId); + print('####12 FUTURE : REAL ONE ${DateTime.now()}'); + break; case CustomEvent.assetIndex: case CustomEvent.balance: @@ -371,12 +373,17 @@ class IsolateWrappingAPI extends BaseAPI { case CustomEvent.kycAuthStatus: case CustomEvent.ticks: _pendingSubscriptions[message.eventId]?.add(message.data); + print('####12 SUBSCRIPTION : REAL ONE ${DateTime.now()} '); + break; case CustomEvent.proposalOpenContract: case CustomEvent.authorize: final AuthorizeReceive authorizeReceive = message.data as AuthorizeReceive; _pendingEvents[message.eventId]?.complete(authorizeReceive); _pendingEvents.remove(message.eventId); + print('####12 FUTURE : REAL ONE ${DateTime.now()} '); + break; + case CustomEvent.landingCompany: case CustomEvent.statesList: case CustomEvent.residenceList: @@ -390,10 +397,13 @@ class IsolateWrappingAPI extends BaseAPI { TickHistorySubscription(tickHistory: historyResponse), ); _pendingEvents.remove(historyMessage.eventId); + print('####12 FUTURE : REAL ONE ${DateTime.now()} '); } else if (historyMessage.tickStreamItem != null) { _pendingSubscriptions[historyMessage.eventId] ?.add(historyMessage.tickStreamItem); + print('####12 SUBSCRIPTION : REAL ONE ${DateTime.now()} '); } + break; } } From 3f47bec81abb97aa44a2b56bde1405c50ddaaa74 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Tue, 29 Oct 2024 16:20:18 +0800 Subject: [PATCH 16/17] add proposal subscription --- .../response/proposal_response_result.dart | 23 +++----------- .../connection/api_manager/binary_api.dart | 30 ++++++++++++++++--- .../api_manager/isolate_events.dart | 1 + .../connection/api_manager/isolate_task.dart | 20 ++++++++++++- 4 files changed, 50 insertions(+), 24 deletions(-) diff --git a/lib/api/response/proposal_response_result.dart b/lib/api/response/proposal_response_result.dart index 175c3a8013..61d4f0f065 100644 --- a/lib/api/response/proposal_response_result.dart +++ b/lib/api/response/proposal_response_result.dart @@ -17,6 +17,7 @@ import 'package:flutter_deriv_api/basic_api/generated/proposal_send.dart'; import 'package:flutter_deriv_api/basic_api/response.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/base_call_manager.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; @@ -69,7 +70,8 @@ class ProposalResponse extends ProposalResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Gets the price proposal for contract /// @@ -97,24 +99,7 @@ class ProposalResponse extends ProposalResponseModel { ProposalRequest request, { RequestCompareFunction? comparePredicate, }) => - _api - .subscribe(request: request, comparePredicate: comparePredicate)! - .map( - (Response response) { - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return response is ProposalReceive - ? ProposalResponse.fromJson( - response.proposal, - response.subscription, - ) - : null; - }, - ); + _api.subscribePriceForContract(request); /// Unsubscribes from price proposal subscription. /// diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 2b0aa01669..12de840630 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -15,15 +15,14 @@ import 'package:flutter_deriv_api/api/manually/tick_base.dart'; import 'package:flutter_deriv_api/api/manually/tick_history_subscription.dart'; import 'package:flutter_deriv_api/api/models/base_exception_model.dart'; import 'package:flutter_deriv_api/api/response/active_symbols_response_result.dart'; -import 'package:flutter_deriv_api/api/response/authorize_response_result.dart'; import 'package:flutter_deriv_api/api/response/landing_company_response_result.dart'; +import 'package:flutter_deriv_api/api/response/proposal_response_result.dart'; import 'package:flutter_deriv_api/api/response/ticks_history_response_result.dart'; import 'package:flutter_deriv_api/api/response/ticks_response_result.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_receive.dart'; import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart'; import 'package:flutter_deriv_api/basic_api/generated/api.dart'; import 'package:flutter_system_proxy/flutter_system_proxy.dart'; -import 'package:rxdart/rxdart.dart'; import 'package:web_socket_channel/io.dart'; import 'package:flutter_deriv_api/api/models/enums.dart'; @@ -326,13 +325,15 @@ class IsolateWrappingAPI extends BaseAPI { if (message is IsolateResponse) { if (message.isSubscription) { _pendingSubscriptions[message.eventId]?.add(message.response); - print('####12 SUBSCRIPTION : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); + print( + '####12 SUBSCRIPTION : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); } else { final Completer? completer = _pendingEvents[message.eventId]; if (completer != null) { completer.complete(message.response); _pendingEvents.remove(message.eventId); - print('####12 FUTURE : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); + print( + '####12 FUTURE : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); } } } @@ -404,6 +405,11 @@ class IsolateWrappingAPI extends BaseAPI { print('####12 SUBSCRIPTION : REAL ONE ${DateTime.now()} '); } break; + case CustomEvent.proposal: + final proposalMessage = + message as CustomIsolateEvent; + + _pendingSubscriptions[message.eventId]?.add(proposalMessage.data); } } @@ -500,6 +506,22 @@ class IsolateWrappingAPI extends BaseAPI { return _callEvent(event); } + Stream subscribePriceForContract( + ProposalRequest request, { + RequestCompareFunction? comparePredicate, + }) { + final event = CustomIsolateEvent( + eventId: _getEventId, + request: request, + event: CustomEvent.proposal, + ); + final StreamController responseStream = + StreamController.broadcast(); + _pendingSubscriptions[event.eventId] = responseStream; + _isolateSendPort?.send(event); + return responseStream.stream; + } + Stream subscribeTick(TicksRequest request) { final event = CustomIsolateEvent( request: request, diff --git a/lib/services/connection/api_manager/isolate_events.dart b/lib/services/connection/api_manager/isolate_events.dart index 0bffe93f56..32584344dd 100644 --- a/lib/services/connection/api_manager/isolate_events.dart +++ b/lib/services/connection/api_manager/isolate_events.dart @@ -187,4 +187,5 @@ enum CustomEvent { ticks, ticksHistory, proposalOpenContract, + proposal, } diff --git a/lib/services/connection/api_manager/isolate_task.dart b/lib/services/connection/api_manager/isolate_task.dart index 56ca6ebd08..6cf9d2f98a 100644 --- a/lib/services/connection/api_manager/isolate_task.dart +++ b/lib/services/connection/api_manager/isolate_task.dart @@ -159,7 +159,6 @@ void _handleCustomEvent( final tickHistory = message as TicksHistoryEvent; if (firstResponse is TicksHistoryReceive) { - sendPort.send(tickHistory.copyWith( tickHistory: TicksHistoryResponse.fromJson( firstResponse.candles, @@ -211,6 +210,25 @@ void _handleCustomEvent( await _fetchLandingCompnay(api, message, sendPort); case CustomEvent.statesList: case CustomEvent.residenceList: + case CustomEvent.proposal: + api.subscribe(request: message.request)!.map( + (Response response) { + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + return response is ProposalReceive + ? ProposalResponse.fromJson( + response.proposal, + response.subscription, + ) + : null; + }, + ).listen((ProposalResponse? response) { + sendPort.send(message.copyWith(data: response)); + }); } } From 08759e8327a95c0af7f2b0d9d5a19232584d8140 Mon Sep 17 00:00:00 2001 From: ramin-deriv Date: Wed, 30 Oct 2024 10:54:32 +0800 Subject: [PATCH 17/17] remove print methods --- .../connection/api_manager/binary_api.dart | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 12de840630..1dd4df5350 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -325,15 +325,15 @@ class IsolateWrappingAPI extends BaseAPI { if (message is IsolateResponse) { if (message.isSubscription) { _pendingSubscriptions[message.eventId]?.add(message.response); - print( - '####12 SUBSCRIPTION : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); + // print( + // '####12 SUBSCRIPTION : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); } else { final Completer? completer = _pendingEvents[message.eventId]; if (completer != null) { completer.complete(message.response); _pendingEvents.remove(message.eventId); - print( - '####12 FUTURE : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); + // print( + // '####12 FUTURE : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); } } } @@ -346,7 +346,7 @@ class IsolateWrappingAPI extends BaseAPI { message.data as ActiveSymbolsResponse; _pendingEvents[message.eventId]?.complete(activeSymbolsResponse); _pendingEvents.remove(message.eventId); - print('####12 FUTURE : REAL ONE ${DateTime.now()}'); + // print('####12 FUTURE : REAL ONE ${DateTime.now()}'); break; case CustomEvent.assetIndex: @@ -374,7 +374,7 @@ class IsolateWrappingAPI extends BaseAPI { case CustomEvent.kycAuthStatus: case CustomEvent.ticks: _pendingSubscriptions[message.eventId]?.add(message.data); - print('####12 SUBSCRIPTION : REAL ONE ${DateTime.now()} '); + // print('####12 SUBSCRIPTION : REAL ONE ${DateTime.now()} '); break; case CustomEvent.proposalOpenContract: case CustomEvent.authorize: @@ -382,7 +382,7 @@ class IsolateWrappingAPI extends BaseAPI { message.data as AuthorizeReceive; _pendingEvents[message.eventId]?.complete(authorizeReceive); _pendingEvents.remove(message.eventId); - print('####12 FUTURE : REAL ONE ${DateTime.now()} '); + // print('####12 FUTURE : REAL ONE ${DateTime.now()} '); break; case CustomEvent.landingCompany: @@ -398,11 +398,11 @@ class IsolateWrappingAPI extends BaseAPI { TickHistorySubscription(tickHistory: historyResponse), ); _pendingEvents.remove(historyMessage.eventId); - print('####12 FUTURE : REAL ONE ${DateTime.now()} '); + // print('####12 FUTURE : REAL ONE ${DateTime.now()} '); } else if (historyMessage.tickStreamItem != null) { _pendingSubscriptions[historyMessage.eventId] ?.add(historyMessage.tickStreamItem); - print('####12 SUBSCRIPTION : REAL ONE ${DateTime.now()} '); + // print('####12 SUBSCRIPTION : REAL ONE ${DateTime.now()} '); } break; case CustomEvent.proposal: