Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions main/lib/dom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@

export interface WebTransportDatagramStats {
timestamp: number
expiredOutgoing: bigint
droppedIncoming: bigint
lostOutgoing: bigint
expiredOutgoing: number
droppedIncoming: number
lostOutgoing: number
}

export interface WebTransportStats {
timestamp: number
bytesSent: bigint
packetsSent: bigint
packetsLost: bigint
bytesSent: number
packetsSent: number
packetsLost: number
numOutgoingStreamsCreated: number
numIncomingStreamsCreated: number
bytesReceived: bigint
packetsReceived: bigint
bytesReceived: number
packetsReceived: number
smoothedRtt: number
rttVariation: number
minRtt: number
estimatedSendRate: bigint
estimatedSendRate: number
datagrams: WebTransportDatagramStats
}

export interface WebTransportSendStreamStats {
bytesWritten: bigint
bytesSent: bigint
bytesAcknowledged: bigint
bytesWritten: number
bytesSent: number
bytesAcknowledged: number
}

export interface WebTransportSendGroup {
Expand All @@ -41,7 +41,7 @@ export interface WebTransportCloseInfo {

export interface WebTransportDatagramsWritable extends WritableStream {
sendGroup?: WebTransportSendGroup;
sendOrder: bigint;
sendOrder: number;
};

export interface WebTransportSendOptions {
Expand All @@ -61,7 +61,7 @@ export interface WebTransportDatagramDuplexStream {

export interface WebTransportSendStream extends WritableStream<Uint8Array> {
sendGroup?: WebTransportSendGroup;
sendOrder: bigint;
sendOrder: number;
getStats: () => Promise<WebTransportSendStreamStats>
}

Expand Down Expand Up @@ -106,15 +106,9 @@ export interface WebTransportOptions {
protocols?: string[]
}

export interface WebTransportSendStreamStats {
bytesWritten: bigint;
bytesSent: bigint;
bytesAcknowledged: bigint;
}

export interface WebTransportSendStreamOptions {
sendGroup: WebTransportSendGroup|null
sendOrder?: bigint
sendOrder?: number
waitUntilAvailable?: boolean
}

Expand Down
2 changes: 1 addition & 1 deletion main/lib/http2/browser/browserparser.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ export class BrowserParser extends ParserBase {
let object = this.wtstreams.get(streamid)
if (!object) {
object = this.newStream(streamid, {
sendOrder: 0n,
sendOrder: 0,
sendGroupId: 0n
})
if (!object) return // stream broken
Expand Down
2 changes: 1 addition & 1 deletion main/lib/http2/node/capsuleparser.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export class Http2CapsuleParser extends ParserBaseHttp2 {
let object = this.wtstreams.get(streamid)
if (!object) {
object = this.newStream(streamid, {
sendOrder: 0n,
sendOrder: 0,
sendGroupId: 0n
})
if (!object) return // stream broken
Expand Down
6 changes: 2 additions & 4 deletions main/lib/http2/node/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,8 @@ export class Http2WebTransportServer {
this.jsobj.onServerListening(retObj)
})

this.serverInt.on('error', () => {
const retObj = {}
// @ts-ignore
this.jsobj.onServerError(retObj)
this.serverInt.on('error', (error) => {
this.jsobj.onServerError(error)
/* TODO (stream) => {
stream.close(constants.NGHTTP2_CONNECT_ERROR)
}) */
Expand Down
2 changes: 1 addition & 1 deletion main/lib/http2/node/websocketparser.js
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ export class WebSocketParser extends ParserBaseHttp2 {
let object = this.wtstreams.get(streamid)
if (!object) {
object = this.newStream(streamid, {
sendOrder: 0n,
sendOrder: 0,
sendGroupId: 0n
})
if (!object) return // stream broken
Expand Down
4 changes: 2 additions & 2 deletions main/lib/http2/parserbase.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export class ParserBase {

/**
* @param {bigint} streamid
* @param {{sendOrder: bigint,sendGroupId: bigint}} priority
* @param {{sendOrder: number,sendGroupId: bigint}} priority
*/
newStream(streamid, priority) {
const incoming = this.isclient ? !(streamid & 0x1n) : !!(streamid & 0x1n)
Expand Down Expand Up @@ -321,7 +321,7 @@ export class ParserBase {
/**
*
* @param {bigint} streamid
* @param {{sendOrder: bigint, sendGroupId: bigint}} arg2
* @param {{sendOrder: number, sendGroupId: bigint}} arg2
*/
streamUpdateSendOrderAndGroup(streamid, { sendOrder, sendGroupId }) {
this.scheduler.UpdateSendGroup(streamid, sendGroupId)
Expand Down
8 changes: 4 additions & 4 deletions main/lib/http2/priorityscheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
/**
* @typedef {object} Priority
* @property {SendGroupId} sendGroupId
* @property {bigint} sendOrder
* @property {number} sendOrder
*/

/**
* @typedef {bigint} SendGroupId
*/

/**
* @typedef {bigint} SendOrder
* @typedef {number} SendOrder
*/

/**
Expand Down Expand Up @@ -506,7 +506,7 @@ export class PriorityScheduler {
// Alters the priority of an already registered stream.
/**
* @param {StreamId} streamId
* @param {bigint} newSendOrder
* @param {number} newSendOrder
*/
UpdateSendOrder(streamId, newSendOrder) {
const scheduler = this.SchedulerForStream(streamId)
Expand All @@ -527,7 +527,7 @@ export class PriorityScheduler {
}
const isScheduled = scheduler.IsScheduled(streamId)
const sendOrder = scheduler.GetPriorityFor(streamId)
if (!sendOrder) {
if (sendOrder == null) {
throw new Error(
'Stream registered at the top level scheduler, but not at the per-group one'
)
Expand Down
12 changes: 6 additions & 6 deletions main/lib/http2/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ export class Http2WebTransportSession {
})
/** @type {Array<Uint8Array>} */
this.datagramsWaiting_ = []
/** @type {Array<{sendOrder: bigint, sendGroupId: bigint}>} */
/** @type {Array<{sendOrder: number, sendGroupId: bigint}>} */
this.orderUniStreams = []
/** @type {Array<{sendOrder: bigint, sendGroupId: bigint}>} */
/** @type {Array<{sendOrder: number, sendGroupId: bigint}>} */
this.orderBiStreams = []
if (stream) {
if (isclient) {
Expand Down Expand Up @@ -172,7 +172,7 @@ export class Http2WebTransportSession {
})
this.capsParser.newStream(
streamid,
priority || { sendGroupId: 0n, sendOrder: 0n }
priority || { sendGroupId: 0n, sendOrder: 0 }
)
}
}
Expand All @@ -188,7 +188,7 @@ export class Http2WebTransportSession {
this.orderUniStreams.push({
// @ts-ignore
sendGroupId: sendGroup?._sendGroupId || 0n,
sendOrder: sendOrder ?? 0n
sendOrder: sendOrder ?? 0
})
this.trySendingUnidirectionalStreams()
return true
Expand All @@ -210,7 +210,7 @@ export class Http2WebTransportSession {
})
this.capsParser.newStream(
streamid,
priority || { sendGroupId: 0n, sendOrder: 0n }
priority || { sendGroupId: 0n, sendOrder: 0 }
)
}
}
Expand All @@ -226,7 +226,7 @@ export class Http2WebTransportSession {
this.orderBiStreams.push({
// @ts-ignore
sendGroupId: sendGroup?._sendGroupId || 0n,
sendOrder: sendOrder ?? 0n
sendOrder: sendOrder ?? 0
})
this.trySendingBidirectionalStreams()
return true
Expand Down
2 changes: 1 addition & 1 deletion main/lib/http2/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ export class Http2WebTransportStream {

/**
*
* @param {{sendOrder: bigint, sendGroupId: bigint}} args
* @param {{sendOrder: number, sendGroupId: bigint}} args
*/
updateSendOrderAndGroup({ sendOrder, sendGroupId }) {
this.capsuleParser.streamUpdateSendOrderAndGroup(this.streamid, {
Expand Down
3 changes: 3 additions & 0 deletions main/lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ export class HttpServer {
this.sessionStreams[path] = new ReadableStream({
start: async (controller) => {
this.sessionController[path] = controller
},
cancel: () => {
delete this.sessionController[path]
}
})
if (!args || !args.noAutoPaths) {
Expand Down
47 changes: 24 additions & 23 deletions main/lib/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ export class HttpWTSession {
* @return {import('./dom').WebTransportDatagramsWritable}
*/
createWritable: (options) => {
let sendOrder = options?.sendOrder ?? 0n
let sendOrder = options?.sendOrder ?? 0
let sendGroup = options?.sendGroup
/** @type {WebTransportSendStream} */
// @ts-expect-error some props are initially missing
Expand Down Expand Up @@ -199,7 +199,7 @@ export class HttpWTSession {
return sendOrder
},
/**
* @param {bigint} value
* @param {number} value
*/
set: (value) => {
sendOrder = value
Expand Down Expand Up @@ -318,8 +318,8 @@ export class HttpWTSession {
*/
onSessionStats({
timestamp,
expiredOutgoing = BigInt(0),
lostOutgoing = BigInt(0),
expiredOutgoing = 0,
lostOutgoing = 0,
// non Datagram
minRtt = 0,
smoothedRtt = 0,
Expand All @@ -331,21 +331,21 @@ export class HttpWTSession {
if (res)
res({
timestamp,
bytesSent: BigInt(0),
packetsSent: BigInt(0),
packetsLost: BigInt(0),
bytesSent: 0,
packetsSent: 0,
packetsLost: 0,
numOutgoingStreamsCreated: 0,
numIncomingStreamsCreated: 0,
bytesReceived: BigInt(0),
packetsReceived: BigInt(0),
bytesReceived: 0,
packetsReceived: 0,
smoothedRtt,
rttVariation,
minRtt,
estimatedSendRate: estimatedSendRateBps,
datagrams: {
timestamp,
expiredOutgoing,
droppedIncoming: BigInt(0),
droppedIncoming: 0,
lostOutgoing
}
})
Expand All @@ -354,18 +354,14 @@ export class HttpWTSession {
/**
* @param {DatagramStatsEvent} evt
*/
onDatagramStats({
timestamp,
expiredOutgoing = BigInt(0),
lostOutgoing = BigInt(0)
}) {
onDatagramStats({ timestamp, expiredOutgoing = 0, lostOutgoing = 0 }) {
const res = this.resolveDatagramStats.pop()
this.rejectDatagramStats.pop()
if (res)
res({
timestamp,
expiredOutgoing,
droppedIncoming: BigInt(0),
droppedIncoming: 0,
lostOutgoing
})
}
Expand Down Expand Up @@ -447,7 +443,7 @@ export class HttpWTSession {
})
const notblocked = this.objint.orderBidiStream({
sendGroup: opts?.sendGroup || null, // maybe replace, when implemented
sendOrder: BigInt(opts?.sendOrder || 0n),
sendOrder: opts?.sendOrder || 0,
waitUntilAvailable: opts?.waitUntilAvailable || false
})
if (!notblocked) {
Expand Down Expand Up @@ -479,7 +475,7 @@ export class HttpWTSession {
})
const notblocked = this.objint.orderUnidiStream({
sendGroup: opts?.sendGroup || null, // maybe replace, when implemented
sendOrder: opts?.sendOrder || 0n,
sendOrder: opts?.sendOrder || 0,
waitUntilAvailable: opts?.waitUntilAvailable || false
})
if (!notblocked) {
Expand All @@ -503,7 +499,7 @@ export class HttpWTSession {
if (this.objint) {
this.objint.close({
code: closeInfo?.closeCode ?? 0,
reason: closeInfo?.reason.substring(0, 1023) ?? ''
reason: (closeInfo?.reason ?? '').substring(0, 1023)
})
}
}
Expand All @@ -516,14 +512,13 @@ export class HttpWTSession {
throw new Error('InvalidState')
const _sendGroupId = this._sendGroupNum++
const sendGroup = {
// @ts-ignore
_sendGroupId,
getStats: async () => {
// TODO implement
return {
bytesWritten: 0n,
bytesSent: 0n,
bytesAcknowledged: 0n
bytesWritten: 0,
bytesSent: 0,
bytesAcknowledged: 0
}
}
}
Expand Down Expand Up @@ -630,6 +625,12 @@ export class HttpWTSession {
* @param {NewStreamEvent} args
*/
onStream(args) {
if (this.state === 'closed' || this.state === 'failed') {
log('received stream after session closing')
args.stream.stopSending(0)
args.stream.resetStream(0)
return
}
const strobj = new HttpWTStream({
object: args.stream,
parentobj: this,
Expand Down
Loading