Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support compatibility configuration #6

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Available Commands:
exists checks if the schema provided through stdin exists for the subject
get retrieves a schema specified by id or subject
get-config retrieves global or suject specific configuration
set-config set global or subject specific configuration
subjects lists all registered subjects
versions lists all available versions

Expand Down
82 changes: 71 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (c *Client) Versions(subject string) (versions []int, err error) {
return
}

// DeleteSubject deletes the specified subject and its associated compatibility level if registered.
// DeleteSubject deletes the specified subject and its associated compatibility type if registered.
// It is recommended to use this API only when a topic needs to be recycled or in development environment.
// Returns the versions of the schema deleted under this subject.
func (c *Client) DeleteSubject(subject string) (versions []int, err error) {
Expand Down Expand Up @@ -452,6 +452,18 @@ func (c *Client) IsRegistered(subject, schema string) (bool, Schema, error) {
return true, fs, nil
}

// see https://docs.confluent.io/current/schema-registry/develop/api.html#compatibility
// for a list of valid compatibility types
const (
BACKWARD CompatibilityType = "BACKWARD"
BACKWARD_TRANSITIVE CompatibilityType = "BACKWARD_TRANSITIVE"
FORWARD CompatibilityType = "FORWARD"
FORWARD_TRANSITIVE CompatibilityType = "FORWARD_TRANSITIVE"
FULL CompatibilityType = "FULL"
FULL_TRANSITIVE CompatibilityType = "FULL_TRANSITIVE"
NONE CompatibilityType = "NONE"
)

type (
schemaOnlyJSON struct {
Schema string `json:"schema"`
Expand All @@ -476,13 +488,32 @@ type (
ID int `json:"id,omitempty"`
}

// Config describes a subject or globa schema-registry configuration
Config struct {
// CompatibilityLevel mode of subject or global
CompatibilityLevel string `json:"compatibilityLevel"`
// CompatibilityType defines the allowed actions for changing a schema
CompatibilityType string

// ConfigGet is the result structure for getting the config
// (of a subject or global schema-registry configuration)
ConfigGet struct {
// CompatibilityType mode of subject or global
CompatibilityType CompatibilityType `json:"compatibilityLevel"`
}

// ConfigPut is the parameter structure for setting the config
// (of a subject or global schema-registry configuration)
ConfigPut struct {
// CompatibilityType mode of subject or global
CompatibilityType CompatibilityType `json:"compatibility"`
}
)

func (compatibilityType CompatibilityType) IsValid() error {
switch compatibilityType {
case BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE, NONE:
return nil
}
return fmt.Errorf("compatibility type '%s' is invalid", compatibilityType)
}

// RegisterNewSchema registers a schema.
// The returned identifier should be used to retrieve
// this schema from the schemas resource and is different from
Expand Down Expand Up @@ -617,14 +648,14 @@ func (c *Client) GetLatestSchema(subject string) (Schema, error) {

// getConfigSubject returns the Config of global or for a given subject. It handles 404 error in a
// different way, since not-found for a subject configuration means it's using global.
func (c *Client) getConfigSubject(subject string) (Config, error) {
func (c *Client) getConfigSubject(subject string) (ConfigGet, error) {
var err error
var config = Config{}
var config = ConfigGet{}

path := fmt.Sprintf("/config/%s", subject)
resp, respErr := c.do(http.MethodGet, path, "", nil)
if respErr != nil && respErr.(ResourceError).ErrorCode != 404 {
return config, respErr
if respErr != nil && subject != "" && respErr.(ResourceError).ErrorCode != 404 {
return c.getConfigSubject("")
}
if resp != nil {
err = c.readJSON(resp, &config)
Expand All @@ -633,12 +664,41 @@ func (c *Client) getConfigSubject(subject string) (Config, error) {
return config, err
}

// GetConfig returns the configuration (Config type) for global Schema-Registry or a specific
// setConfigSubject sets the Config of global or for a given subject.
func (c *Client) setConfigSubject(subject string, config ConfigPut) error {
var err error = nil

if err = config.CompatibilityType.IsValid(); err != nil {
return err
}

send, err := json.Marshal(config)
if err != nil {
return err
}

path := fmt.Sprintf("/config/%s", subject)
_, respErr := c.do(http.MethodPut, path, contentTypeSchemaJSON, send)
if respErr != nil {
return respErr
}

return err
}

// GetConfig returns the configuration (ConfigGet type) for global Schema-Registry or a specific
// subject. When Config returned has "compatibilityLevel" empty, it's using global settings.
func (c *Client) GetConfig(subject string) (Config, error) {
func (c *Client) GetConfig(subject string) (ConfigGet, error) {
return c.getConfigSubject(subject)
}

// SetConfig set the configuration (ConfigPut type) for global Schema-Registry or a specific
// subject.
// https://docs.confluent.io/current/schema-registry/develop/api.html#put--config-(string-%20subject)
func (c *Client) SetConfig(subject string, config ConfigPut) error {
return c.setConfigSubject(subject, config)
}

// subject (string) – Name of the subject
// version (versionId [string "latest" or 1,2^31-1]) – Version of the schema to be returned.
// Valid values for versionId are between [1,2^31-1] or the string “latest”.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/coursehero/schema-registry
module github.com/WildBeavers/schema-registry

go 1.12

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nicksnyder/go-i18n v2.0.3+incompatible h1:XCCaWsCoy4KlWkhOr+63dkv6oJmitJ573uJqDBAiFiQ=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
4 changes: 2 additions & 2 deletions schema-registry-cli/cmd/compatible.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
var compatibleCmd = &cobra.Command{
Use: "compatible <subject> [version]",
Short: "tests compatibility between a schema from stdin and a given subject",
Long: `The compatibility level of the subject is used for this check.
If it has never been changed, the global compatibility level applies.
Long: `The compatibility type of the subject is used for this check.
If it has never been changed, the global compatibility type applies.
If no schema version is specified, the latest version is tested.
`,
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down
15 changes: 9 additions & 6 deletions schema-registry-cli/cmd/get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (

var getConfigCmd = &cobra.Command{
Use: "get-config [subject]",
Short: "retrieves global or suject specific configuration",
Long: `Configuration can be requested for all or a specific subject. When "compatibility-level"
is not defined for a specific subject, then it's using global compatibility level. To check global
setting just call "get-config" without arguments.
Compatibility levels in Schema-Registry may be: "NONE", "BACKWARD", "FORWARD" and "FULL". Please
consider official documentation for more details.
Short: "retrieves global or subject specific configuration",
Long: `Configuration can be requested for all or a specific subject.
When "compatibility-type" is not defined for a specific subject,
then it's using global compatibility type. To check global setting
just call "get-config" without arguments.
Compatibility types in Schema-Registry may be: "NONE", "BACKWARD",
"BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIVE", "FULL" and
"FULL_TRANSITIVE".
Please consider official documentation for more details.
`,
RunE: func(cmd *cobra.Command, args []string) error {
switch {
Expand Down
10 changes: 5 additions & 5 deletions schema-registry-cli/cmd/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/hokaccha/go-prettyjson"
"github.com/spf13/viper"

schemaregistry "github.com/coursehero/schema-registry"
schemaregistry "github.com/WildBeavers/schema-registry"
)

func stdinToString() string {
Expand Down Expand Up @@ -64,14 +64,14 @@ func getBySubjectVersion(subj string, ver int) error {
return nil
}

func printConfig(cfg schemaregistry.Config, subj string) {
func printConfig(cfg schemaregistry.ConfigGet, subj string) {
if subj == "" {
subj = "global"
}
if cfg.CompatibilityLevel == "" {
cfg.CompatibilityLevel = "not defined, using global"
if cfg.CompatibilityType == "" {
cfg.CompatibilityType = "not defined, using global"
}
fmt.Printf("%s compatibility-level: %s\n", subj, cfg.CompatibilityLevel)
fmt.Printf("%s compatibility type: %s\n", subj, cfg.CompatibilityType)
}

func getConfig(subj string) error {
Expand Down
14 changes: 7 additions & 7 deletions schema-registry-cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"

schemaregistry "github.com/coursehero/schema-registry"
schemaregistry "github.com/WildBeavers/schema-registry"
)

var (
cfgFile string
registryURL string
basicAuthUser string
basicAuthPass string
verbose bool
nocolor bool
cfgFile string
registryURL string
basicAuthUser string
basicAuthPass string
verbose bool
nocolor bool
noConfirmation bool
)

Expand Down
56 changes: 56 additions & 0 deletions schema-registry-cli/cmd/set_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cmd

import (
"fmt"

schemaregistry "github.com/WildBeavers/schema-registry"
"github.com/spf13/cobra"
)

// constants for parameter names
const (
Compatibility = "compatibility-type"
)

var (
compatibility string
)

var setConfigCmd = &cobra.Command{
Use: "set-config [subject]",
Short: "set global or subject specific configuration",
Long: `Configuration currently contains only the compatibility type.
It can be set for all or a specific subject.
Compatibility types in Schema-Registry may be: "NONE", "BACKWARD",
"BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIVE", "FULL" and
"FULL_TRANSITIVE".
Please consider official documentation for more details.
`,
RunE: func(cmd *cobra.Command, args []string) error {
switch {
case len(args) > 1:
return fmt.Errorf("only one subject allowed")
case len(args) == 0:
if err := setConfig("", compatibility); err != nil {
return err
}
case len(args) == 1:
if err := setConfig(args[0], compatibility); err != nil {
return err
}
}

return nil
},
}

func init() {
RootCmd.AddCommand(setConfigCmd)
setConfigCmd.PersistentFlags().StringVar(&compatibility, Compatibility, "", "compatibility level to set")
setConfigCmd.MarkFlagRequired(Compatibility)
}

func setConfig(subj string, compatibilityType string) error {
client := assertClient()
return client.SetConfig(subj, schemaregistry.ConfigPut{CompatibilityType: schemaregistry.CompatibilityType(compatibilityType)})
}
2 changes: 1 addition & 1 deletion schema-registry-cli/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package main

import "github.com/coursehero/schema-registry/schema-registry-cli/cmd"
import "github.com/WildBeavers/schema-registry/schema-registry-cli/cmd"

func main() {
cmd.Execute()
Expand Down