Skip to content

Commit 8a90f32

Browse files
authored
feat: Implement backpressure handling (#115)
* Add drain event * Add socket.pause() method * Add socket.resume() method
1 parent 8990fb0 commit 8a90f32

19 files changed

+545
-280
lines changed

README.md

+13-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ React Native TCP socket API for Android, iOS & macOS with **client SSL/TLS suppo
2020
- [Server](#server)
2121
- [SSL Client](#ssl-client)
2222
- [API](#api)
23-
- [TcpSocket](#tcpsocket)
23+
- [Socket](#socket)
2424
- [`createConnection()`](#createconnection)
2525
- [Server](#server-1)
2626
- [`listen()`](#listen)
@@ -218,7 +218,7 @@ _Note: In order to use self-signed certificates make sure to [update your metro.
218218
## API
219219
Here are listed all methods implemented in `react-native-tcp-socket`, their functionalities are equivalent to those provided by Node's [net](https://nodejs.org/api/net.html) (more info on [#41](https://github.com/Rapsssito/react-native-tcp-socket/issues/41)). However, the **methods whose interface differs from Node are marked in bold**.
220220

221-
### TcpSocket
221+
### Socket
222222
* **Methods:**
223223
* **[`TcpSocket.createConnection(options[, callback])`](#createconnection)**
224224
* [`address()`](https://nodejs.org/api/net.html#net_socket_address)
@@ -229,17 +229,28 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func
229229
* [`setNoDelay([noDelay])`](https://nodejs.org/api/net.html#net_socket_setnodelay_nodelay)
230230
* [`setTimeout(timeout[, callback])`](https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback)
231231
* [`write(data[, encoding][, callback])`](https://nodejs.org/api/net.html#net_socket_write_data_encoding_callback)
232+
* [`pause()`](https://nodejs.org/api/net.html#net_socket_pause)
233+
* `ref()` - _Will not have any effect_
234+
* [`resume()`](https://nodejs.org/api/net.html#net_socket_resume)
235+
* `unref()` - _Will not have any effect_
232236
* **Properties:**
237+
* Inherited from [`Stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable):
238+
* [`writableNeedDrain`](https://nodejs.org/api/stream.html#stream_writable_writableneeddrain)
233239
* [`remoteAddress`](https://nodejs.org/api/net.html#net_socket_remoteaddress)
234240
* [`remoteFamily`](https://nodejs.org/api/net.html#net_socket_remotefamily)
235241
* [`remotePort`](https://nodejs.org/api/net.html#net_socket_remoteport)
236242
* [`localAddress`](https://nodejs.org/api/net.html#net_socket_localaddress)
237243
* [`localPort`](https://nodejs.org/api/net.html#net_socket_localport)
238244
* **Events:**
245+
* Inherited from [`Stream.Readable`](https://nodejs.org/api/stream.html#stream_class_stream_readable):
246+
* [`'pause'`](https://nodejs.org/api/stream.html#stream_event_pause)
247+
* [`'resume'`](https://nodejs.org/api/stream.html#stream_event_resume)
239248
* [`'close'`](https://nodejs.org/api/net.html#net_event_close_1)
240249
* [`'connect'`](https://nodejs.org/api/net.html#net_event_connect)
241250
* [`'data'`](https://nodejs.org/api/net.html#net_event_data)
251+
* [`'drain'`](https://nodejs.org/api/net.html#net_event_drain)
242252
* [`'error'`](https://nodejs.org/api/net.html#net_event_error_1)
253+
* [`'timeout'`](https://nodejs.org/api/net.html#net_event_timeout)
243254

244255
#### `createConnection()`
245256
`createConnection(options[, callback])` creates a TCP connection using the given [`options`](#createconnection-options).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package com.asterinet.react.tcpsocket;
2+
3+
import android.util.Base64;
4+
5+
import com.facebook.react.bridge.Arguments;
6+
import com.facebook.react.bridge.ReactContext;
7+
import com.facebook.react.bridge.WritableMap;
8+
import com.facebook.react.modules.core.DeviceEventManagerModule;
9+
10+
import java.net.Inet6Address;
11+
import java.net.InetAddress;
12+
import java.net.InetSocketAddress;
13+
import java.net.ServerSocket;
14+
import java.net.Socket;
15+
16+
import javax.annotation.Nullable;
17+
18+
public class TcpEventListener {
19+
20+
private final DeviceEventManagerModule.RCTDeviceEventEmitter rctEvtEmitter;
21+
22+
public TcpEventListener(final ReactContext reactContext) {
23+
rctEvtEmitter = reactContext.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter.class);
24+
}
25+
26+
public void onConnection(int serverId, int clientId, Socket socket) {
27+
WritableMap eventParams = Arguments.createMap();
28+
eventParams.putInt("id", serverId);
29+
30+
WritableMap infoParams = Arguments.createMap();
31+
infoParams.putInt("id", clientId);
32+
33+
WritableMap connectionParams = Arguments.createMap();
34+
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
35+
36+
connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress());
37+
connectionParams.putInt("localPort", socket.getLocalPort());
38+
connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress());
39+
connectionParams.putInt("remotePort", socket.getPort());
40+
connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4");
41+
42+
infoParams.putMap("connection", connectionParams);
43+
eventParams.putMap("info", infoParams);
44+
45+
sendEvent("connection", eventParams);
46+
}
47+
48+
public void onConnect(int id, TcpSocketClient client) {
49+
WritableMap eventParams = Arguments.createMap();
50+
eventParams.putInt("id", id);
51+
WritableMap connectionParams = Arguments.createMap();
52+
Socket socket = client.getSocket();
53+
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
54+
55+
connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress());
56+
connectionParams.putInt("localPort", socket.getLocalPort());
57+
connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress());
58+
connectionParams.putInt("remotePort", socket.getPort());
59+
connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4");
60+
eventParams.putMap("connection", connectionParams);
61+
sendEvent("connect", eventParams);
62+
}
63+
64+
public void onListen(int id, TcpSocketServer server) {
65+
WritableMap eventParams = Arguments.createMap();
66+
eventParams.putInt("id", id);
67+
WritableMap connectionParams = Arguments.createMap();
68+
ServerSocket serverSocket = server.getServerSocket();
69+
InetAddress address = serverSocket.getInetAddress();
70+
71+
connectionParams.putString("localAddress", serverSocket.getInetAddress().getHostAddress());
72+
connectionParams.putInt("localPort", serverSocket.getLocalPort());
73+
connectionParams.putString("localFamily", address instanceof Inet6Address ? "IPv6" : "IPv4");
74+
eventParams.putMap("connection", connectionParams);
75+
sendEvent("listening", eventParams);
76+
}
77+
78+
public void onData(int id, byte[] data) {
79+
WritableMap eventParams = Arguments.createMap();
80+
eventParams.putInt("id", id);
81+
eventParams.putString("data", Base64.encodeToString(data, Base64.NO_WRAP));
82+
83+
sendEvent("data", eventParams);
84+
}
85+
86+
public void onWritten(int id, int msgId, @Nullable String error) {
87+
WritableMap eventParams = Arguments.createMap();
88+
eventParams.putInt("id", id);
89+
eventParams.putInt("msgId", msgId);
90+
eventParams.putString("err", error);
91+
92+
sendEvent("written", eventParams);
93+
}
94+
95+
public void onClose(int id, String error) {
96+
if (error != null) {
97+
onError(id, error);
98+
}
99+
WritableMap eventParams = Arguments.createMap();
100+
eventParams.putInt("id", id);
101+
eventParams.putBoolean("hadError", error != null);
102+
103+
sendEvent("close", eventParams);
104+
}
105+
106+
public void onError(int id, String error) {
107+
WritableMap eventParams = Arguments.createMap();
108+
eventParams.putInt("id", id);
109+
eventParams.putString("error", error);
110+
111+
sendEvent("error", eventParams);
112+
}
113+
114+
private void sendEvent(String eventName, WritableMap params) {
115+
rctEvtEmitter.emit(eventName, params);
116+
}
117+
}
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,64 @@
11
package com.asterinet.react.tcpsocket;
22

3-
import android.os.AsyncTask;
4-
import android.util.Pair;
5-
63
import java.io.BufferedInputStream;
74
import java.io.IOException;
8-
import java.net.InetSocketAddress;
9-
import java.util.Arrays;
105
import java.net.Socket;
6+
import java.util.Arrays;
117

128
/**
13-
* This is a specialized AsyncTask that receives data from a socket in the background, and
9+
* This is a specialized Runnable that receives data from a socket in the background, and
1410
* notifies it's listener when data is received. This is not threadsafe, the listener
1511
* should handle synchronicity.
1612
*/
17-
class TcpReceiverTask extends AsyncTask<Pair<TcpSocketClient, TcpReceiverTask.OnDataReceivedListener>, Void, Void> {
13+
public class TcpReceiverTask implements Runnable {
14+
15+
private final TcpSocketClient clientSocket;
16+
private final TcpEventListener receiverListener;
17+
private boolean paused = false;
18+
19+
public TcpReceiverTask(TcpSocketClient clientSocket, TcpEventListener receiverListener) {
20+
this.clientSocket = clientSocket;
21+
this.receiverListener = receiverListener;
22+
}
23+
1824
/**
1925
* An infinite loop to block and read data from the socket.
2026
*/
21-
@SafeVarargs
2227
@Override
23-
protected final Void doInBackground(Pair<TcpSocketClient, TcpReceiverTask.OnDataReceivedListener>... params) {
24-
if (params.length > 1) {
25-
throw new IllegalArgumentException("This task is only for a single socket/listener pair.");
26-
}
27-
28-
TcpSocketClient clientSocket = params[0].first;
29-
OnDataReceivedListener receiverListener = params[0].second;
28+
public void run() {
3029
int socketId = clientSocket.getId();
3130
Socket socket = clientSocket.getSocket();
32-
byte[] buffer = new byte[8192];
33-
int bufferCount;
31+
byte[] buffer = new byte[16384];
3432
try {
3533
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
36-
while (!isCancelled() && !socket.isClosed()) {
37-
bufferCount = in.read(buffer);
34+
while (!socket.isClosed()) {
35+
int bufferCount = in.read(buffer);
36+
waitIfPaused();
3837
if (bufferCount > 0) {
3938
receiverListener.onData(socketId, Arrays.copyOfRange(buffer, 0, bufferCount));
4039
} else if (bufferCount == -1) {
4140
clientSocket.destroy();
4241
}
4342
}
44-
} catch (IOException ioe) {
43+
} catch (IOException | InterruptedException ioe) {
4544
if (receiverListener != null && !socket.isClosed()) {
4645
receiverListener.onError(socketId, ioe.getMessage());
4746
}
48-
this.cancel(false);
4947
}
50-
return null;
5148
}
5249

53-
/**
54-
* Listener interface for receive events.
55-
*/
56-
@SuppressWarnings("WeakerAccess")
57-
public interface OnDataReceivedListener {
58-
void onConnection(Integer serverId, Integer clientId, Socket socket);
59-
60-
void onConnect(Integer id, TcpSocketClient client);
61-
62-
void onListen(Integer id, TcpSocketServer server);
63-
64-
void onData(Integer id, byte[] data);
50+
public synchronized void pause() {
51+
paused = true;
52+
}
6553

66-
void onClose(Integer id, String error);
54+
public synchronized void resume() {
55+
paused = false;
56+
notify();
57+
}
6758

68-
void onError(Integer id, String error);
59+
private synchronized void waitIfPaused() throws InterruptedException {
60+
while (paused) {
61+
wait();
62+
}
6963
}
7064
}

android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java

+33-29
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22

33
import android.content.Context;
44
import android.net.Network;
5-
import android.util.Pair;
65

76
import com.facebook.react.bridge.ReadableMap;
87

98
import java.io.IOException;
10-
import java.io.OutputStream;
119
import java.net.InetAddress;
1210
import java.net.InetSocketAddress;
1311
import java.net.Socket;
@@ -23,21 +21,19 @@
2321
import androidx.annotation.Nullable;
2422

2523
class TcpSocketClient extends TcpSocket {
26-
private final ExecutorService executorService;
27-
private TcpReceiverTask receiverTask;
24+
private final ExecutorService listenExecutor;
25+
private final ExecutorService writeExecutor;
26+
private final TcpEventListener receiverListener;
27+
private final TcpReceiverTask receiverTask;
2828
private Socket socket;
29-
private TcpReceiverTask.OnDataReceivedListener mReceiverListener;
3029

31-
TcpSocketClient(@NonNull final TcpReceiverTask.OnDataReceivedListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) {
30+
TcpSocketClient(@NonNull final TcpEventListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) {
3231
super(id);
33-
this.executorService = Executors.newFixedThreadPool(1);
32+
listenExecutor = Executors.newSingleThreadExecutor();
33+
writeExecutor = Executors.newSingleThreadExecutor();
34+
receiverTask = new TcpReceiverTask(this, receiverListener);
3435
this.socket = socket;
35-
receiverTask = new TcpReceiverTask();
36-
mReceiverListener = receiverListener;
37-
}
38-
39-
ExecutorService getExecutorService() {
40-
return this.executorService;
36+
this.receiverListener = receiverListener;
4137
}
4238

4339
public Socket getSocket() {
@@ -83,43 +79,43 @@ public void connect(@NonNull final Context context, @NonNull final String addres
8379
startListening();
8480
}
8581

86-
@SuppressWarnings("WeakerAccess")
8782
public void startListening() {
88-
//noinspection unchecked
89-
receiverTask.executeOnExecutor(getExecutorService(), new Pair<>(this, mReceiverListener));
83+
listenExecutor.execute(receiverTask);
9084
}
9185

9286
/**
9387
* Sends data from the socket
9488
*
9589
* @param data data to be sent
9690
*/
97-
public void write(final byte[] data) throws IOException {
98-
if (socket == null) {
99-
throw new IOException("Socket is not connected.");
100-
}
101-
OutputStream output = socket.getOutputStream();
102-
output.write(data);
91+
public void write(final int msgId, final byte[] data) {
92+
writeExecutor.execute(new Runnable() {
93+
@Override
94+
public void run() {
95+
try {
96+
socket.getOutputStream().write(data);
97+
receiverListener.onWritten(getId(), msgId, null);
98+
} catch (IOException e) {
99+
receiverListener.onWritten(getId(), msgId, e.toString());
100+
receiverListener.onError(getId(), e.toString());
101+
}
102+
}
103+
});
103104
}
104105

105106
/**
106107
* Shuts down the receiver task, closing the socket.
107108
*/
108109
public void destroy() {
109110
try {
110-
if (receiverTask != null && !receiverTask.isCancelled()) {
111-
// stop the receiving task
112-
receiverTask.cancel(true);
113-
getExecutorService().shutdown();
114-
}
115111
// close the socket
116112
if (socket != null && !socket.isClosed()) {
117113
socket.close();
118-
mReceiverListener.onClose(getId(), null);
114+
receiverListener.onClose(getId(), null);
119115
socket = null;
120116
}
121117
} catch (IOException e) {
122-
mReceiverListener.onClose(getId(), e.getMessage());
118+
receiverListener.onClose(getId(), e.getMessage());
123119
}
124120
}
125121

@@ -143,4 +139,12 @@ public void setKeepAlive(final boolean enable, final int initialDelay) throws IO
143139
// `initialDelay` is ignored
144140
socket.setKeepAlive(enable);
145141
}
142+
143+
public void pause() {
144+
receiverTask.pause();
145+
}
146+
147+
public void resume() {
148+
receiverTask.resume();
149+
}
146150
}

0 commit comments

Comments
 (0)