From 030bd44496033b9ce5f0abaf56daea3686330b48 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 6 Feb 2024 16:09:17 -0800 Subject: [PATCH 1/5] feat: Implement handling for gzip compressed responses. --- .../server-node/src/platform/NodeResponse.ts | 56 +++++++++++++++---- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/packages/sdk/server-node/src/platform/NodeResponse.ts b/packages/sdk/server-node/src/platform/NodeResponse.ts index 7cbc0e573c..d331ea9af3 100644 --- a/packages/sdk/server-node/src/platform/NodeResponse.ts +++ b/packages/sdk/server-node/src/platform/NodeResponse.ts @@ -1,13 +1,41 @@ import * as http from 'http'; +import * as zlib from 'zlib'; +import { pipeline, Writable } from 'stream'; import { platform } from '@launchdarkly/js-server-sdk-common'; import HeaderWrapper from './HeaderWrapper'; +/** + * Memory stream for used to allow pipelining with decompression. + * + * This implementation is not general purpose. + */ +class MemoryStream extends Writable { + private chunks: any[] = []; + + override _write(chunk: any, _encoding: BufferEncoding, _onError: (error?: Error | null) => void) { + this.chunks.push(chunk); + } + + override toString() { + return Buffer.concat(this.chunks).toString() + } + + override end(cb?: (() => void) | undefined): this; + override end(chunk: any, cb?: (() => void) | undefined): this; + override end(chunk: any, encoding: BufferEncoding, cb?: (() => void) | undefined): this; + override end(chunk?: unknown, encoding?: unknown, cb?: unknown): this { + this._write(chunk, encoding as BufferEncoding, cb as (error?: Error | null) => void); + this.emit('finish'); + return this; + } +} + export default class NodeResponse implements platform.Response { incomingMessage: http.IncomingMessage; - body: any[] = []; + memoryStream: MemoryStream = new MemoryStream(); promise: Promise; @@ -22,18 +50,24 @@ export default class NodeResponse implements platform.Response { this.status = res.statusCode || 0; this.incomingMessage = res; - this.promise = new Promise((resolve, reject) => { - res.on('data', (chunk) => { - this.body.push(chunk); - }); - res.on('error', (err) => { - reject(err); - }); - res.on('end', () => { - resolve(Buffer.concat(this.body).toString()); - }); + this.promise = new Promise((resolve, reject) => { + // Called on error or completion of the pipeline. + const pipelineCallback = (err: any) => { + if(err) { + reject(err); + } + resolve(this.memoryStream.toString()); + }; + switch (res.headers['content-encoding']) { + case 'gzip': + pipeline(res, zlib.createGunzip(), this.memoryStream, pipelineCallback); + break; + default: + pipeline(res, this.memoryStream, pipelineCallback); + break; + } }); } From 39cce8d06638182b57443265dd1f0e4090b5c389 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 7 Feb 2024 10:08:26 -0800 Subject: [PATCH 2/5] Basic writable working. --- .../server-node/src/platform/NodeResponse.ts | 37 ++++--------------- 1 file changed, 8 insertions(+), 29 deletions(-) diff --git a/packages/sdk/server-node/src/platform/NodeResponse.ts b/packages/sdk/server-node/src/platform/NodeResponse.ts index d331ea9af3..7cbe148b81 100644 --- a/packages/sdk/server-node/src/platform/NodeResponse.ts +++ b/packages/sdk/server-node/src/platform/NodeResponse.ts @@ -1,41 +1,20 @@ import * as http from 'http'; import * as zlib from 'zlib'; -import { pipeline, Writable } from 'stream'; +import { pipeline, Readable, Writable } from 'stream'; import { platform } from '@launchdarkly/js-server-sdk-common'; import HeaderWrapper from './HeaderWrapper'; -/** - * Memory stream for used to allow pipelining with decompression. - * - * This implementation is not general purpose. - */ -class MemoryStream extends Writable { - private chunks: any[] = []; - - override _write(chunk: any, _encoding: BufferEncoding, _onError: (error?: Error | null) => void) { - this.chunks.push(chunk); - } - - override toString() { - return Buffer.concat(this.chunks).toString() - } - - override end(cb?: (() => void) | undefined): this; - override end(chunk: any, cb?: (() => void) | undefined): this; - override end(chunk: any, encoding: BufferEncoding, cb?: (() => void) | undefined): this; - override end(chunk?: unknown, encoding?: unknown, cb?: unknown): this { - this._write(chunk, encoding as BufferEncoding, cb as (error?: Error | null) => void); - this.emit('finish'); - return this; - } -} - export default class NodeResponse implements platform.Response { incomingMessage: http.IncomingMessage; - memoryStream: MemoryStream = new MemoryStream(); + chunks: any[] = []; + + memoryStream: Writable = new Writable({decodeStrings: false, write: (chunk, _enc, next) => { + this.chunks.push(chunk); + next(); + }}); promise: Promise; @@ -58,7 +37,7 @@ export default class NodeResponse implements platform.Response { if(err) { reject(err); } - resolve(this.memoryStream.toString()); + resolve(Buffer.concat(this.chunks).toString()); }; switch (res.headers['content-encoding']) { case 'gzip': From ba57207c6a94a59b65b10763990dae90579fabcd Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 7 Feb 2024 13:10:55 -0800 Subject: [PATCH 3/5] Cleanup, and enable capability. --- contract-tests/index.js | 1 + .../server-node/src/platform/NodeRequests.ts | 12 +++++++++- .../server-node/src/platform/NodeResponse.ts | 24 ++++++++++--------- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/contract-tests/index.js b/contract-tests/index.js index 06430d0a58..e214db2856 100644 --- a/contract-tests/index.js +++ b/contract-tests/index.js @@ -31,6 +31,7 @@ app.get('/', (req, res) => { 'migrations', 'event-sampling', 'strongly-typed', + 'polling-gzip' ], }); }); diff --git a/packages/sdk/server-node/src/platform/NodeRequests.ts b/packages/sdk/server-node/src/platform/NodeRequests.ts index 24f79b0cbf..3dee54c1d2 100644 --- a/packages/sdk/server-node/src/platform/NodeRequests.ts +++ b/packages/sdk/server-node/src/platform/NodeRequests.ts @@ -110,12 +110,22 @@ export default class NodeRequests implements platform.Requests { const isSecure = url.startsWith('https://'); const impl = isSecure ? https : http; + // For get requests we are going to automatically support compressed responses. + // Note this does not affect SSE as the event source is not using this fetch implementation. + const headers = + options.method?.toLowerCase() === 'get' + ? { + ...options.headers, + 'accept-encoding': 'gzip', + } + : options.headers; + return new Promise((resolve, reject) => { const req = impl.request( url, { timeout: options.timeout, - headers: options.headers, + headers, method: options.method, agent: this.agent, }, diff --git a/packages/sdk/server-node/src/platform/NodeResponse.ts b/packages/sdk/server-node/src/platform/NodeResponse.ts index 7cbe148b81..76cc0fc9a0 100644 --- a/packages/sdk/server-node/src/platform/NodeResponse.ts +++ b/packages/sdk/server-node/src/platform/NodeResponse.ts @@ -1,6 +1,7 @@ +import * as fs from 'fs'; import * as http from 'http'; +import { pipeline, Writable } from 'stream'; import * as zlib from 'zlib'; -import { pipeline, Readable, Writable } from 'stream'; import { platform } from '@launchdarkly/js-server-sdk-common'; @@ -11,10 +12,13 @@ export default class NodeResponse implements platform.Response { chunks: any[] = []; - memoryStream: Writable = new Writable({decodeStrings: false, write: (chunk, _enc, next) => { - this.chunks.push(chunk); - next(); - }}); + memoryStream: Writable = new Writable({ + decodeStrings: true, + write: (chunk, _enc, next) => { + this.chunks.push(chunk); + next(); + }, + }); promise: Promise; @@ -29,16 +33,14 @@ export default class NodeResponse implements platform.Response { this.status = res.statusCode || 0; this.incomingMessage = res; - - this.promise = new Promise((resolve, reject) => { // Called on error or completion of the pipeline. const pipelineCallback = (err: any) => { - if(err) { - reject(err); + if (err) { + return reject(err); } - resolve(Buffer.concat(this.chunks).toString()); - }; + return resolve(Buffer.concat(this.chunks).toString()); + }; switch (res.headers['content-encoding']) { case 'gzip': pipeline(res, zlib.createGunzip(), this.memoryStream, pipelineCallback); From 8461ad4ff89fb31b735e4a0f8dd9c93e7260285f Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 7 Feb 2024 13:12:33 -0800 Subject: [PATCH 4/5] Remove unused import. --- packages/sdk/server-node/src/platform/NodeResponse.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/sdk/server-node/src/platform/NodeResponse.ts b/packages/sdk/server-node/src/platform/NodeResponse.ts index 76cc0fc9a0..b039052808 100644 --- a/packages/sdk/server-node/src/platform/NodeResponse.ts +++ b/packages/sdk/server-node/src/platform/NodeResponse.ts @@ -1,4 +1,3 @@ -import * as fs from 'fs'; import * as http from 'http'; import { pipeline, Writable } from 'stream'; import * as zlib from 'zlib'; From b524e195a05d0cfbb31babacfa701db23c7143a0 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Thu, 8 Feb 2024 14:52:25 -0800 Subject: [PATCH 5/5] add unit tests --- .../__tests__/platform/NodeRequests.test.ts | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/packages/sdk/server-node/__tests__/platform/NodeRequests.test.ts b/packages/sdk/server-node/__tests__/platform/NodeRequests.test.ts index fda2c3c669..ab4b7168f8 100644 --- a/packages/sdk/server-node/__tests__/platform/NodeRequests.test.ts +++ b/packages/sdk/server-node/__tests__/platform/NodeRequests.test.ts @@ -1,4 +1,5 @@ import * as http from 'http'; +import * as zlib from 'zlib'; import NodeRequests from '../../src/platform/NodeRequests'; @@ -57,6 +58,9 @@ describe('given a default instance of NodeRequests', () => { res.destroy(); resetResolve(); }, 0); + } else if ((req.url?.indexOf('gzip') || -1) >= 0) { + res.setHeader('Content-Encoding', 'gzip'); + res.end(zlib.gzipSync(Buffer.from(JSON_RESPONSE, 'utf8'))); } else { res.end(TEXT_RESPONSE); } @@ -144,4 +148,21 @@ describe('given a default instance of NodeRequests', () => { await res.json(); }).rejects.toThrow(); }); + + it('includes accept-encoding header', async () => { + await requests.fetch(`http://localhost:${PORT}/gzip`, { method: 'GET' }); + const serverResult = await promise; + expect(serverResult.method).toEqual('GET'); + expect(serverResult.headers['accept-encoding']).toEqual('gzip'); + }); + + it('can get compressed json from a response', async () => { + const res = await requests.fetch(`http://localhost:${PORT}/gzip`, { method: 'GET' }); + expect(res.headers.get('content-type')).toEqual('text/plain'); + const json = await res.json(); + expect(json).toEqual({ text: 'value' }); + const serverResult = await promise; + expect(serverResult.method).toEqual('GET'); + expect(serverResult.body).toEqual(''); + }); });