Skip to content
This repository has been archived by the owner on Sep 20, 2023. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
George Shaikovski committed Jul 23, 2020
1 parent cfd7989 commit ac608e5
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
8 changes: 4 additions & 4 deletions confluent_kafka/avro/cached_schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def register(self, subject, avro_schema):
url = '/'.join([self.url, 'subjects', subject, 'versions'])
# body is { schema : json_string }

body = {'schema': json.dumps(avro_schema.to_json())}
body = {'schema': json.dumps(str(avro_schema))}
result, code = self._send_request(url, method='POST', body=body)
if (code == 401 or code == 403):
raise ClientError("Unauthorized access. Error code:" + str(code))
Expand Down Expand Up @@ -253,7 +253,7 @@ def check_registration(self, subject, avro_schema):
url = '/'.join([self.url, 'subjects', subject])
# body is { schema : json_string }

body = {'schema': json.dumps(avro_schema.to_json())}
body = {'schema': json.dumps(str(avro_schema))}
result, code = self._send_request(url, method='POST', body=body)
if code == 401 or code == 403:
raise ClientError("Unauthorized access. Error code:" + str(code))
Expand Down Expand Up @@ -374,7 +374,7 @@ def get_version(self, subject, avro_schema):
return version

url = '/'.join([self.url, 'subjects', subject])
body = {'schema': json.dumps(avro_schema.to_json())}
body = {'schema': json.dumps(str(avro_schema))}

result, code = self._send_request(url, method='POST', body=body)
if code == 404:
Expand Down Expand Up @@ -402,7 +402,7 @@ def test_compatibility(self, subject, avro_schema, version='latest'):
"""
url = '/'.join([self.url, 'compatibility', 'subjects', subject,
'versions', str(version)])
body = {'schema': json.dumps(avro_schema.to_json())}
body = {'schema': json.dumps(str(avro_schema))}
try:
result, code = self._send_request(url, method='POST', body=body)
if code == 404:
Expand Down
6 changes: 3 additions & 3 deletions confluent_kafka/avro/serializer/message_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
# Encoder support
def _get_encoder_func(self, writer_schema):
if HAS_FAST:
schema = writer_schema.to_json()
schema = str(writer_schema)
parsed_schema = parse_schema(schema)
return lambda record, fp: schemaless_writer(fp, parsed_schema, record)
writer = avro.io.DatumWriter(writer_schema)
Expand Down Expand Up @@ -174,8 +174,8 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
if HAS_FAST:
# try to use fast avro
try:
fast_avro_writer_schema = parse_schema(writer_schema_obj.to_json())
fast_avro_reader_schema = parse_schema(reader_schema_obj.to_json())
fast_avro_writer_schema = parse_schema(str(writer_schema_obj))
fast_avro_reader_schema = parse_schema(str(reader_schema_obj))
schemaless_reader(payload, fast_avro_writer_schema)

# If we reach this point, this means we have fastavro and it can
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def get_install_requirements(path):
setup(name='confluent-kafka',
# Make sure to bump CFL_VERSION* in confluent_kafka/src/confluent_kafka.h
# and version and release in docs/conf.py.
version='1.4.2',
version='1.4.2-kaluza',
description='Confluent\'s Python client for Apache Kafka',
author='Confluent Inc',
author_email='[email protected]',
Expand Down

0 comments on commit ac608e5

Please sign in to comment.