Skip to content

Commit

Permalink
feat: fetch schema from schema-regsitry by schema string
Browse files Browse the repository at this point in the history
* enable users to lookup the schema id for a given schema string
  • Loading branch information
rroesch1 committed Oct 9, 2024
1 parent ea13e01 commit 8ffee37
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 12 deletions.
6 changes: 4 additions & 2 deletions api-docs/docs/classes/SchemaRegistry.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ Deserializes the given data and schema into its original form.

**getSchema**(`schema`): [`Schema`](../interfaces/Schema.md)

**`method`**
Get a schema from Schema Registry by version and subject.
**`method`** Get a schema from Schema Registry
* if only `schema.subject` is set: returns the latest schema for the given subject
* if `schema.subject` and `schema.schema` is set: returns the schema for the given schema string
* if `schema.subject` and `schema.version` is set: returns the schema for the given version

#### Parameters

Expand Down
3 changes: 2 additions & 1 deletion api-docs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ export class SchemaRegistry {
constructor(schemaRegistryConfig: SchemaRegistryConfig);
/**
* @method
* Get a schema from Schema Registry by version and subject.
* Get latest schema from Schema Registry by subject.
* Alternatively a specific schema version can be fetched by either specifing schema.version of schema.schema
* @param {Schema} schema - Schema configuration.
* @returns {Schema} - Schema.
*/
Expand Down
30 changes: 21 additions & 9 deletions schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,25 +274,37 @@ func (k *Kafka) schemaRegistryClient(config *SchemaRegistryConfig) *srclient.Sch
return srClient
}

// getSchema returns the schema for the given subject and schema ID and version.
// getSchema returns either the latest schema for the given subject or a specific version (if given) or the schema for the given schema string (if given)
func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) *Schema {
// If EnableCache is set, check if the schema is in the cache.
if schema.EnableCaching {
if schema, ok := k.schemaCache[schema.Subject]; ok {
return schema
if cachedSchema, ok := k.schemaCache[schema.Subject]; ok {
// the cache should contain the latest version of a schema for the given subject
// we must not return the cached schema if it does not match the requested version or schema string
if (schema.Version == 0 && schema.Schema != "") || schema.Version == cachedSchema.Version || schema.Schema == cachedSchema.Schema {
return cachedSchema;
}
}
}

runtime := k.vu.Runtime()
// The client always caches the schema.
var schemaInfo *srclient.Schema
var err error
// Default version of the schema is the latest version.
if schema.Version == 0 {
var isLatestSchema = false;
if schema.Schema != "" { // fetch schema for given schema string
var schemaType srclient.SchemaType
if schema.SchemaType != nil {
schemaType = *schema.SchemaType
} else {
schemaType = srclient.Avro
}
schemaInfo, err = client.LookupSchema(schema.Subject, schema.Schema, schemaType, schema.References...)
} else if schema.Version == 0 { // fetch schema by version
schemaInfo, err = client.GetLatestSchema(schema.Subject)
} else {
schemaInfo, err = client.GetSchemaByVersion(
schema.Subject, schema.Version)
} else { // fetch latest schema for given subject
schemaInfo, err = client.GetSchemaByVersion(schema.Subject, schema.Version)
isLatestSchema = true;
}

if err == nil {
Expand All @@ -306,7 +318,7 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema)
Subject: schema.Subject,
}
// If the Cache is set, cache the schema.
if wrappedSchema.EnableCaching {
if wrappedSchema.EnableCaching && isLatestSchema {
k.schemaCache[wrappedSchema.Subject] = wrappedSchema
}
return wrappedSchema
Expand Down

0 comments on commit 8ffee37

Please sign in to comment.