diff --git a/api-docs/docs/classes/SchemaRegistry.md b/api-docs/docs/classes/SchemaRegistry.md index a13f255..00060a8 100644 --- a/api-docs/docs/classes/SchemaRegistry.md +++ b/api-docs/docs/classes/SchemaRegistry.md @@ -137,8 +137,10 @@ Deserializes the given data and schema into its original form. ▸ **getSchema**(`schema`): [`Schema`](../interfaces/Schema.md) -**`method`** -Get a schema from Schema Registry by version and subject. +**`method`** Get a schema from Schema Registry +* if only `schema.subject` is set: returns the latest schema for the given subject +* if `schema.subject` and `schema.schema` is set: returns the schema for the given schema string +* if `schema.subject` and `schema.version` is set: returns the schema for the given version #### Parameters diff --git a/api-docs/index.d.ts b/api-docs/index.d.ts index 5b69d44..7d79727 100644 --- a/api-docs/index.d.ts +++ b/api-docs/index.d.ts @@ -488,7 +488,8 @@ export class SchemaRegistry { constructor(schemaRegistryConfig: SchemaRegistryConfig); /** * @method - * Get a schema from Schema Registry by version and subject. + * Get latest schema from Schema Registry by subject. + * Alternatively a specific schema version can be fetched by either specifing schema.version of schema.schema * @param {Schema} schema - Schema configuration. * @returns {Schema} - Schema. */ diff --git a/schema_registry.go b/schema_registry.go index 379470d..165ad0f 100644 --- a/schema_registry.go +++ b/schema_registry.go @@ -274,12 +274,16 @@ func (k *Kafka) schemaRegistryClient(config *SchemaRegistryConfig) *srclient.Sch return srClient } -// getSchema returns the schema for the given subject and schema ID and version. +// getSchema returns either the latest schema for the given subject or a specific version (if given) or the schema for the given schema string (if given) func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) *Schema { // If EnableCache is set, check if the schema is in the cache. if schema.EnableCaching { - if schema, ok := k.schemaCache[schema.Subject]; ok { - return schema + if cachedSchema, ok := k.schemaCache[schema.Subject]; ok { + // the cache should contain the latest version of a schema for the given subject + // we must not return the cached schema if it does not match the requested version or schema string + if (schema.Version == 0 && schema.Schema != "") || schema.Version == cachedSchema.Version || schema.Schema == cachedSchema.Schema { + return cachedSchema; + } } } @@ -287,12 +291,20 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) // The client always caches the schema. var schemaInfo *srclient.Schema var err error - // Default version of the schema is the latest version. - if schema.Version == 0 { + var isLatestSchema = false; + if schema.Schema != "" { // fetch schema for given schema string + var schemaType srclient.SchemaType + if schema.SchemaType != nil { + schemaType = *schema.SchemaType + } else { + schemaType = srclient.Avro + } + schemaInfo, err = client.LookupSchema(schema.Subject, schema.Schema, schemaType, schema.References...) + } else if schema.Version == 0 { // fetch schema by version schemaInfo, err = client.GetLatestSchema(schema.Subject) - } else { - schemaInfo, err = client.GetSchemaByVersion( - schema.Subject, schema.Version) + } else { // fetch latest schema for given subject + schemaInfo, err = client.GetSchemaByVersion(schema.Subject, schema.Version) + isLatestSchema = true; } if err == nil { @@ -306,7 +318,7 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) Subject: schema.Subject, } // If the Cache is set, cache the schema. - if wrappedSchema.EnableCaching { + if wrappedSchema.EnableCaching && isLatestSchema { k.schemaCache[wrappedSchema.Subject] = wrappedSchema } return wrappedSchema