Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ added: v23.8.0
-->

* `options` {Object}
* `body` {ArrayBuffer | ArrayBufferView | Blob}
* `body` {ArrayBuffer | ArrayBufferView | Blob | ReadableStream<Uint8Array>}
* `sendOrder` {number}
* Returns: {Promise} for a {quic.QuicStream}

Expand All @@ -489,7 +489,7 @@ added: v23.8.0
-->

* `options` {Object}
* `body` {ArrayBuffer | ArrayBufferView | Blob}
* `body` {ArrayBuffer | ArrayBufferView | Blob | ReadableStream<Uint8Array>}
* `sendOrder` {number}
* Returns: {Promise} for a {quic.QuicStream}

Expand Down Expand Up @@ -820,7 +820,7 @@ The callback to invoke when the stream is reset. Read/write.
added: v23.8.0
-->

* Type: {ReadableStream}
* Type: {ReadableStream | undefined}

### `stream.session`

Expand Down
37 changes: 37 additions & 0 deletions lib/internal/quic/quic.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const {
Endpoint: Endpoint_,
Http3Application: Http3,
setCallbacks,
DataQueueFeeder,

// The constants to be exposed to end users for various options.
CC_ALGO_RENO_STR: CC_ALGO_RENO,
Expand Down Expand Up @@ -114,6 +115,10 @@ const {
buildNgHeaderString,
} = require('internal/http2/util');

const {
isReadableStream,
} = require('internal/webstreams/readablestream');

const kEmptyObject = { __proto__: null };

const {
Expand Down Expand Up @@ -546,6 +551,37 @@ setCallbacks({
function validateBody(body) {
// TODO(@jasnell): Support streaming sources
if (body === undefined) return body;
if (isReadableStream(body)) {
const feeder = new DataQueueFeeder();
const reader = body.getReader();

const feeding = async () => {
await feeder.ready();
let cont = true;

while (cont) {
let read;
try {
read = await reader.read();
} catch (error) {
feeder.error(error);
}
const { value, done } = read;
try {
cont = await feeder.submit(value, done);
} catch (error) {
reader.cancel(error.toString());
break;
}
}
if (!cont) {
reader.releaseLock();
}

};
feeding();
return feeder;
}
// Transfer ArrayBuffers...
if (isArrayBuffer(body)) {
return ArrayBufferPrototypeTransfer(body);
Expand Down Expand Up @@ -578,6 +614,7 @@ function validateBody(body) {
'ArrayBuffer',
'ArrayBufferView',
'Blob',
'ReadableStream',
], body);
}

Expand Down
76 changes: 75 additions & 1 deletion src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <memory>
#include <vector>

#include "../quic/streams.h"

namespace node {

using v8::ArrayBufferView;
Expand Down Expand Up @@ -1061,9 +1063,76 @@ class FdEntry final : public EntryImpl {
friend class ReaderImpl;
};

} // namespace
// ============================================================================

} // namespace
class FeederEntry final : public EntryImpl {
public:
FeederEntry(DataQueueFeeder* feeder) : feeder_(feeder) {}

static std::unique_ptr<FeederEntry> Create(DataQueueFeeder* feeder) {
return std::make_unique<FeederEntry>(feeder);
}

std::shared_ptr<DataQueue::Reader> get_reader() override {
return ReaderImpl::Create(this);
}

std::unique_ptr<Entry> slice(
uint64_t start, std::optional<uint64_t> end = std::nullopt) override {
// we are not idempotent
return std::unique_ptr<Entry>(nullptr);
}

std::optional<uint64_t> size() const override {
return std::optional<uint64_t>();
}

bool is_idempotent() const override { return false; }

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(FeederEntry)
SET_SELF_SIZE(FeederEntry)

private:
DataQueueFeeder* feeder_;

class ReaderImpl final : public DataQueue::Reader,
public std::enable_shared_from_this<ReaderImpl> {
public:
static std::shared_ptr<ReaderImpl> Create(FeederEntry* entry) {
return std::make_shared<ReaderImpl>(entry);
}

explicit ReaderImpl(FeederEntry* entry) : entry_(entry) {}

~ReaderImpl() { entry_->feeder_->DrainAndClose(); }

int Pull(Next next,
int options,
DataQueue::Vec* data,
size_t count,
size_t max_count_hint = bob::kMaxCountHint) override {
if (entry_->feeder_->Done()) {
std::move(next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {});
return bob::STATUS_EOS;
}
entry_->feeder_->addPendingPull(
DataQueueFeeder::PendingPull(std::move(next)));
entry_->feeder_->tryWakePulls();
return bob::STATUS_WAIT;
}

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(FeederEntry::Reader)
SET_SELF_SIZE(ReaderImpl)

private:
FeederEntry* entry_;
};
};

// ============================================================================

std::shared_ptr<DataQueue> DataQueue::CreateIdempotent(
std::vector<std::unique_ptr<Entry>> list) {
Expand Down Expand Up @@ -1137,6 +1206,11 @@ std::unique_ptr<DataQueue::Entry> DataQueue::CreateFdEntry(Environment* env,
return FdEntry::Create(env, path);
}

std::unique_ptr<DataQueue::Entry> DataQueue::CreateFeederEntry(
DataQueueFeeder* feeder) {
return FeederEntry::Create(feeder);
}

void DataQueue::Initialize(Environment* env, v8::Local<v8::Object> target) {
// Nothing to do here currently.
}
Expand Down
6 changes: 6 additions & 0 deletions src/dataqueue/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <vector>

namespace node {
using v8::Local;
using v8::Value;

// Represents a sequenced collection of data sources that can be
// consumed as a single logical stream of data. Sources can be
Expand Down Expand Up @@ -124,6 +126,8 @@ namespace node {
// For non-idempotent DataQueues, only a single reader is ever allowed for
// the DataQueue, and the data can only ever be read once.

class DataQueueFeeder;

class DataQueue : public MemoryRetainer {
public:
struct Vec {
Expand Down Expand Up @@ -224,6 +228,8 @@ class DataQueue : public MemoryRetainer {
static std::unique_ptr<Entry> CreateFdEntry(Environment* env,
v8::Local<v8::Value> path);

static std::unique_ptr<Entry> CreateFeederEntry(DataQueueFeeder* feeder);

// Creates a Reader for the given queue. If the queue is idempotent,
// any number of readers can be created, all of which are guaranteed
// to provide the same data. Otherwise, only a single reader is
Expand Down
9 changes: 7 additions & 2 deletions src/quic/bindingdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#include <unordered_map>
#include "defs.h"

namespace node::quic {
namespace node {
class DataQueueFeeder;
namespace quic {

class Endpoint;
class Packet;
Expand All @@ -24,6 +26,7 @@ class Packet;

// The FunctionTemplates the BindingData will store for us.
#define QUIC_CONSTRUCTORS(V) \
V(dataqueuefeeder) \
V(endpoint) \
V(http3application) \
V(logstream) \
Expand Down Expand Up @@ -68,6 +71,7 @@ class Packet;
V(ciphers, "ciphers") \
V(crl, "crl") \
V(cubic, "cubic") \
V(dataqueuefeeder, "DataQueueFeeder") \
V(disable_stateless_reset, "disableStatelessReset") \
V(enable_connect_protocol, "enableConnectProtocol") \
V(enable_datagrams, "enableDatagrams") \
Expand Down Expand Up @@ -264,6 +268,7 @@ struct CallbackScope final : public CallbackScopeBase {
explicit CallbackScope(T* ptr) : CallbackScopeBase(ptr->env()), ref(ptr) {}
};

} // namespace node::quic
} // namespace quic
} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
3 changes: 3 additions & 0 deletions src/quic/quic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void CreatePerIsolateProperties(IsolateData* isolate_data,
Endpoint::InitPerIsolate(isolate_data, target);
Session::InitPerIsolate(isolate_data, target);
Stream::InitPerIsolate(isolate_data, target);
DataQueueFeeder::InitPerIsolate(isolate_data, target);
}

void CreatePerContextProperties(Local<Object> target,
Expand All @@ -38,13 +39,15 @@ void CreatePerContextProperties(Local<Object> target,
Endpoint::InitPerContext(realm, target);
Session::InitPerContext(realm, target);
Stream::InitPerContext(realm, target);
DataQueueFeeder::InitPerContext(realm, target);
}

void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
BindingData::RegisterExternalReferences(registry);
Endpoint::RegisterExternalReferences(registry);
Session::RegisterExternalReferences(registry);
Stream::RegisterExternalReferences(registry);
DataQueueFeeder::RegisterExternalReferences(registry);
}

} // namespace quic
Expand Down
2 changes: 2 additions & 0 deletions src/quic/session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1940,6 +1940,8 @@ BaseObjectPtr<Stream> Session::CreateStream(
if (auto stream = Stream::Create(this, id, std::move(data_source)))
[[likely]] {
AddStream(stream, option);
ResumeStream(id); // ok, we need to resume, as the Resume before fails
// as the stream was not added yet
return stream;
}
return {};
Expand Down
Loading
Loading