diff --git a/lib/api/manually/tick.dart b/lib/api/manually/tick.dart index f6aa7d3c29..0cd96c8caa 100644 --- a/lib/api/manually/tick.dart +++ b/lib/api/manually/tick.dart @@ -6,6 +6,7 @@ import 'package:flutter_deriv_api/basic_api/generated/api.dart'; import 'package:flutter_deriv_api/basic_api/response.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/base_call_manager.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; diff --git a/lib/api/response/active_symbols_response_result.dart b/lib/api/response/active_symbols_response_result.dart index c21e198e5e..c37270393c 100644 --- a/lib/api/response/active_symbols_response_result.dart +++ b/lib/api/response/active_symbols_response_result.dart @@ -8,6 +8,7 @@ import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; /// Active symbols response model class. abstract class ActiveSymbolsResponseModel { @@ -56,7 +57,8 @@ class ActiveSymbolsResponse extends ActiveSymbolsResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Gets the list of active symbols. /// @@ -64,19 +66,8 @@ class ActiveSymbolsResponse extends ActiveSymbolsResponseModel { /// Throws an [BaseAPIException] if API response contains an error static Future fetchActiveSymbols( ActiveSymbolsRequest request, - ) async { - final ActiveSymbolsReceive response = await _api.call( - request: request, - ); - - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return ActiveSymbolsResponse.fromJson(response.activeSymbols); - } + ) async => + _api.fetchActiveSymbols(request); /// Creates a copy of instance with given parameters. ActiveSymbolsResponse copyWith({ diff --git a/lib/api/response/authorize_response_result.dart b/lib/api/response/authorize_response_result.dart index a8e4e6e26f..c20f3652c6 100644 --- a/lib/api/response/authorize_response_result.dart +++ b/lib/api/response/authorize_response_result.dart @@ -9,6 +9,7 @@ import 'package:flutter_deriv_api/basic_api/generated/authorize_send.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; /// Authorize response model class. abstract class AuthorizeResponseModel { @@ -48,7 +49,8 @@ class AuthorizeResponse extends AuthorizeResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Authorizes current WebSocket session to act on behalf of the owner of a given token. /// @@ -56,19 +58,8 @@ class AuthorizeResponse extends AuthorizeResponseModel { /// Throws an [BaseAPIException] if API response contains an error. static Future authorizeMethodRaw( AuthorizeRequest request, - ) async { - final AuthorizeReceive response = await _api.call( - request: request, - ); - - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return response; - } + ) async => + _api.authorize(request); /// Authorizes current WebSocket session to act on behalf of the owner of a given token. /// diff --git a/lib/api/response/proposal_response_result.dart b/lib/api/response/proposal_response_result.dart index 175c3a8013..61d4f0f065 100644 --- a/lib/api/response/proposal_response_result.dart +++ b/lib/api/response/proposal_response_result.dart @@ -17,6 +17,7 @@ import 'package:flutter_deriv_api/basic_api/generated/proposal_send.dart'; import 'package:flutter_deriv_api/basic_api/response.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/base_call_manager.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; @@ -69,7 +70,8 @@ class ProposalResponse extends ProposalResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Gets the price proposal for contract /// @@ -97,24 +99,7 @@ class ProposalResponse extends ProposalResponseModel { ProposalRequest request, { RequestCompareFunction? comparePredicate, }) => - _api - .subscribe(request: request, comparePredicate: comparePredicate)! - .map( - (Response response) { - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return response is ProposalReceive - ? ProposalResponse.fromJson( - response.proposal, - response.subscription, - ) - : null; - }, - ); + _api.subscribePriceForContract(request); /// Unsubscribes from price proposal subscription. /// diff --git a/lib/api/response/ticks_history_response_result.dart b/lib/api/response/ticks_history_response_result.dart index 9049bd5c1b..b73b5d5c07 100644 --- a/lib/api/response/ticks_history_response_result.dart +++ b/lib/api/response/ticks_history_response_result.dart @@ -15,6 +15,7 @@ import 'package:flutter_deriv_api/basic_api/generated/ticks_receive.dart'; import 'package:flutter_deriv_api/basic_api/response.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/base_call_manager.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; @@ -95,7 +96,8 @@ class TicksHistoryResponse extends TicksHistoryResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Gets the [TickHistory] for the given [symbol] in [request] /// @@ -124,46 +126,7 @@ class TicksHistoryResponse extends TicksHistoryResponseModel { bool subscribe = true, }) async { if (subscribe) { - final Stream? responseStream = - _api.subscribe(request: request, comparePredicate: comparePredicate); - final Response? firstResponse = await responseStream?.first; - - checkException( - response: firstResponse, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - if (firstResponse is TicksHistoryReceive) { - return TickHistorySubscription( - tickHistory: TicksHistoryResponse.fromJson( - firstResponse.candles, - firstResponse.history, - firstResponse.pipSize, - firstResponse.subscription), - tickStream: responseStream?.map( - (Response response) { - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return response is TicksReceive - ? Tick.fromJson( - response.tick!, - subscriptionJson: response.subscription, - ) - : response is OHLCResponse - ? OHLC.fromJson( - response.ohlc!, - subscriptionJson: response.subscription, - ) - : null; - }, - ), - ); - } - return null; + return _api.subscribeTickHistory(request); } else { return TickHistorySubscription( tickHistory: await fetchTickHistory(request), diff --git a/lib/api/response/ticks_response_result.dart b/lib/api/response/ticks_response_result.dart index cfd5319702..9b845a32ad 100644 --- a/lib/api/response/ticks_response_result.dart +++ b/lib/api/response/ticks_response_result.dart @@ -12,6 +12,7 @@ import 'package:flutter_deriv_api/basic_api/generated/ticks_send.dart'; import 'package:flutter_deriv_api/basic_api/response.dart'; import 'package:flutter_deriv_api/helpers/helpers.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/base_call_manager.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; @@ -64,33 +65,17 @@ class TicksResponse extends TicksResponseModel { return resultMap; } - static final BaseAPI _api = Injector()(); + static final IsolateWrappingAPI _api = + Injector()() as IsolateWrappingAPI; /// Subscribes to a tick for given [TickRequest] /// /// Throws [BaseAPIException] if API response contains an error - static Stream subscribeTick( + static Stream subscribeTick( TicksRequest tickRequest, { RequestCompareFunction? comparePredicate, }) => - _api - .subscribe(request: tickRequest, comparePredicate: comparePredicate)! - .map( - (Response response) { - checkException( - response: response, - exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => - BaseAPIException(baseExceptionModel: baseExceptionModel), - ); - - return response is TicksReceive - ? TicksResponse.fromJson( - response.tick, - response.subscription, - ) - : null; - }, - ); + _api.subscribeTick(tickRequest); /// Unsubscribes all ticks. /// diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 64a7041c70..1dd4df5350 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -2,8 +2,26 @@ import 'dart:async'; import 'dart:convert'; import 'dart:developer' as dev; import 'dart:io'; +import 'dart:isolate'; +import 'dart:ui' as ui; +import 'package:flutter/services.dart'; import 'package:flutter/widgets.dart'; +import 'package:flutter_deriv_api/api/exceptions/base_api_exception.dart'; +import 'package:flutter_deriv_api/api/manually/ohlc_response.dart'; +import 'package:flutter_deriv_api/api/manually/ohlc_response_result.dart'; +import 'package:flutter_deriv_api/api/manually/tick.dart' as man_tick; +import 'package:flutter_deriv_api/api/manually/tick_base.dart'; +import 'package:flutter_deriv_api/api/manually/tick_history_subscription.dart'; +import 'package:flutter_deriv_api/api/models/base_exception_model.dart'; +import 'package:flutter_deriv_api/api/response/active_symbols_response_result.dart'; +import 'package:flutter_deriv_api/api/response/landing_company_response_result.dart'; +import 'package:flutter_deriv_api/api/response/proposal_response_result.dart'; +import 'package:flutter_deriv_api/api/response/ticks_history_response_result.dart'; +import 'package:flutter_deriv_api/api/response/ticks_response_result.dart'; +import 'package:flutter_deriv_api/basic_api/generated/active_symbols_receive.dart'; +import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart'; +import 'package:flutter_deriv_api/basic_api/generated/api.dart'; import 'package:flutter_system_proxy/flutter_system_proxy.dart'; import 'package:web_socket_channel/io.dart'; @@ -21,6 +39,10 @@ import 'package:flutter_deriv_api/services/connection/call_manager/call_manager. import 'package:flutter_deriv_api/services/connection/call_manager/exceptions/call_manager_exception.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/subscription_manager.dart'; +part 'isolate_events.dart'; + +part 'isolate_task.dart'; + /// This class is for handling Binary API connection and calling Binary APIs. class BinaryAPI extends BaseAPI { /// Initializes [BinaryAPI] instance. @@ -258,3 +280,336 @@ class BinaryAPI extends BaseAPI { } } } + +/// This class is for handling Binary API connection and calling Binary APIs. +class IsolateWrappingAPI extends BaseAPI { + /// Initializes [BinaryAPI] instance. + IsolateWrappingAPI({ + String? key, + bool enableDebug = false, + this.proxyAwareConnection = false, + }) : super(key: key ?? '${UniqueKey()}', enableDebug: enableDebug) { + _canStartCompleter = Completer(); + + Isolate.spawn( + _isolateTask, + IsolateConfig( + sendPort: _isolateIncomingPort.sendPort, + rootIsolateToken: ServicesBinding.rootIsolateToken, + apiInstanceKey: super.key, + ), + ); + + _isolateIncomingPort.listen((dynamic message) { + if (message is SendPort) { + _isolateSendPort = message; + if (!_canStartCompleter.isCompleted) { + _canStartCompleter.complete(); + } + } + + if (message is ConnectionEventReply) { + switch (message.callback) { + case ConnectionCallbacks.onOpen: + _onOpen?.call(message.key); + break; + case ConnectionCallbacks.onDone: + _onDone?.call(message.key); + break; + case ConnectionCallbacks.onError: + _onError?.call(message.key); + break; + } + } + + if (message is IsolateResponse) { + if (message.isSubscription) { + _pendingSubscriptions[message.eventId]?.add(message.response); + // print( + // '####12 SUBSCRIPTION : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); + } else { + final Completer? completer = _pendingEvents[message.eventId]; + if (completer != null) { + completer.complete(message.response); + _pendingEvents.remove(message.eventId); + // print( + // '####12 FUTURE : NORMAL ONE -> ${message.response} : ${DateTime.now()}'); + } + } + } + + if (message is CustomIsolateEvent) { + switch (message.event) { + case CustomEvent.ping: + case CustomEvent.activeSymbols: + final ActiveSymbolsResponse activeSymbolsResponse = + message.data as ActiveSymbolsResponse; + _pendingEvents[message.eventId]?.complete(activeSymbolsResponse); + _pendingEvents.remove(message.eventId); + // print('####12 FUTURE : REAL ONE ${DateTime.now()}'); + break; + + case CustomEvent.assetIndex: + case CustomEvent.balance: + case CustomEvent.buy: + case CustomEvent.accountList: + case CustomEvent.accountClosure: + case CustomEvent.cancel: + case CustomEvent.cashierPayment: + case CustomEvent.changeEmail: + case CustomEvent.changePassword: + case CustomEvent.confirmEmail: + case CustomEvent.contractUpdateHistory: + case CustomEvent.contractUpdate: + case CustomEvent.contractsFor: + case CustomEvent.getAccountStatus: + case CustomEvent.getAccountTypes: + case CustomEvent.getAvailableAccounts: + case CustomEvent.getFinancialAssessment: + case CustomEvent.getLimits: + case CustomEvent.getSelfExclusion: + case CustomEvent.getSettings: + case CustomEvent.identityVerification: + case CustomEvent.jTokenCreate: + case CustomEvent.kycAuthStatus: + case CustomEvent.ticks: + _pendingSubscriptions[message.eventId]?.add(message.data); + // print('####12 SUBSCRIPTION : REAL ONE ${DateTime.now()} '); + break; + case CustomEvent.proposalOpenContract: + case CustomEvent.authorize: + final AuthorizeReceive authorizeReceive = + message.data as AuthorizeReceive; + _pendingEvents[message.eventId]?.complete(authorizeReceive); + _pendingEvents.remove(message.eventId); + // print('####12 FUTURE : REAL ONE ${DateTime.now()} '); + break; + + case CustomEvent.landingCompany: + case CustomEvent.statesList: + case CustomEvent.residenceList: + case CustomEvent.ticksHistory: + final historyMessage = message as TicksHistoryEvent; + + if (historyMessage.tickHistory != null) { + final TicksHistoryResponse historyResponse = + historyMessage.tickHistory!; + _pendingEvents[historyMessage.eventId]?.complete( + TickHistorySubscription(tickHistory: historyResponse), + ); + _pendingEvents.remove(historyMessage.eventId); + // print('####12 FUTURE : REAL ONE ${DateTime.now()} '); + } else if (historyMessage.tickStreamItem != null) { + _pendingSubscriptions[historyMessage.eventId] + ?.add(historyMessage.tickStreamItem); + // print('####12 SUBSCRIPTION : REAL ONE ${DateTime.now()} '); + } + break; + case CustomEvent.proposal: + final proposalMessage = + message as CustomIsolateEvent; + + _pendingSubscriptions[message.eventId]?.add(proposalMessage.data); + } + } + + // Check for other messages coming out from Isolate. + }); + } + + final Map> _pendingEvents = + >{}; + + final Map> _pendingSubscriptions = + >{}; + + late final Completer _canStartCompleter; + + final ReceivePort _isolateIncomingPort = ReceivePort(); + SendPort? _isolateSendPort; + int _eventId = 0; + + int get _getEventId => _eventId++; + + /// A flag to indicate if the connection is proxy aware. + final bool proxyAwareConnection; + + /// Call manager instance. + CallManager? _callManager; + + /// Subscription manager instance. + SubscriptionManager? _subscriptionManager; + + /// Gets API call history. + CallHistory? get callHistory => _callManager?.callHistory; + + /// Gets API subscription history. + CallHistory? get subscriptionHistory => _subscriptionManager?.callHistory; + + ConnectionCallback? _onOpen; + ConnectionCallback? _onDone; + ConnectionCallback? _onError; + + @override + Future connect( + ConnectionInformation? connectionInformation, { + ConnectionCallback? onOpen, + ConnectionCallback? onDone, + ConnectionCallback? onError, + bool printResponse = false, + }) async { + await _canStartCompleter.future; + print('Sending Connect event to Ioslate ${DateTime.now()}'); + _isolateSendPort?.send(_WSConnectConfig( + connectionInformation: connectionInformation, + )); + _onOpen = onOpen; + _onDone = onDone; + _onError = onError; + } + + Future _callEvent(_IsolateEvent event) { + final Completer responseCompleter = Completer(); + _pendingEvents[event.eventId] = responseCompleter; + + _isolateSendPort?.send(event); + + return responseCompleter.future; + } + + @override + void addToChannel(Map request) => _isolateSendPort + ?.send(_AddToChannelEvent(request: request, eventId: _getEventId)); + + @override + Future call({ + required Request request, + List nullableKeys = const [], + }) async { + final event = _CallEvent(request: request, eventId: _getEventId); + return _callEvent(event); + } + + /// Gets the list of active symbols. + /// + /// For parameters information refer to [ActiveSymbolsRequest]. + /// Throws an [BaseAPIException] if API response contains an error + Future fetchActiveSymbols( + ActiveSymbolsRequest request, + ) async { + final event = CustomIsolateEvent( + request: request, + eventId: _getEventId, + event: CustomEvent.activeSymbols, + ); + + return _callEvent(event); + } + + Stream subscribePriceForContract( + ProposalRequest request, { + RequestCompareFunction? comparePredicate, + }) { + final event = CustomIsolateEvent( + eventId: _getEventId, + request: request, + event: CustomEvent.proposal, + ); + final StreamController responseStream = + StreamController.broadcast(); + _pendingSubscriptions[event.eventId] = responseStream; + _isolateSendPort?.send(event); + return responseStream.stream; + } + + Stream subscribeTick(TicksRequest request) { + final event = CustomIsolateEvent( + request: request, + eventId: _getEventId, + event: CustomEvent.ticks, + ); + + final StreamController responseStream = + StreamController.broadcast(); + _pendingSubscriptions[event.eventId] = responseStream; + + _isolateSendPort?.send(event); + return responseStream.stream; + } + + Future subscribeTickHistory( + TicksHistoryRequest request, + ) async { + final event = TicksHistoryEvent( + eventId: _getEventId, + event: CustomEvent.ticksHistory, + request: request, + ); + + final StreamController tickStreamController = + StreamController.broadcast(); + final completer = Completer(); + + _pendingSubscriptions[event.eventId] = tickStreamController; + _pendingEvents[event.eventId] = completer; + + _isolateSendPort?.send(event); + + final tickHistorySubscription = await completer.future; + + final response = tickHistorySubscription.copyWith( + tickHistorySubscription.tickHistory!, + tickStreamController.stream, + ); + + return response; + } + + @override + Stream? subscribe({ + required Request request, + int cacheSize = 0, + RequestCompareFunction? comparePredicate, + }) { + final StreamController responseStream = + StreamController.broadcast(); + final subEvent = _SubEvent( + request: request, + eventId: _getEventId, + ); + _pendingSubscriptions[subEvent.eventId] = responseStream; + + _isolateSendPort?.send(subEvent); + return responseStream.stream; + } + + @override + Future unsubscribe({required String subscriptionId}) { + final event = + _UnSubEvent(subscriptionId: subscriptionId, eventId: _getEventId); + return _callEvent(event); + } + + @override + Future unsubscribeAll({ + required ForgetStreamType method, + }) { + final event = _UnSubAllEvent(streamType: method, eventId: _getEventId); + return _callEvent(event); + } + + @override + Future disconnect() async { + _isolateSendPort?.send(_DisconnectEvent(eventId: _getEventId)); + } + + Future authorize(AuthorizeRequest request) { + final event = CustomIsolateEvent( + request: request, + eventId: _getEventId, + event: CustomEvent.authorize, + ); + + return _callEvent(event); + } +} diff --git a/lib/services/connection/api_manager/isolate_events.dart b/lib/services/connection/api_manager/isolate_events.dart new file mode 100644 index 0000000000..32584344dd --- /dev/null +++ b/lib/services/connection/api_manager/isolate_events.dart @@ -0,0 +1,191 @@ +part of 'binary_api.dart'; + +class _WSConnectConfig { + _WSConnectConfig({ + this.connectionInformation, + this.printResponse = false, + }); + + final ConnectionInformation? connectionInformation; + + final bool printResponse; +} + +sealed class _IsolateEvent { + _IsolateEvent({required this.eventId, this.isSubscription = false}); + + final int eventId; + final bool isSubscription; +} + +class _AddToChannelEvent extends _IsolateEvent { + _AddToChannelEvent({required this.request, required super.eventId}); + + final Map request; +} + +class _CallEvent extends _IsolateEvent { + _CallEvent({required this.request, required super.eventId}); + + final Request request; +} + +class _SubEvent extends _IsolateEvent { + _SubEvent({ + required this.request, + required super.eventId, + }) : super(isSubscription: true); + + final Request request; +} + +class _UnSubEvent extends _IsolateEvent { + _UnSubEvent({required this.subscriptionId, required super.eventId}); + + final String subscriptionId; +} + +class _UnSubAllEvent extends _IsolateEvent { + _UnSubAllEvent({required this.streamType, required super.eventId}); + + final ForgetStreamType streamType; +} + +class _DisconnectEvent extends _IsolateEvent { + _DisconnectEvent({required super.eventId}); +} + +enum ConnectionCallbacks { + onOpen, + onDone, + onError, +} + +class ConnectionEventReply { + ConnectionEventReply({required this.key, required this.callback}); + + final String key; + final ConnectionCallbacks callback; +} + +class IsolateResponse { + IsolateResponse({ + required this.response, + required this.eventId, + this.isSubscription = false, + }); + + final T response; + final int eventId; + final bool isSubscription; + + @override + String toString() => 'ResponseEvent: [$eventId]: $response'; +} + +/// To communicate event and get the response for custom events that in addition +/// to get the BE JSON the deserialization of the JSON response to dart models +/// also happens inside the isolate. +class CustomIsolateEvent extends _IsolateEvent { + CustomIsolateEvent({ + required super.eventId, + required this.event, + required this.request, + this.data, + this.isSubscription = false, + }); + + final CustomEvent event; + + final Request request; + + final bool isSubscription; + + final T? data; + + CustomIsolateEvent copyWith({ + CustomEvent? event, + Request? request, + T? data, + bool? isSubscription, + }) => + CustomIsolateEvent( + eventId: eventId, + event: event ?? this.event, + request: request ?? this.request, + data: data ?? this.data, + isSubscription: isSubscription ?? this.isSubscription, + ); +} + +class TicksHistoryEvent extends CustomIsolateEvent { + TicksHistoryEvent({ + required super.eventId, + required super.event, + required super.request, + super.data, + super.isSubscription, + this.tickStreamItem, + this.tickHistory, + }); + + /// The history of tick + final TicksHistoryResponse? tickHistory; + + /// The stream of the tick + final TickBase? tickStreamItem; + + @override + TicksHistoryEvent copyWith({ + CustomEvent? event, + Request? request, + data, + bool? isSubscription, + TicksHistoryResponse? tickHistory, + TickBase? tickStreamItem, + }) => + TicksHistoryEvent( + eventId: eventId, + event: event ?? super.event, + request: request ?? super.request, + data: data ?? super.data, + tickHistory: tickHistory ?? this.tickHistory, + tickStreamItem: tickStreamItem ?? this.tickStreamItem, + ); +} + +enum CustomEvent { + ping, + authorize, + landingCompany, + activeSymbols, + statesList, + residenceList, + assetIndex, + balance, + buy, + accountList, + accountClosure, + cancel, + cashierPayment, + changeEmail, + changePassword, + confirmEmail, + contractUpdateHistory, + contractUpdate, + contractsFor, + getAccountStatus, + getAccountTypes, + getAvailableAccounts, + getFinancialAssessment, + getLimits, + getSelfExclusion, + getSettings, + identityVerification, + jTokenCreate, + kycAuthStatus, + ticks, + ticksHistory, + proposalOpenContract, + proposal, +} diff --git a/lib/services/connection/api_manager/isolate_task.dart b/lib/services/connection/api_manager/isolate_task.dart new file mode 100644 index 0000000000..6cf9d2f98a --- /dev/null +++ b/lib/services/connection/api_manager/isolate_task.dart @@ -0,0 +1,267 @@ +part of 'binary_api.dart'; + +class IsolateConfig { + IsolateConfig({ + required this.sendPort, + required this.rootIsolateToken, + required this.apiInstanceKey, + }); + + final SendPort sendPort; + final ui.RootIsolateToken? rootIsolateToken; + + final String apiInstanceKey; +} + +void _isolateTask(IsolateConfig isolateConfig) { + if (isolateConfig.rootIsolateToken != null) { + ui.PlatformDispatcher.instance + .registerBackgroundIsolate(isolateConfig.rootIsolateToken!); + BackgroundIsolateBinaryMessenger.ensureInitialized( + isolateConfig.rootIsolateToken!); + } + final sendPort = isolateConfig.sendPort; + + final ReceivePort receivePort = ReceivePort(); + + final BinaryAPI binaryAPI = BinaryAPI(key: isolateConfig.apiInstanceKey); + + sendPort.send(receivePort.sendPort); + receivePort.listen((message) async { + if (message is _WSConnectConfig) { + await binaryAPI.connect( + message.connectionInformation, + onOpen: (key) => sendPort.send(ConnectionEventReply( + key: key, callback: ConnectionCallbacks.onOpen)), + onDone: (key) => sendPort.send(ConnectionEventReply( + key: key, callback: ConnectionCallbacks.onDone)), + onError: (key) => sendPort.send(ConnectionEventReply( + key: key, callback: ConnectionCallbacks.onError)), + ); + } + + if (message is _IsolateEvent) { + switch (message) { + case _AddToChannelEvent(): + binaryAPI.addToChannel(message.request); + break; + case _CallEvent(): + final response = await binaryAPI.call(request: message.request); + sendPort.send( + IsolateResponse(response: response, eventId: message.eventId), + ); + break; + + case _SubEvent(): + final stream = binaryAPI.subscribe(request: message.request); + stream?.listen((event) { + sendPort.send( + IsolateResponse( + response: event, + eventId: message.eventId, + isSubscription: true, + ), + ); + }); + break; + case _UnSubEvent(): + final response = await binaryAPI.unsubscribe( + subscriptionId: message.subscriptionId, + ); + sendPort.send( + IsolateResponse(response: response, eventId: message.eventId), + ); + break; + case _UnSubAllEvent(): + final response = + await binaryAPI.unsubscribeAll(method: message.streamType); + sendPort.send( + IsolateResponse(response: response, eventId: message.eventId)); + break; + case _DisconnectEvent(): + final response = await binaryAPI.disconnect(); + + break; + case CustomIsolateEvent(): + _handleCustomEvent(message, binaryAPI, sendPort); + } + } + }); +} + +void _handleCustomEvent( + CustomIsolateEvent message, + BinaryAPI api, + SendPort sendPort, +) async { + print('@@@@@@12 ${message.event}'); + switch (message.event) { + case CustomEvent.ping: + case CustomEvent.activeSymbols: + await _fetchActiveSymbols(api, message, sendPort); + break; + + case CustomEvent.assetIndex: + case CustomEvent.balance: + case CustomEvent.buy: + case CustomEvent.accountList: + case CustomEvent.accountClosure: + case CustomEvent.cancel: + case CustomEvent.cashierPayment: + case CustomEvent.changeEmail: + case CustomEvent.changePassword: + case CustomEvent.confirmEmail: + case CustomEvent.contractUpdateHistory: + case CustomEvent.contractUpdate: + case CustomEvent.contractsFor: + case CustomEvent.getAccountStatus: + case CustomEvent.getAccountTypes: + case CustomEvent.getAvailableAccounts: + case CustomEvent.getFinancialAssessment: + case CustomEvent.getLimits: + case CustomEvent.getSelfExclusion: + case CustomEvent.getSettings: + case CustomEvent.identityVerification: + case CustomEvent.jTokenCreate: + case CustomEvent.kycAuthStatus: + case CustomEvent.ticks: + api.subscribe(request: message.request)!.map( + (Response response) { + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + return response is TicksReceive + ? TicksResponse.fromJson( + response.tick, + response.subscription, + ) + : null; + }, + ).listen((TicksResponse? tick) { + sendPort.send(message.copyWith(data: tick, isSubscription: true)); + }); + break; + + case CustomEvent.ticksHistory: + final Stream? responseStream = + api.subscribe(request: message.request); + final Response? firstResponse = await responseStream?.first; + + checkException( + response: firstResponse, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + final tickHistory = message as TicksHistoryEvent; + + if (firstResponse is TicksHistoryReceive) { + sendPort.send(tickHistory.copyWith( + tickHistory: TicksHistoryResponse.fromJson( + firstResponse.candles, + firstResponse.history, + firstResponse.pipSize, + firstResponse.subscription, + ), + )); + + responseStream?.map( + (Response response) { + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + return response is TicksReceive + ? man_tick.Tick.fromJson( + response.tick!, + subscriptionJson: response.subscription, + ) + : response is OHLCResponse + ? OHLC.fromJson( + response.ohlc!, + subscriptionJson: response.subscription, + ) + : null; + }, + ).listen((tick) => + sendPort.send(tickHistory.copyWith(tickStreamItem: tick))); + } + break; + + case CustomEvent.proposalOpenContract: + case CustomEvent.authorize: + final AuthorizeReceive response = await api.call( + request: message.request, + ); + + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + sendPort.send(message.copyWith(data: response)); + case CustomEvent.landingCompany: + await _fetchLandingCompnay(api, message, sendPort); + case CustomEvent.statesList: + case CustomEvent.residenceList: + case CustomEvent.proposal: + api.subscribe(request: message.request)!.map( + (Response response) { + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + return response is ProposalReceive + ? ProposalResponse.fromJson( + response.proposal, + response.subscription, + ) + : null; + }, + ).listen((ProposalResponse? response) { + sendPort.send(message.copyWith(data: response)); + }); + } +} + +Future _fetchLandingCompnay(BinaryAPI api, + CustomIsolateEvent message, SendPort sendPort) async { + final LandingCompanyReceive response = + await api.call(request: message.request); + + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + sendPort.send(message.copyWith( + data: LandingCompanyResponse.fromJson(response.landingCompany))); +} + +Future _fetchActiveSymbols( + BinaryAPI api, + CustomIsolateEvent message, + SendPort sendPort, +) async { + final ActiveSymbolsReceive response = await api.call( + request: message.request, + ); + + checkException( + response: response, + exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) => + BaseAPIException(baseExceptionModel: baseExceptionModel), + ); + + final asResponse = ActiveSymbolsResponse.fromJson(response.activeSymbols); + sendPort.send(message.copyWith(data: asResponse)); +} diff --git a/lib/state/connection/connection_cubit.dart b/lib/state/connection/connection_cubit.dart index 49a8daaae9..19812fc2ea 100644 --- a/lib/state/connection/connection_cubit.dart +++ b/lib/state/connection/connection_cubit.dart @@ -30,7 +30,7 @@ class ConnectionCubit extends Cubit { APIInitializer().initialize( api: api ?? - BinaryAPI( + IsolateWrappingAPI( key: _key, proxyAwareConnection: proxyAwareConnection, enableDebug: enableDebug, @@ -141,7 +141,7 @@ class ConnectionCubit extends Cubit { }, ); - if (_api is BinaryAPI) { + if (_api is IsolateWrappingAPI) { _setupConnectivityListener(); } }