Skip to content

Commit

Permalink
feat: Add support for Payload Filtering (#551)
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 authored Aug 30, 2024
1 parent 30b822e commit 6f44383
Show file tree
Hide file tree
Showing 23 changed files with 272 additions and 31 deletions.
2 changes: 2 additions & 0 deletions contract-tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ app.get('/', (req, res) => {
'all-flags-with-reasons',
'tags',
'big-segments',
'filtering',
'filtering-strict',
'user-type',
'migrations',
'event-sampling',
Expand Down
6 changes: 6 additions & 0 deletions contract-tests/sdkClientEntity.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@ export function makeSdkConfig(options, tag) {
if (options.streaming) {
cf.streamUri = options.streaming.baseUri;
cf.streamInitialReconnectDelay = maybeTime(options.streaming.initialRetryDelayMs);
if (options.streaming.filter) {
cf.payloadFilterKey = options.streaming.filter;
}
}
if (options.polling) {
cf.stream = false;
cf.baseUri = options.polling.baseUri;
cf.pollInterface = options.polling.pollIntervalMs / 1000;
if (options.polling.filter) {
cf.payloadFilterKey = options.polling.filter;
}
}
if (options.events) {
cf.allAttributesPrivate = options.events.allAttributesPrivate;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import ServiceEndpoints from '../../src/options/ServiceEndpoints';
import ServiceEndpoints, {
getEventsUri,
getPollingUri,
getStreamingUri,
} from '../../src/options/ServiceEndpoints';

describe.each([
[
Expand Down Expand Up @@ -33,3 +37,26 @@ describe.each([
expect(endpoints.events).toEqual(expected.eventsUri);
});
});

it('applies payload filter to polling and streaming endpoints', () => {
const endpoints = new ServiceEndpoints(
'https://stream.launchdarkly.com',
'https://sdk.launchdarkly.com',
'https://events.launchdarkly.com',
'/bulk',
'/diagnostic',
true,
'filterKey',
);

expect(getStreamingUri(endpoints, '/all', [])).toEqual(
'https://stream.launchdarkly.com/all?filter=filterKey',
);
expect(getPollingUri(endpoints, '/sdk/latest-all', [])).toEqual(
'https://sdk.launchdarkly.com/sdk/latest-all?filter=filterKey',
);
expect(
getPollingUri(endpoints, '/sdk/latest-all', [{ key: 'withReasons', value: 'true' }]),
).toEqual('https://sdk.launchdarkly.com/sdk/latest-all?withReasons=true&filter=filterKey');
expect(getEventsUri(endpoints, '/bulk', [])).toEqual('https://events.launchdarkly.com/bulk');
});
17 changes: 8 additions & 9 deletions packages/shared/common/src/internal/events/EventSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
isHttpRecoverable,
LDUnexpectedResponseError,
} from '../../errors';
import { ClientContext } from '../../options';
import { ClientContext, getEventsUri } from '../../options';
import { defaultHeaders, httpErrorMessage, sleep } from '../../utils';

export default class EventSender implements LDEventSender {
Expand All @@ -26,19 +26,18 @@ export default class EventSender implements LDEventSender {
const { basicConfiguration, platform } = clientContext;
const {
sdkKey,
serviceEndpoints: {
events,
analyticsEventPath,
diagnosticEventPath,
includeAuthorizationHeader,
},
serviceEndpoints: { analyticsEventPath, diagnosticEventPath, includeAuthorizationHeader },
tags,
} = basicConfiguration;
const { crypto, info, requests } = platform;

this.defaultHeaders = defaultHeaders(sdkKey, info, tags, includeAuthorizationHeader);
this.eventsUri = `${events}${analyticsEventPath}`;
this.diagnosticEventsUri = `${events}${diagnosticEventPath}`;
this.eventsUri = getEventsUri(basicConfiguration.serviceEndpoints, analyticsEventPath, []);
this.diagnosticEventsUri = getEventsUri(
basicConfiguration.serviceEndpoints,
diagnosticEventPath,
[],
);
this.requests = requests;
this.crypto = crypto;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ describe('given a stream processor with mock event source', () => {
platform: basicPlatform,
},
'/all',
[],
listeners,
diagnosticsManager,
mockErrorHandler,
Expand Down Expand Up @@ -142,6 +143,7 @@ describe('given a stream processor with mock event source', () => {
platform: basicPlatform,
},
'/all',
[],
listeners,
diagnosticsManager,
mockErrorHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
import { LDStreamProcessor } from '../../api/subsystem';
import { LDStreamingError } from '../../errors';
import { ClientContext } from '../../options';
import { getStreamingUri } from '../../options/ServiceEndpoints';
import { defaultHeaders, httpErrorMessage, shouldRetry } from '../../utils';
import { DiagnosticsManager } from '../diagnostics';
import { StreamingErrorHandler } from './types';
Expand Down Expand Up @@ -37,6 +38,7 @@ class StreamingProcessor implements LDStreamProcessor {
sdkKey: string,
clientContext: ClientContext,
streamUriPath: string,
parameters: { key: string; value: string }[],
private readonly listeners: Map<EventName, ProcessStreamResponse>,
private readonly diagnosticsManager?: DiagnosticsManager,
private readonly errorHandler?: StreamingErrorHandler,
Expand All @@ -49,7 +51,11 @@ class StreamingProcessor implements LDStreamProcessor {
this.headers = defaultHeaders(sdkKey, info, tags);
this.logger = logger;
this.requests = requests;
this.streamUri = `${basicConfiguration.serviceEndpoints.streaming}${streamUriPath}`;
this.streamUri = getStreamingUri(
basicConfiguration.serviceEndpoints,
streamUriPath,
parameters,
);
}

private logConnectionStarted() {
Expand Down
77 changes: 77 additions & 0 deletions packages/shared/common/src/options/ServiceEndpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ function canonicalizeUri(uri: string): string {
return uri.replace(/\/+$/, '');
}

function canonicalizePath(path: string): string {
return path.replace(/^\/+/, '').replace(/\?$/, '');
}

/**
* Specifies the base service URIs used by SDK components.
*/
Expand All @@ -11,6 +15,7 @@ export default class ServiceEndpoints {
public readonly streaming: string;
public readonly polling: string;
public readonly events: string;
public readonly payloadFilterKey?: string;

/** Valid paths are:
* /bulk
Expand All @@ -36,12 +41,84 @@ export default class ServiceEndpoints {
analyticsEventPath: string = '/bulk',
diagnosticEventPath: string = '/diagnostic',
includeAuthorizationHeader: boolean = true,
payloadFilterKey?: string,
) {
this.streaming = canonicalizeUri(streaming);
this.polling = canonicalizeUri(polling);
this.events = canonicalizeUri(events);
this.analyticsEventPath = analyticsEventPath;
this.diagnosticEventPath = diagnosticEventPath;
this.includeAuthorizationHeader = includeAuthorizationHeader;
this.payloadFilterKey = payloadFilterKey;
}
}

function getWithParams(uri: string, parameters: { key: string; value: string }[]) {
if (parameters.length === 0) {
return uri;
}

const parts = parameters.map(({ key, value }) => `${key}=${value}`);
return `${uri}?${parts.join('&')}`;
}

/**
* Get the URI for the streaming endpoint.
*
* @param endpoints The service endpoints.
* @param path The path to the resource, devoid of any query parameters or hrefs.
* @param parameters The query parameters. These query parameters must already have the appropriate encoding applied. This function WILL NOT apply it for you.
*/
export function getStreamingUri(
endpoints: ServiceEndpoints,
path: string,
parameters: { key: string; value: string }[],
): string {
const canonicalizedPath = canonicalizePath(path);

const combinedParameters = [...parameters];
if (endpoints.payloadFilterKey) {
combinedParameters.push({ key: 'filter', value: endpoints.payloadFilterKey });
}

return getWithParams(`${endpoints.streaming}/${canonicalizedPath}`, combinedParameters);
}

/**
* Get the URI for the polling endpoint.
*
* @param endpoints The service endpoints.
* @param path The path to the resource, devoid of any query parameters or hrefs.
* @param parameters The query parameters. These query parameters must already have the appropriate encoding applied. This function WILL NOT apply it for you.
*/
export function getPollingUri(
endpoints: ServiceEndpoints,
path: string,
parameters: { key: string; value: string }[],
): string {
const canonicalizedPath = canonicalizePath(path);

const combinedParameters = [...parameters];
if (endpoints.payloadFilterKey) {
combinedParameters.push({ key: 'filter', value: endpoints.payloadFilterKey });
}

return getWithParams(`${endpoints.polling}/${canonicalizedPath}`, combinedParameters);
}

/**
* Get the URI for the events endpoint.
*
* @param endpoints The service endpoints.
* @param path The path to the resource, devoid of any query parameters or hrefs.
* @param parameters The query parameters. These query parameters must already have the appropriate encoding applied. This function WILL NOT apply it for you.
*/
export function getEventsUri(
endpoints: ServiceEndpoints,
path: string,
parameters: { key: string; value: string }[],
): string {
const canonicalizedPath = canonicalizePath(path);

return getWithParams(`${endpoints.events}/${canonicalizedPath}`, parameters);
}
12 changes: 10 additions & 2 deletions packages/shared/common/src/options/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import ApplicationTags from './ApplicationTags';
import ClientContext from './ClientContext';
import OptionMessages from './OptionMessages';
import ServiceEndpoints from './ServiceEndpoints';
import ServiceEndpoints, { getEventsUri, getPollingUri, getStreamingUri } from './ServiceEndpoints';

export { ApplicationTags, OptionMessages, ServiceEndpoints, ClientContext };
export {
ApplicationTags,
OptionMessages,
ServiceEndpoints,
ClientContext,
getStreamingUri,
getPollingUri,
getEventsUri,
};
2 changes: 1 addition & 1 deletion packages/shared/common/src/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export class StringMatchingRegex extends Type<string> {
}

override is(u: unknown): u is string {
return !!(u as string).match(this.expression);
return typeof u === 'string' && !!(u as string).match(this.expression);
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/shared/mocks/src/streamingProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export const setupMockStreamingProcessor = (
sdkKey: string,
clientContext: ClientContext,
streamUriPath: string,
parameters: { key: string; value: string }[],
listeners: Map<EventName, ProcessStreamResponse>,
diagnosticsManager: internal.DiagnosticsManager,
errorHandler: internal.StreamingErrorHandler,
Expand Down
4 changes: 3 additions & 1 deletion packages/shared/sdk-client/src/LDClientImpl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ describe('sdk-client object', () => {
expect.anything(),
'/stream/path',
expect.anything(),
expect.anything(),
undefined,
expect.anything(),
);
Expand All @@ -130,7 +131,8 @@ describe('sdk-client object', () => {
expect(MockStreamingProcessor).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
'/stream/path?withReasons=true',
'/stream/path',
[{ key: 'withReasons', value: 'true' }],
expect.anything(),
undefined,
expect.anything(),
Expand Down
15 changes: 9 additions & 6 deletions packages/shared/sdk-client/src/LDClientImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,15 +401,17 @@ export default class LDClientImpl implements LDClient {
identifyResolve: any,
identifyReject: any,
) {
let pollingPath = this.createPollUriPath(context);
const parameters: { key: string; value: string }[] = [];
if (this.config.withReasons) {
pollingPath = `${pollingPath}?withReasons=true`;
parameters.push({ key: 'withReasons', value: 'true' });
}

this.updateProcessor = new PollingProcessor(
this.sdkKey,
this.clientContext.platform.requests,
this.clientContext.platform.info,
pollingPath,
this.createPollUriPath(context),
parameters,
this.config,
async (flags) => {
this.logger.debug(`Handling polling result: ${Object.keys(flags)}`);
Expand Down Expand Up @@ -438,15 +440,16 @@ export default class LDClientImpl implements LDClient {
identifyResolve: any,
identifyReject: any,
) {
let streamingPath = this.createStreamUriPath(context);
const parameters: { key: string; value: string }[] = [];
if (this.config.withReasons) {
streamingPath = `${streamingPath}?withReasons=true`;
parameters.push({ key: 'withReasons', value: 'true' });
}

this.updateProcessor = new internal.StreamingProcessor(
this.sdkKey,
this.clientContext,
streamingPath,
this.createStreamUriPath(context),
parameters,
this.createStreamListeners(checkedContext, identifyResolve),
this.diagnosticsManager,
(e) => {
Expand Down
16 changes: 16 additions & 0 deletions packages/shared/sdk-client/src/api/LDOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,20 @@ export interface LDOptions {
* If `wrapperName` is unset, this field will be ignored.
*/
wrapperVersion?: string;

/**
* LaunchDarkly Server SDKs historically downloaded all flag configuration and segments for a particular environment
* during initialization.
*
* For some customers, this is an unacceptably large amount of data, and has contributed to performance issues
* within their products.
*
* Filtered environments aim to solve this problem. By allowing customers to specify subsets of an environment's
* flags using a filter key, SDKs will initialize faster and use less memory.
*
* This payload filter key only applies to the default streaming and polling data sources. It will not affect
* TestData or FileData data sources, nor will it be applied to any data source provided through the featureStore
* config property.
*/
payloadFilterKey?: string;
}
Loading

0 comments on commit 6f44383

Please sign in to comment.