diff --git a/doc/api/quic.md b/doc/api/quic.md index 723c26c5bd0b99..9ea06e8e8bd1cf 100644 --- a/doc/api/quic.md +++ b/doc/api/quic.md @@ -475,7 +475,7 @@ added: v23.8.0 --> * `options` {Object} - * `body` {ArrayBuffer | ArrayBufferView | Blob} + * `body` {ArrayBuffer | ArrayBufferView | Blob | ReadableStream} * `sendOrder` {number} * Returns: {Promise} for a {quic.QuicStream} @@ -489,7 +489,7 @@ added: v23.8.0 --> * `options` {Object} - * `body` {ArrayBuffer | ArrayBufferView | Blob} + * `body` {ArrayBuffer | ArrayBufferView | Blob | ReadableStream} * `sendOrder` {number} * Returns: {Promise} for a {quic.QuicStream} @@ -820,7 +820,7 @@ The callback to invoke when the stream is reset. Read/write. added: v23.8.0 --> -* Type: {ReadableStream} +* Type: {ReadableStream | undefined} ### `stream.session` diff --git a/lib/internal/quic/quic.js b/lib/internal/quic/quic.js index 6ad476eed5216f..d61b732650605a 100644 --- a/lib/internal/quic/quic.js +++ b/lib/internal/quic/quic.js @@ -34,6 +34,7 @@ const { Endpoint: Endpoint_, Http3Application: Http3, setCallbacks, + DataQueueFeeder, // The constants to be exposed to end users for various options. CC_ALGO_RENO_STR: CC_ALGO_RENO, @@ -114,6 +115,10 @@ const { buildNgHeaderString, } = require('internal/http2/util'); +const { + isReadableStream, +} = require('internal/webstreams/readablestream'); + const kEmptyObject = { __proto__: null }; const { @@ -546,6 +551,37 @@ setCallbacks({ function validateBody(body) { // TODO(@jasnell): Support streaming sources if (body === undefined) return body; + if (isReadableStream(body)) { + const feeder = new DataQueueFeeder(); + const reader = body.getReader(); + + const feeding = async () => { + await feeder.ready(); + let cont = true; + + while (cont) { + let read; + try { + read = await reader.read(); + } catch (error) { + feeder.error(error); + } + const { value, done } = read; + try { + cont = await feeder.submit(value, done); + } catch (error) { + reader.cancel(error.toString()); + break; + } + } + if (!cont) { + reader.releaseLock(); + } + + }; + feeding(); + return feeder; + } // Transfer ArrayBuffers... if (isArrayBuffer(body)) { return ArrayBufferPrototypeTransfer(body); @@ -578,6 +614,7 @@ function validateBody(body) { 'ArrayBuffer', 'ArrayBufferView', 'Blob', + 'ReadableStream', ], body); } diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index 64643680903a78..e0a6dd7aef312e 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -18,6 +18,8 @@ #include #include +#include "../quic/streams.h" + namespace node { using v8::ArrayBufferView; @@ -1061,9 +1063,76 @@ class FdEntry final : public EntryImpl { friend class ReaderImpl; }; +} // namespace // ============================================================================ -} // namespace +class FeederEntry final : public EntryImpl { + public: + FeederEntry(DataQueueFeeder* feeder) : feeder_(feeder) {} + + static std::unique_ptr Create(DataQueueFeeder* feeder) { + return std::make_unique(feeder); + } + + std::shared_ptr get_reader() override { + return ReaderImpl::Create(this); + } + + std::unique_ptr slice( + uint64_t start, std::optional end = std::nullopt) override { + // we are not idempotent + return std::unique_ptr(nullptr); + } + + std::optional size() const override { + return std::optional(); + } + + bool is_idempotent() const override { return false; } + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(FeederEntry) + SET_SELF_SIZE(FeederEntry) + + private: + DataQueueFeeder* feeder_; + + class ReaderImpl final : public DataQueue::Reader, + public std::enable_shared_from_this { + public: + static std::shared_ptr Create(FeederEntry* entry) { + return std::make_shared(entry); + } + + explicit ReaderImpl(FeederEntry* entry) : entry_(entry) {} + + ~ReaderImpl() { entry_->feeder_->DrainAndClose(); } + + int Pull(Next next, + int options, + DataQueue::Vec* data, + size_t count, + size_t max_count_hint = bob::kMaxCountHint) override { + if (entry_->feeder_->Done()) { + std::move(next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + return bob::STATUS_EOS; + } + entry_->feeder_->addPendingPull( + DataQueueFeeder::PendingPull(std::move(next))); + entry_->feeder_->tryWakePulls(); + return bob::STATUS_WAIT; + } + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(FeederEntry::Reader) + SET_SELF_SIZE(ReaderImpl) + + private: + FeederEntry* entry_; + }; +}; + +// ============================================================================ std::shared_ptr DataQueue::CreateIdempotent( std::vector> list) { @@ -1137,6 +1206,11 @@ std::unique_ptr DataQueue::CreateFdEntry(Environment* env, return FdEntry::Create(env, path); } +std::unique_ptr DataQueue::CreateFeederEntry( + DataQueueFeeder* feeder) { + return FeederEntry::Create(feeder); +} + void DataQueue::Initialize(Environment* env, v8::Local target) { // Nothing to do here currently. } diff --git a/src/dataqueue/queue.h b/src/dataqueue/queue.h index a37bd27549986e..dc9871b712753f 100644 --- a/src/dataqueue/queue.h +++ b/src/dataqueue/queue.h @@ -16,6 +16,8 @@ #include namespace node { +using v8::Local; +using v8::Value; // Represents a sequenced collection of data sources that can be // consumed as a single logical stream of data. Sources can be @@ -124,6 +126,8 @@ namespace node { // For non-idempotent DataQueues, only a single reader is ever allowed for // the DataQueue, and the data can only ever be read once. +class DataQueueFeeder; + class DataQueue : public MemoryRetainer { public: struct Vec { @@ -224,6 +228,8 @@ class DataQueue : public MemoryRetainer { static std::unique_ptr CreateFdEntry(Environment* env, v8::Local path); + static std::unique_ptr CreateFeederEntry(DataQueueFeeder* feeder); + // Creates a Reader for the given queue. If the queue is idempotent, // any number of readers can be created, all of which are guaranteed // to provide the same data. Otherwise, only a single reader is diff --git a/src/quic/bindingdata.h b/src/quic/bindingdata.h index 1b29a54a8c1199..7be72a45e08d24 100644 --- a/src/quic/bindingdata.h +++ b/src/quic/bindingdata.h @@ -15,7 +15,9 @@ #include #include "defs.h" -namespace node::quic { +namespace node { +class DataQueueFeeder; +namespace quic { class Endpoint; class Packet; @@ -24,6 +26,7 @@ class Packet; // The FunctionTemplates the BindingData will store for us. #define QUIC_CONSTRUCTORS(V) \ + V(dataqueuefeeder) \ V(endpoint) \ V(http3application) \ V(logstream) \ @@ -68,6 +71,7 @@ class Packet; V(ciphers, "ciphers") \ V(crl, "crl") \ V(cubic, "cubic") \ + V(dataqueuefeeder, "DataQueueFeeder") \ V(disable_stateless_reset, "disableStatelessReset") \ V(enable_connect_protocol, "enableConnectProtocol") \ V(enable_datagrams, "enableDatagrams") \ @@ -264,6 +268,7 @@ struct CallbackScope final : public CallbackScopeBase { explicit CallbackScope(T* ptr) : CallbackScopeBase(ptr->env()), ref(ptr) {} }; -} // namespace node::quic +} // namespace quic +} // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/quic/quic.cc b/src/quic/quic.cc index edfb5dc9e66295..13fc8c2fda8a07 100644 --- a/src/quic/quic.cc +++ b/src/quic/quic.cc @@ -27,6 +27,7 @@ void CreatePerIsolateProperties(IsolateData* isolate_data, Endpoint::InitPerIsolate(isolate_data, target); Session::InitPerIsolate(isolate_data, target); Stream::InitPerIsolate(isolate_data, target); + DataQueueFeeder::InitPerIsolate(isolate_data, target); } void CreatePerContextProperties(Local target, @@ -38,6 +39,7 @@ void CreatePerContextProperties(Local target, Endpoint::InitPerContext(realm, target); Session::InitPerContext(realm, target); Stream::InitPerContext(realm, target); + DataQueueFeeder::InitPerContext(realm, target); } void RegisterExternalReferences(ExternalReferenceRegistry* registry) { @@ -45,6 +47,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { Endpoint::RegisterExternalReferences(registry); Session::RegisterExternalReferences(registry); Stream::RegisterExternalReferences(registry); + DataQueueFeeder::RegisterExternalReferences(registry); } } // namespace quic diff --git a/src/quic/session.cc b/src/quic/session.cc index e546803fa881e5..3d48cffae21f04 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -1940,6 +1940,8 @@ BaseObjectPtr Session::CreateStream( if (auto stream = Stream::Create(this, id, std::move(data_source))) [[likely]] { AddStream(stream, option); + ResumeStream(id); // ok, we need to resume, as the Resume before fails + // as the stream was not added yet return stream; } return {}; diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 8fe5b72ce1fe5b..ba72e23271eb05 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -17,9 +17,11 @@ namespace node { +using quic::BindingData; using v8::Array; using v8::ArrayBuffer; using v8::ArrayBufferView; +using v8::BackingStore; using v8::BigInt; using v8::Global; using v8::Integer; @@ -30,6 +32,8 @@ using v8::Nothing; using v8::Object; using v8::ObjectTemplate; using v8::SharedArrayBuffer; +using v8::TypedArray; +using v8::Uint8Array; using v8::Value; namespace quic { @@ -215,8 +219,16 @@ Maybe> Stream::GetDataQueueFromSource( entries.push_back(DataQueue::CreateInMemoryEntryFromBackingStore( std::move(backing), 0, backing->ByteLength())); return Just(DataQueue::CreateIdempotent(std::move(entries))); + } else if (DataQueueFeeder::HasInstance(env, value)) { + // a DataQueueFeeder + DataQueueFeeder* dataQueueFeeder; + ASSIGN_OR_RETURN_UNWRAP( + &dataQueueFeeder, value, Nothing>()); + std::shared_ptr dataQueue = DataQueue::Create(); + dataQueue->append(DataQueue::CreateFeederEntry(dataQueueFeeder)); + return Just(dataQueue); } - // TODO(jasnell): Add streaming sources... + THROW_ERR_INVALID_ARG_TYPE(env, "Invalid data source type"); return Nothing>(); } @@ -367,9 +379,13 @@ struct Stream::Impl { // Returns a Blob::Reader that can be used to read data that has been // received on the stream. + // returns undefined if local unidirectional stream JS_METHOD(GetReader) { Stream* stream; ASSIGN_OR_RETURN_UNWRAP(&stream, args.This()); + if (stream->is_local_unidirectional()) { + return args.GetReturnValue().SetUndefined(); + } BaseObjectPtr reader = stream->get_reader(); if (reader) return args.GetReturnValue().Set(reader->object()); THROW_ERR_INVALID_STATE(Environment::GetCurrent(args), @@ -1319,6 +1335,160 @@ void Stream::Unschedule() { } } // namespace quic + +DataQueueFeeder::DataQueueFeeder(Environment* env, Local object) + : AsyncWrap(env, object) { + MakeWeak(); +} + +void DataQueueFeeder::tryWakePulls() { + if (!readFinish_.IsEmpty()) { + Local resolver = readFinish_.Get(env()->isolate()); + // I do not think, that this can error... + (void)resolver->Resolve(env()->context(), v8::True(env()->isolate())); + readFinish_.Reset(); + } +} + +void DataQueueFeeder::DrainAndClose() { + if (done) return; + while (!pendingPulls_.empty()) { + auto& pending = pendingPulls_.front(); + auto pop = OnScopeLeave([this] { pendingPulls_.pop_front(); }); + pending.next(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); + } + if (!readFinish_.IsEmpty()) { + Local resolver = readFinish_.Get(env()->isolate()); + (void)resolver->Resolve(env()->context(), v8::False(env()->isolate())); + readFinish_.Reset(); + } + done = true; +} + +JS_METHOD_IMPL(DataQueueFeeder::New) { + DCHECK(args.IsConstructCall()); + auto env = Environment::GetCurrent(args); + new DataQueueFeeder(env, args.This()); +} + +JS_METHOD_IMPL(DataQueueFeeder::Ready) { + Environment* env = Environment::GetCurrent(args); + DataQueueFeeder* feeder; + ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This()); + if (feeder->pendingPulls_.size() > 0) { + feeder->readFinish_.Reset(); + return; + } else { + Local readFinish = + Promise::Resolver::New(env->context()).ToLocalChecked(); + feeder->readFinish_.Reset(env->isolate(), readFinish); + args.GetReturnValue().Set(readFinish->GetPromise()); + return; + } +} + +JS_METHOD_IMPL(DataQueueFeeder::Submit) { + Environment* env = Environment::GetCurrent(args); + DataQueueFeeder* feeder; + ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This()); + + bool done = false; + if (args[1]->IsBoolean() && args[1].As()->Value()) { + done = true; + } + if (!args[0].IsEmpty()) { + CHECK_GT(feeder->pendingPulls_.size(), 0); + auto chunk = args[0]; + + if (chunk->IsArrayBuffer()) { + auto buffer = chunk.As(); + chunk = Uint8Array::New(buffer, 0, buffer->ByteLength()); + } + if (!chunk->IsTypedArray()) { + THROW_ERR_INVALID_ARG_TYPE( + env, "Invalid data must be Arraybuffer or TypedArray"); + } + Local typedArray = chunk.As(); + // now we create a copy + // detaching, would not be a good idea for example, such + // a limitation is not given with W3C Webtransport + // if we do not do it here, a transform stream would + // be needed to do the copy in the Webtransport case. + // there may be also troubles, if multiple Uint8Array + // are derived in a parser from a single ArrayBuffer + size_t nread = typedArray->ByteLength(); + JS_TRY_ALLOCATE_BACKING(env, backingUniq, nread); + std::shared_ptr backing = std::move(backingUniq); + + auto originalStore = typedArray->Buffer()->GetBackingStore(); + const void* originalData = + static_cast(originalStore->Data()) + typedArray->ByteOffset(); + memcpy(backing->Data(), originalData, nread); + auto& pending = feeder->pendingPulls_.front(); + auto pop = OnScopeLeave([feeder] { feeder->pendingPulls_.pop_front(); }); + DataQueue::Vec vec; + vec.base = static_cast(backing->Data()); + vec.len = static_cast(nread); + pending.next(bob::STATUS_CONTINUE, &vec, 1, [backing](uint64_t) {}); + } + if (done) { + feeder->DrainAndClose(); + feeder->readFinish_.Reset(); + args.GetReturnValue().Set(v8::False(env->isolate())); + return; + } else { + if (feeder->pendingPulls_.size() > 0) { + feeder->readFinish_.Reset(); + args.GetReturnValue().Set(v8::True(env->isolate())); + return; + } else { + Local readFinish = + Promise::Resolver::New(env->context()).ToLocalChecked(); + feeder->readFinish_.Reset(env->isolate(), readFinish); + args.GetReturnValue().Set(readFinish->GetPromise()); + return; + } + } +} + +JS_METHOD_IMPL(DataQueueFeeder::Error) { + DataQueueFeeder* feeder; + ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This()); + // FIXME, how should I pass on the error + // ResetStream must be send also + feeder->DrainAndClose(); +} + +JS_CONSTRUCTOR_IMPL(DataQueueFeeder, dataqueuefeeder_constructor_template, { + auto isolate = env->isolate(); + JS_NEW_CONSTRUCTOR(); + JS_INHERIT(AsyncWrap); + JS_CLASS(dataqueuefeeder); + SetProtoMethod(isolate, tmpl, "error", Error); + SetProtoMethod(isolate, tmpl, "submit", Submit); + SetProtoMethod(isolate, tmpl, "ready", Ready); +}) + +void DataQueueFeeder::InitPerIsolate(IsolateData* data, + Local target) { + // TODO(@jasnell): Implement the per-isolate state +} + +void DataQueueFeeder::InitPerContext(Realm* realm, Local target) { + SetConstructorFunction(realm->context(), + target, + "DataQueueFeeder", + GetConstructorTemplate(realm->env())); +} + +void DataQueueFeeder::RegisterExternalReferences( + ExternalReferenceRegistry* registry) { + registry->Register(New); + registry->Register(Submit); + registry->Register(Error); + registry->Register(Ready); +} + } // namespace node #endif // OPENSSL_NO_QUIC diff --git a/src/quic/streams.h b/src/quic/streams.h index c230815d78e4be..038415686a6fbd 100644 --- a/src/quic/streams.h +++ b/src/quic/streams.h @@ -15,7 +15,11 @@ #include "bindingdata.h" #include "data.h" -namespace node::quic { +namespace node { + +using v8::Global; +using v8::Promise; +namespace quic { class Session; class Stream; @@ -387,7 +391,51 @@ class Stream final : public AsyncWrap, void Schedule(Queue* queue); void Unschedule(); }; +} // namespace quic +class DataQueueFeeder final : public AsyncWrap { + public: + using Next = bob::Next; + + DataQueueFeeder(Environment* env, v8::Local object); + + JS_CONSTRUCTOR(DataQueueFeeder); + JS_BINDING_INIT_BOILERPLATE(); + + static BaseObjectPtr Create(); + + void setDataQueue(std::shared_ptr queue) { dataQueue_ = queue; } + + void tryWakePulls(); + void DrainAndClose(); + + struct PendingPull { + Next next; + explicit PendingPull(Next next) : next(std::move(next)) {} + }; + + void addPendingPull(PendingPull toAdd) { + pendingPulls_.emplace_back(std::move(toAdd)); + } + + bool Done() { return done; } + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(DataQueueFeeder) + SET_SELF_SIZE(DataQueueFeeder) + + JS_METHOD(New); + JS_METHOD(Submit); + JS_METHOD(Error); + JS_METHOD(Ready); + + private: + std::shared_ptr dataQueue_; + Global readFinish_; + + std::deque pendingPulls_; + bool done = false; +}; -} // namespace node::quic +} // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/test/parallel/test-quic-server-to-client-unidirectional.mjs b/test/parallel/test-quic-server-to-client-unidirectional.mjs new file mode 100644 index 00000000000000..933b8860ab8a22 --- /dev/null +++ b/test/parallel/test-quic-server-to-client-unidirectional.mjs @@ -0,0 +1,109 @@ +// Flags: --experimental-quic --no-warnings + +import { hasQuic, skip, mustCall } from '../common/index.mjs'; +import { ok, strictEqual, deepStrictEqual } from 'node:assert'; +import { readKey } from '../common/fixtures.mjs'; +import { TransformStream } from 'node:stream/web'; + +if (!hasQuic) { + skip('QUIC is not enabled'); +} + +// Import after the hasQuic check +const { listen, connect } = await import('node:quic'); +const { createPrivateKey } = await import('node:crypto'); + +const keys = createPrivateKey(readKey('agent1-key.pem')); +const certs = readKey('agent1-cert.pem'); + +// The opened promise should resolve when the client finished reading +const clientFinished = Promise.withResolvers(); + +// start demo data +// FIX ME: move the following to a central place +// if used in several tests +// taken from @fails-components/webtransport tests +// by the original author +function createBytesChunk(length) { + const workArray = new Array(length / 2); + for (let i = 0; i < length / 4; i++) { + workArray[2 * i + 1] = length % 0xffff; + workArray[2 * i] = i; + } + const helper = new Uint16Array(workArray); + const toreturn = new Uint8Array( + helper.buffer, + helper.byteOffset, + helper.byteLength + ); + return toreturn; +} + +// The number in the comments, help you identify the chunk, as it is the length first two bytes +// this is helpful, when debugging buffer passing +const KNOWN_BYTES_LONG = [ + createBytesChunk(60000), // 96, 234 + createBytesChunk(12), // 0, 12 + createBytesChunk(50000), // 195, 80 + createBytesChunk(1600), // 6, 64 + createBytesChunk(20000), // 78, 32 + createBytesChunk(30000), // 117, 48 +]; + +// end demo data + +function uint8concat(arrays) { + const length = arrays.reduce((acc, curr) => acc + curr.length, 0); + const result = new Uint8Array(length); + let pos = 0; + let array = 0; + while (pos < length) { + const curArr = arrays[array]; + const curLen = curArr.byteLength; + const dest = new Uint8Array(result.buffer, result.byteOffset + pos, curLen); + dest.set(curArr); + array++; + pos += curArr.byteLength; + } +} + +const serverEndpoint = await listen(async (serverSession) => { + await serverSession.opened; + const transformStream = new TransformStream(); + const sendStream = await serverSession.createUnidirectionalStream({ body: transformStream.readable }); + strictEqual(sendStream.direction, 'uni'); + const serverWritable = transformStream.writable; + const writer = serverWritable.getWriter(); + for (const chunk of KNOWN_BYTES_LONG) { + await writer.ready; + await writer.write(chunk); + } + await writer.ready; + await writer.close(); + serverSession.close(); +}, { keys, certs }); + +// The server must have an address to connect to after listen resolves. +ok(serverEndpoint.address !== undefined); + +const clientSession = await connect(serverEndpoint.address); + +clientSession.onstream = mustCall(async (stream) => { + strictEqual(stream.direction, 'uni', 'Expects an unidirectional stream'); + const reader = stream.readable.getReader(); + const readChunks = []; + while (true) { + const { done, value } = await reader.read(); + if (value) { + ok(value instanceof Uint8Array, 'Expects value to be a Uint8Array'); + readChunks.push(value); + } + if (done) break; + } + // Now compare what we got + deepStrictEqual(uint8concat(KNOWN_BYTES_LONG), uint8concat(readChunks)); + clientFinished.resolve(); +}, 1); + +await clientFinished.promise; +clientSession.close();