Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 205 additions & 8 deletions ext/node/polyfills/internal/child_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,141 @@ const IPC_HANDLE_NET_SERVER = "net.Server";
// (SharedHandle). Mirrors Node's `handleConversion["net.Native"]`.
const IPC_HANDLE_NET_NATIVE = "net.Native";
const IPC_HANDLE_DGRAM_SOCKET = "dgram.Socket";
const NODE_SOCKET_GET_COUNT = "NODE_SOCKET_GET_COUNT";
const NODE_SOCKET_COUNT = "NODE_SOCKET_COUNT";
const NODE_SOCKET_NOTIFY_CLOSE = "NODE_SOCKET_NOTIFY_CLOSE";
const NODE_SOCKET_CLOSE_ACK = "NODE_SOCKET_CLOSE_ACK";

let nextSocketListKey = 0;
const socketListsByChild = new WeakMap();

class SocketListSend extends EventEmitter {
constructor(child, server) {
super();
this.child = child;
this.key = `socket-list:${++nextSocketListKey}`;
this.callbacks = new Map();
this.closeCallbacks = new Map();
this.seq = 0;
this.exited = false;
this.onInternalMessage = (message) => {
if (!message || message.key !== this.key) {
return;
}
if (message.cmd === NODE_SOCKET_COUNT) {
const cb = this.callbacks.get(message.id);
if (cb) {
this.callbacks.delete(message.id);
cb(null, message.count);
}
} else if (message.cmd === NODE_SOCKET_CLOSE_ACK) {
const cb = this.closeCallbacks.get(message.id);
if (cb) {
this.closeCallbacks.delete(message.id);
cb();
}
}
};
child.on("internalMessage", this.onInternalMessage);
child.once("exit", () => {
this.exited = true;
child.removeListener("internalMessage", this.onInternalMessage);
for (const cb of this.callbacks.values()) {
cb(null, 0);
}
this.callbacks.clear();
for (const cb of this.closeCallbacks.values()) {
cb();
}
this.closeCallbacks.clear();
this.emit("exit", this);
});
server._setupWorker(this);
}

_send(message, cb) {
if (this.exited || !this.child.connected) {
nextTick(cb);
return;
}
this.child.send(message, (err) => {
if (err) {
cb(err);
}
});
}

getConnections(cb) {
const id = ++this.seq;
this.callbacks.set(id, cb);
this._send({ cmd: NODE_SOCKET_GET_COUNT, key: this.key, id }, (err) => {
if (err) {
this.callbacks.delete(id);
cb(err);
}
});
}

close(cb) {
const id = ++this.seq;
this.closeCallbacks.set(id, cb);
this._send({ cmd: NODE_SOCKET_NOTIFY_CLOSE, key: this.key, id }, (err) => {
if (err) {
this.closeCallbacks.delete(id);
cb();
}
});
}
}

class SocketListReceive {
constructor() {
this.sockets = new Set();
this.closeCallbacks = [];
}

add(socket) {
this.sockets.add(socket);
socket.once("close", () => {
this.sockets.delete(socket);
this._maybeClose();
});
}

getConnections() {
return this.sockets.size;
}

close(cb) {
ArrayPrototypePush(this.closeCallbacks, cb);
this._maybeClose();
}

_maybeClose() {
if (this.sockets.size !== 0) {
return;
}
const callbacks = this.closeCallbacks;
this.closeCallbacks = [];
for (const cb of callbacks) {
nextTick(cb);
}
}
}

function getSocketListSend(child, server) {
let socketLists = socketListsByChild.get(child);
if (!socketLists) {
socketLists = new WeakMap();
socketListsByChild.set(child, socketLists);
}
let socketList = socketLists.get(server);
if (!socketList) {
socketList = new SocketListSend(child, server);
socketLists.set(server, socketList);
}
return socketList;
}

