From bb1286da4808ad2c6f63d80913cf5afc96b741e9 Mon Sep 17 00:00:00 2001 From: Yubao Liu Date: Sat, 7 Mar 2020 19:53:36 +0800 Subject: [PATCH] Support ForSchemaOption to call avro.Type.forSchema() (#47) So that we can use @ovotech/avro-logical-types with @kafkajs/confluent-schema-registry. --- Dockerfile | 0 src/SchemaRegistry.ts | 6 +++--- src/api/index.ts | 5 +++++ src/cache.ts | 8 +++++--- 4 files changed, 13 insertions(+), 6 deletions(-) mode change 100755 => 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile old mode 100755 new mode 100644 diff --git a/src/SchemaRegistry.ts b/src/SchemaRegistry.ts index 9ac9858..749eecc 100644 --- a/src/SchemaRegistry.ts +++ b/src/SchemaRegistry.ts @@ -1,7 +1,7 @@ import { encode, MAGIC_BYTE } from './encoder' import decode from './decoder' import { COMPATIBILITY, DEFAULT_SEPERATOR } from './constants' -import API, { SchemaRegistryAPIClientArgs, SchemaRegistryAPIClient } from './api' +import API, { SchemaRegistryAPIClientArgs, SchemaRegistryAPIClientOptions, SchemaRegistryAPIClient } from './api' import Cache from './cache' import { ConfluentSchemaRegistryArgumentError, @@ -28,9 +28,9 @@ export default class SchemaRegistry { private api: SchemaRegistryAPIClient public cache: Cache - constructor({ auth, clientId, host, retry }: SchemaRegistryAPIClientArgs) { + constructor({ auth, clientId, host, retry }: SchemaRegistryAPIClientArgs, options?: SchemaRegistryAPIClientOptions) { this.api = API({ auth, clientId, host, retry }) - this.cache = new Cache() + this.cache = new Cache(options?.forSchemaOptions) } public async register(schema: Schema, userOpts?: Opts): Promise { diff --git a/src/api/index.ts b/src/api/index.ts index f5f04fe..a6e0b06 100644 --- a/src/api/index.ts +++ b/src/api/index.ts @@ -1,6 +1,7 @@ import forge, { Client, Authorization } from 'mappersmith' import RetryMiddleware, { RetryMiddlewareOptions } from 'mappersmith/middleware/retry/v2' import BasicAuthMiddleware from 'mappersmith/middleware/basic-auth' +import { ForSchemaOptions } from 'avsc' import { DEFAULT_API_CLIENT_ID } from '../constants' import errorMiddleware from './middleware/errorMiddleware' @@ -21,6 +22,10 @@ export interface SchemaRegistryAPIClientArgs { retry?: Partial } +export interface SchemaRegistryAPIClientOptions { + forSchemaOptions?: Partial +} + // TODO: Improve typings export type SchemaRegistryAPIClient = Client<{ Schema: { diff --git a/src/cache.ts b/src/cache.ts index 70f2e5b..8371075 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -1,14 +1,16 @@ -import avro from 'avsc' +import avro, { ForSchemaOptions } from 'avsc' import { Schema } from './@types' export default class Cache { registryIdBySubject: { [key: string]: number } schemasByRegistryId: { [key: string]: Schema } + forSchemaOptions?: Partial - constructor() { + constructor(forSchemaOptions?: Partial) { this.registryIdBySubject = {} this.schemasByRegistryId = {} + this.forSchemaOptions = forSchemaOptions } getLatestRegistryId = (subject: string): number | undefined => this.registryIdBySubject[subject] @@ -23,7 +25,7 @@ export default class Cache { setSchema = (registryId: number, schema: Schema) => { // @ts-ignore TODO: Fix typings for Schema... - this.schemasByRegistryId[registryId] = avro.Type.forSchema(schema) + this.schemasByRegistryId[registryId] = avro.Type.forSchema(schema, this.forSchemaOptions) return this.schemasByRegistryId[registryId] }