Skip to content

Commit 03e7bc9

Browse files
committed
add tick history subscription
1 parent b35e2d0 commit 03e7bc9

File tree

4 files changed

+142
-42
lines changed

4 files changed

+142
-42
lines changed

lib/api/response/ticks_history_response_result.dart

+4-41
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import 'package:flutter_deriv_api/basic_api/generated/ticks_receive.dart';
1515
import 'package:flutter_deriv_api/basic_api/response.dart';
1616
import 'package:flutter_deriv_api/helpers/helpers.dart';
1717
import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart';
18+
import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart';
1819
import 'package:flutter_deriv_api/services/connection/call_manager/base_call_manager.dart';
1920
import 'package:deriv_dependency_injector/dependency_injector.dart';
2021

@@ -95,7 +96,8 @@ class TicksHistoryResponse extends TicksHistoryResponseModel {
9596
return resultMap;
9697
}
9798

98-
static final BaseAPI _api = Injector()<BaseAPI>();
99+
static final IsolateWrappingAPI _api =
100+
Injector()<BaseAPI>() as IsolateWrappingAPI;
99101

100102
/// Gets the [TickHistory] for the given [symbol] in [request]
101103
///
@@ -124,46 +126,7 @@ class TicksHistoryResponse extends TicksHistoryResponseModel {
124126
bool subscribe = true,
125127
}) async {
126128
if (subscribe) {
127-
final Stream<Response>? responseStream =
128-
_api.subscribe(request: request, comparePredicate: comparePredicate);
129-
final Response? firstResponse = await responseStream?.first;
130-
131-
checkException(
132-
response: firstResponse,
133-
exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) =>
134-
BaseAPIException(baseExceptionModel: baseExceptionModel),
135-
);
136-
if (firstResponse is TicksHistoryReceive) {
137-
return TickHistorySubscription(
138-
tickHistory: TicksHistoryResponse.fromJson(
139-
firstResponse.candles,
140-
firstResponse.history,
141-
firstResponse.pipSize,
142-
firstResponse.subscription),
143-
tickStream: responseStream?.map<TickBase?>(
144-
(Response response) {
145-
checkException(
146-
response: response,
147-
exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) =>
148-
BaseAPIException(baseExceptionModel: baseExceptionModel),
149-
);
150-
151-
return response is TicksReceive
152-
? Tick.fromJson(
153-
response.tick!,
154-
subscriptionJson: response.subscription,
155-
)
156-
: response is OHLCResponse
157-
? OHLC.fromJson(
158-
response.ohlc!,
159-
subscriptionJson: response.subscription,
160-
)
161-
: null;
162-
},
163-
),
164-
);
165-
}
166-
return null;
129+
return _api.subscribeTickHistory(request);
167130
} else {
168131
return TickHistorySubscription(
169132
tickHistory: await fetchTickHistory(request),

lib/services/connection/api_manager/binary_api.dart

+50-1
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@ import 'package:flutter/services.dart';
88

99
import 'package:flutter/widgets.dart';
1010
import 'package:flutter_deriv_api/api/exceptions/base_api_exception.dart';
11-
import 'package:flutter_deriv_api/api/manually/tick.dart';
11+
import 'package:flutter_deriv_api/api/manually/ohlc_response.dart';
12+
import 'package:flutter_deriv_api/api/manually/ohlc_response_result.dart';
13+
import 'package:flutter_deriv_api/api/manually/tick.dart' as man_tick;
14+
import 'package:flutter_deriv_api/api/manually/tick_base.dart';
15+
import 'package:flutter_deriv_api/api/manually/tick_history_subscription.dart';
1216
import 'package:flutter_deriv_api/api/models/base_exception_model.dart';
1317
import 'package:flutter_deriv_api/api/response/active_symbols_response_result.dart';
1418
import 'package:flutter_deriv_api/api/response/authorize_response_result.dart';
1519
import 'package:flutter_deriv_api/api/response/landing_company_response_result.dart';
20+
import 'package:flutter_deriv_api/api/response/ticks_history_response_result.dart';
1621
import 'package:flutter_deriv_api/api/response/ticks_response_result.dart';
1722
import 'package:flutter_deriv_api/basic_api/generated/active_symbols_receive.dart';
1823
import 'package:flutter_deriv_api/basic_api/generated/active_symbols_send.dart';
@@ -339,6 +344,7 @@ class IsolateWrappingAPI extends BaseAPI {
339344
final ActiveSymbolsResponse activeSymbolsResponse =
340345
message.data as ActiveSymbolsResponse;
341346
_pendingEvents[message.eventId]?.complete(activeSymbolsResponse);
347+
_pendingEvents.remove(message.eventId);
342348

343349
case CustomEvent.assetIndex:
344350
case CustomEvent.balance:
@@ -370,9 +376,24 @@ class IsolateWrappingAPI extends BaseAPI {
370376
final AuthorizeReceive authorizeReceive =
371377
message.data as AuthorizeReceive;
372378
_pendingEvents[message.eventId]?.complete(authorizeReceive);
379+
_pendingEvents.remove(message.eventId);
373380
case CustomEvent.landingCompany:
374381
case CustomEvent.statesList:
375382
case CustomEvent.residenceList:
383+
case CustomEvent.ticksHistory:
384+
final historyMessage = message as TicksHistoryEvent;
385+
386+
if (historyMessage.tickHistory != null) {
387+
final TicksHistoryResponse historyResponse =
388+
historyMessage.tickHistory!;
389+
_pendingEvents[historyMessage.eventId]?.complete(
390+
TickHistorySubscription(tickHistory: historyResponse),
391+
);
392+
_pendingEvents.remove(historyMessage.eventId);
393+
} else if (historyMessage.tickStreamItem != null) {
394+
_pendingSubscriptions[historyMessage.eventId]
395+
?.add(historyMessage.tickStreamItem);
396+
}
376397
}
377398
}
378399

@@ -484,6 +505,34 @@ class IsolateWrappingAPI extends BaseAPI {
484505
return responseStream.stream;
485506
}
486507

508+
Future<TickHistorySubscription> subscribeTickHistory(
509+
TicksHistoryRequest request,
510+
) async {
511+
final event = TicksHistoryEvent(
512+
eventId: _getEventId,
513+
event: CustomEvent.ticksHistory,
514+
request: request,
515+
);
516+
517+
final StreamController<TickBase> tickStreamController =
518+
StreamController.broadcast();
519+
final completer = Completer<TickHistorySubscription>();
520+
521+
_pendingSubscriptions[event.eventId] = tickStreamController;
522+
_pendingEvents[event.eventId] = completer;
523+
524+
_isolateSendPort?.send(event);
525+
526+
final tickHistorySubscription = await completer.future;
527+
528+
final response = tickHistorySubscription.copyWith(
529+
tickHistorySubscription.tickHistory!,
530+
tickStreamController.stream,
531+
);
532+
533+
return response;
534+
}
535+
487536
@override
488537
Stream<Response>? subscribe({
489538
required Request request,

lib/services/connection/api_manager/isolate_events.dart

+37
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,42 @@ class CustomIsolateEvent<T> extends _IsolateEvent {
118118
);
119119
}
120120

121+
class TicksHistoryEvent<TickHistorySubscription> extends CustomIsolateEvent {
122+
TicksHistoryEvent({
123+
required super.eventId,
124+
required super.event,
125+
required super.request,
126+
super.data,
127+
super.isSubscription,
128+
this.tickStreamItem,
129+
this.tickHistory,
130+
});
131+
132+
/// The history of tick
133+
final TicksHistoryResponse? tickHistory;
134+
135+
/// The stream of the tick
136+
final TickBase? tickStreamItem;
137+
138+
@override
139+
TicksHistoryEvent<TickHistorySubscription> copyWith({
140+
CustomEvent? event,
141+
Request? request,
142+
data,
143+
bool? isSubscription,
144+
TicksHistoryResponse? tickHistory,
145+
TickBase? tickStreamItem,
146+
}) =>
147+
TicksHistoryEvent(
148+
eventId: eventId,
149+
event: event ?? super.event,
150+
request: request ?? super.request,
151+
data: data ?? super.data,
152+
tickHistory: tickHistory ?? this.tickHistory,
153+
tickStreamItem: tickStreamItem ?? this.tickStreamItem,
154+
);
155+
}
156+
121157
enum CustomEvent {
122158
ping,
123159
authorize,
@@ -149,5 +185,6 @@ enum CustomEvent {
149185
jTokenCreate,
150186
kycAuthStatus,
151187
ticks,
188+
ticksHistory,
152189
proposalOpenContract,
153190
}

lib/services/connection/api_manager/isolate_task.dart

+51
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ void _handleCustomEvent(
9494
BinaryAPI api,
9595
SendPort sendPort,
9696
) async {
97+
print('@@@@@@12 ${message.event}');
9798
switch (message.event) {
9899
case CustomEvent.ping:
99100
case CustomEvent.activeSymbols:
@@ -142,6 +143,56 @@ void _handleCustomEvent(
142143
).listen((TicksResponse? tick) {
143144
sendPort.send(message.copyWith(data: tick, isSubscription: true));
144145
});
146+
break;
147+
148+
case CustomEvent.ticksHistory:
149+
final Stream<Response>? responseStream =
150+
api.subscribe(request: message.request);
151+
final Response? firstResponse = await responseStream?.first;
152+
153+
checkException(
154+
response: firstResponse,
155+
exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) =>
156+
BaseAPIException(baseExceptionModel: baseExceptionModel),
157+
);
158+
159+
final tickHistory = message as TicksHistoryEvent;
160+
161+
if (firstResponse is TicksHistoryReceive) {
162+
163+
sendPort.send(tickHistory.copyWith(
164+
tickHistory: TicksHistoryResponse.fromJson(
165+
firstResponse.candles,
166+
firstResponse.history,
167+
firstResponse.pipSize,
168+
firstResponse.subscription,
169+
),
170+
));
171+
172+
responseStream?.map<TickBase?>(
173+
(Response response) {
174+
checkException(
175+
response: response,
176+
exceptionCreator: ({BaseExceptionModel? baseExceptionModel}) =>
177+
BaseAPIException(baseExceptionModel: baseExceptionModel),
178+
);
179+
180+
return response is TicksReceive
181+
? man_tick.Tick.fromJson(
182+
response.tick!,
183+
subscriptionJson: response.subscription,
184+
)
185+
: response is OHLCResponse
186+
? OHLC.fromJson(
187+
response.ohlc!,
188+
subscriptionJson: response.subscription,
189+
)
190+
: null;
191+
},
192+
).listen((tick) =>
193+
sendPort.send(tickHistory.copyWith(tickStreamItem: tick)));
194+
}
195+
break;
145196

146197
case CustomEvent.proposalOpenContract:
147198
case CustomEvent.authorize:

0 commit comments

Comments
 (0)