function rawFdFromTcpHandle(tcpHandle) {
if (typeof tcpHandle.fdForIpc !== "function") {
Expand All @@ -1780,22 +1915,31 @@ function rawFdFromTcpHandle(tcpHandle) {
return rawFd;
}

function getIpcHandleInfo(handle, options) {
function getIpcHandleInfo(handle, options, target) {
const { Socket } = lazyNet();
const { Server: NetServer } = lazyNet();
const { Socket: DgramSocket } = lazyDgram();
if (handle instanceof Socket) {
if (!(handle._handle instanceof TCP)) {
notImplemented("ChildProcess.send with non-TCP net.Socket handle");
}
const closeAfterSend = options.keepOpen !== true;
const message = {
cmd: "NODE_HANDLE",
type: IPC_HANDLE_NET_SOCKET,
msg: undefined,
};
if (
closeAfterSend &&
handle.server &&
typeof handle.server._setupWorker === "function"
) {
message.socketListKey = getSocketListSend(target, handle.server).key;
}
return {
rawFd: rawFdFromTcpHandle(handle._handle),
message: {
cmd: "NODE_HANDLE",
type: IPC_HANDLE_NET_SOCKET,
msg: undefined,
},
closeAfterSend: options.keepOpen !== true,
message,
closeAfterSend,
close() {
handle.parser = null;
handle._httpMessage = null;
Expand Down Expand Up @@ -1997,6 +2141,7 @@ function setupChannel(
// sends (handle or plain message) are queued on it to preserve ordering.
let pendingHandleInfo = null;
let handleQueue = null;
const receivedSocketLists = new Map();

function sendHandleAck() {
const queueOk = [true];
Expand All @@ -2023,6 +2168,53 @@ function setupChannel(
}
}

function sendInternalMessage(message) {
const queueOk = [true];
control.refCounted();
writeFn(ipc, message, NO_RAW_FD, queueOk).then(
() => control.unrefCounted(),
() => control.unrefCounted(),
);
}

function getReceivedSocketList(key) {
let socketList = receivedSocketLists.get(key);
if (!socketList) {
socketList = new SocketListReceive();
receivedSocketLists.set(key, socketList);
}
return socketList;
}

function handleSocketListMessage(message) {
if (message.cmd === NODE_SOCKET_GET_COUNT) {
const socketList = receivedSocketLists.get(message.key);
sendInternalMessage({
cmd: NODE_SOCKET_COUNT,
key: message.key,
id: message.id,
count: socketList ? socketList.getConnections() : 0,
});
return true;
}
if (message.cmd === NODE_SOCKET_NOTIFY_CLOSE) {
const socketList = receivedSocketLists.get(message.key);
const sendAck = () =>
sendInternalMessage({
cmd: NODE_SOCKET_CLOSE_ACK,
key: message.key,
id: message.id,
});
if (socketList) {
socketList.close(sendAck);
} else {
sendAck();
}
return true;
}
return false;
}

async function readLoop() {
try {
while (true) {
Expand All @@ -2048,11 +2240,16 @@ function setupChannel(
// Acknowledge receipt so the sender can close its local copy.
sendHandleAck();
const handle = createIpcHandle(msg, rawFd);
if (msg.socketListKey && handle instanceof Socket) {
getReceivedSocketList(msg.socketListKey).add(handle);
}
nextTick(handleMessage, msg.msg, handle);
continue;
} else if (cmd === "HANDLE_ACK") {
onHandleAck();
continue;
} else if (handleSocketListMessage(msg)) {
continue;
} else {
// TODO(nathanwhit): if we want to support deno-node IPC interop,
// handle any future NODE_HANDLE_* control messages here.
Expand Down Expand Up @@ -2188,7 +2385,7 @@ function setupChannel(
}

if (handle !== undefined) {
handleInfo = getIpcHandleInfo(handle, options);
handleInfo = getIpcHandleInfo(handle, options, target);
handleInfo.message.msg = message;
// Start queueing subsequent sends until the ACK arrives.
handleQueue = [];
Expand Down
4 changes: 4 additions & 0 deletions tests/node_compat/config.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@
"parallel/test-child-process-fork-close.js": {},
"parallel/test-child-process-fork-detached.js": {},
"parallel/test-child-process-fork-dgram.js": {},
"parallel/test-child-process-fork-getconnections.js": {
"windows": false,
"reason": "net.Socket IPC handle passing uses SCM_RIGHTS, not yet implemented on Windows"
},
"parallel/test-child-process-fork-net-socket.js": {
"windows": false,
"reason": "net.Socket IPC handle passing uses SCM_RIGHTS, not yet implemented on Windows"
Expand Down
Loading