forked from me-no-dev/ESPAsyncWebServer
-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathAsyncEventSource.h
314 lines (275 loc) · 9.94 KB
/
AsyncEventSource.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov
#ifndef ASYNCEVENTSOURCE_H_
#define ASYNCEVENTSOURCE_H_
#include <Arduino.h>
#ifdef ESP32
#include <AsyncTCP.h>
#include <mutex>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 32
#endif
#define SSE_MIN_INFLIGH 2 * 1460 // allow 2 MSS packets
#define SSE_MAX_INFLIGH 16 * 1024 // but no more than 16k, no need to blow it, since same data is kept in local Q
#elif defined(ESP8266)
#include <ESPAsyncTCP.h>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 8
#endif
#define SSE_MIN_INFLIGH 2 * 1460 // allow 2 MSS packets
#define SSE_MAX_INFLIGH 8 * 1024 // but no more than 8k, no need to blow it, since same data is kept in local Q
#elif defined(TARGET_RP2040)
#include <AsyncTCP_RP2040W.h>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 32
#endif
#define SSE_MIN_INFLIGH 2 * 1460 // allow 2 MSS packets
#define SSE_MAX_INFLIGH 16 * 1024 // but no more than 16k, no need to blow it, since same data is kept in local Q
#endif
#include <ESPAsyncWebServer.h>
#ifdef ESP8266
#include <Hash.h>
#ifdef CRYPTO_HASH_h // include Hash.h from espressif framework if the first include was from the crypto library
#include <../src/Hash.h>
#endif
#endif
class AsyncEventSource;
class AsyncEventSourceResponse;
class AsyncEventSourceClient;
using ArEventHandlerFunction = std::function<void(AsyncEventSourceClient *client)>;
using ArAuthorizeConnectHandler = ArAuthorizeFunction;
// shared message object container
using AsyncEvent_SharedData_t = std::shared_ptr<String>;
/**
* @brief Async Event Message container with shared message content data
*
*/
class AsyncEventSourceMessage {
private:
const AsyncEvent_SharedData_t _data;
size_t _sent{0}; // num of bytes already sent
size_t _acked{0}; // num of bytes acked
public:
AsyncEventSourceMessage(AsyncEvent_SharedData_t data) : _data(data){};
#ifdef ESP32
AsyncEventSourceMessage(const char *data, size_t len) : _data(std::make_shared<String>(data, len)){};
#else
// esp8266's String does not have constructor with data/length arguments. Use a concat method here
AsyncEventSourceMessage(const char *data, size_t len) {
_data->concat(data, len);
};
#endif
/**
* @brief acknowledge sending len bytes of data
* @note if num of bytes to ack is larger then the unacknowledged message length the number of carried over bytes are returned
*
* @param len bytes to acknowledge
* @param time
* @return size_t number of extra bytes carried over
*/
size_t ack(size_t len, uint32_t time = 0);
/**
* @brief write message data to client's buffer
* @note this method does NOT call client's send
*
* @param client
* @return size_t number of bytes written
*/
size_t write(AsyncClient *client);
/**
* @brief writes message data to client's buffer and calls client's send method
*
* @param client
* @return size_t returns num of bytes the clien was able to send()
*/
size_t send(AsyncClient *client);
// returns true if full message's length were acked
bool finished() {
return _acked == _data->length();
}
/**
* @brief returns true if all data has been sent already
*
*/
bool sent() {
return _sent == _data->length();
}
};
/**
* @brief class holds a sse messages queue for a particular client's connection
*
*/
class AsyncEventSourceClient {
private:
AsyncClient *_client;
AsyncEventSource *_server;
uint32_t _lastId{0};
size_t _inflight{0}; // num of unacknowledged bytes that has been written to socket buffer
size_t _max_inflight{SSE_MAX_INFLIGH}; // max num of unacknowledged bytes that could be written to socket buffer
std::list<AsyncEventSourceMessage> _messageQueue;
#ifdef ESP32
mutable std::mutex _lockmq;
#endif
bool _queueMessage(const char *message, size_t len);
bool _queueMessage(AsyncEvent_SharedData_t &&msg);
void _runQueue();
public:
AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server);
~AsyncEventSourceClient();
/**
* @brief Send an SSE message to client
* it will craft an SSE message and place it to client's message queue
*
* @param message body string, could be single or multi-line string sepprated by \n, \r, \r\n
* @param event body string, a sinle line string
* @param id sequence id
* @param reconnect client's reconnect timeout
* @return true if message was placed in a queue
* @return false if queue is full
*/
bool send(const char *message, const char *event = NULL, uint32_t id = 0, uint32_t reconnect = 0);
bool send(const String &message, const String &event, uint32_t id = 0, uint32_t reconnect = 0) {
return send(message.c_str(), event.c_str(), id, reconnect);
}
bool send(const String &message, const char *event, uint32_t id = 0, uint32_t reconnect = 0) {
return send(message.c_str(), event, id, reconnect);
}
/**
* @brief place supplied preformatted SSE message to the message queue
* @note message must a properly formatted SSE string according to https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
*
* @param message data
* @return true on success
* @return false on queue overflow or no client connected
*/
bool write(AsyncEvent_SharedData_t message) {
return connected() && _queueMessage(std::move(message));
};
[[deprecated("Use _write(AsyncEvent_SharedData_t message) instead to share same data with multiple SSE clients")]]
bool write(const char *message, size_t len) {
return connected() && _queueMessage(message, len);
};
// close client's connection
void close();
// getters
AsyncClient *client() {
return _client;
}
bool connected() const {
return _client && _client->connected();
}
uint32_t lastId() const {
return _lastId;
}
size_t packetsWaiting() const {
return _messageQueue.size();
};
/**
* @brief Sets max amount of bytes that could be written to client's socket while awaiting delivery acknowledge
* used to throttle message delivery length to tradeoff memory consumption
* @note actual amount of data written could possible be a bit larger but no more than available socket buff space
*
* @param value
*/
void set_max_inflight_bytes(size_t value);
/**
* @brief Get current max inflight bytes value
*
* @return size_t
*/
size_t get_max_inflight_bytes() const {
return _max_inflight;
}
// system callbacks (do not call if from user code!)
void _onAck(size_t len, uint32_t time);
void _onPoll();
void _onTimeout(uint32_t time);
void _onDisconnect();
};
/**
* @brief a class that maintains all connected HTTP clients subscribed to SSE delivery
* dispatches supplied messages to the client's queues
*
*/
class AsyncEventSource : public AsyncWebHandler {
private:
String _url;
std::list<std::unique_ptr<AsyncEventSourceClient>> _clients;
#ifdef ESP32
// Same as for individual messages, protect mutations of _clients list
// since simultaneous access from different tasks is possible
mutable std::mutex _client_queue_lock;
#endif
ArEventHandlerFunction _connectcb = nullptr;
ArEventHandlerFunction _disconnectcb = nullptr;
// this method manipulates in-fligh data size for connected client depending on number of active connections
void _adjust_inflight_window();
public:
typedef enum {
DISCARDED = 0,
ENQUEUED = 1,
PARTIALLY_ENQUEUED = 2,
} SendStatus;
AsyncEventSource(const char *url) : _url(url){};
AsyncEventSource(const String &url) : _url(url){};
~AsyncEventSource() {
close();
};
const char *url() const {
return _url.c_str();
}
// close all connected clients
void close();
/**
* @brief set on-connect callback for the client
* used to deliver messages to client on first connect
*
* @param cb
*/
void onConnect(ArEventHandlerFunction cb) {
_connectcb = cb;
}
/**
* @brief Send an SSE message to client
* it will craft an SSE message and place it to all connected client's message queues
*
* @param message body string, could be single or multi-line string sepprated by \n, \r, \r\n
* @param event body string, a sinle line string
* @param id sequence id
* @param reconnect client's reconnect timeout
* @return SendStatus if message was placed in any/all/part of the client's queues
*/
SendStatus send(const char *message, const char *event = NULL, uint32_t id = 0, uint32_t reconnect = 0);
SendStatus send(const String &message, const String &event, uint32_t id = 0, uint32_t reconnect = 0) {
return send(message.c_str(), event.c_str(), id, reconnect);
}
SendStatus send(const String &message, const char *event, uint32_t id = 0, uint32_t reconnect = 0) {
return send(message.c_str(), event, id, reconnect);
}
// The client pointer sent to the callback is only for reference purposes. DO NOT CALL ANY METHOD ON IT !
void onDisconnect(ArEventHandlerFunction cb) {
_disconnectcb = cb;
}
void authorizeConnect(ArAuthorizeConnectHandler cb);
// returns number of connected clients
size_t count() const;
// returns average number of messages pending in all client's queues
size_t avgPacketsWaiting() const;
// system callbacks (do not call from user code!)
void _addClient(AsyncEventSourceClient *client);
void _handleDisconnect(AsyncEventSourceClient *client);
bool canHandle(AsyncWebServerRequest *request) const override final;
void handleRequest(AsyncWebServerRequest *request) override final;
};
class AsyncEventSourceResponse : public AsyncWebServerResponse {
private:
AsyncEventSource *_server;
public:
AsyncEventSourceResponse(AsyncEventSource *server);
void _respond(AsyncWebServerRequest *request);
size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time);
bool _sourceValid() const {
return true;
}
};
#endif /* ASYNCEVENTSOURCE_H_ */