Skip to content

Commit

Permalink
Add EncodedContent
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Jul 26, 2024
1 parent b731f12 commit b767d24
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 0 deletions.
100 changes: 100 additions & 0 deletions src/saturn_engine/core/encoded_content.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import typing as t

import functools
import json
import pickle
import zlib
from dataclasses import dataclass


class ContentType(t.NamedTuple):
    """A serialization format: a mimetype paired with its encoder/decoder.

    Instances are registered in :class:`ContentTypes` and referenced by
    mimetype on the wire (see ``EncodedContent.content_type``).
    """

    #: Mimetype identifying the format (e.g. "application/json").
    mimetype: str
    #: Parses serialized bytes back into a Python object.
    decoder: t.Callable[[bytes], object]
    #: Serializes a Python object into bytes.
    encoder: t.Callable[[object], bytes]


def json_dumps(obj: object) -> bytes:
    """Serialize *obj* to UTF-8 encoded JSON bytes."""
    text = json.dumps(obj)
    return text.encode("utf-8")


def json_loads(data: bytes) -> object:
    """Parse UTF-8 encoded JSON bytes into a Python object."""
    text = data.decode()
    return json.loads(text)


class ContentTypes:
    """Registry of the serialization formats known to the engine."""

    JSON = ContentType(
        mimetype="application/json",
        decoder=json_loads,
        encoder=json_dumps,
    )

    # NOTE(review): pickle.loads executes arbitrary code when given crafted
    # input — this content type must only ever be used for trusted,
    # engine-internal payloads, never for data from external sources.
    PYTHON_PICKLE = ContentType(
        mimetype="application/python-pickle",
        decoder=pickle.loads,
        encoder=pickle.dumps,
    )

    all = [JSON, PYTHON_PICKLE]

    # Lookup table keyed by mimetype. The loop variable is named `ct` so it
    # does not shadow the module-level `typing as t` alias.
    by_mimetype = {ct.mimetype: ct for ct in all}


class ContentEncoding(t.NamedTuple):
    """A transfer encoding: a mimetype paired with a bytes-to-bytes codec.

    Instances are registered in :class:`ContentEncodings` and referenced by
    mimetype on the wire (see ``EncodedContent.content_encoding``).
    """

    #: Mimetype identifying the encoding (e.g. "application/gzip").
    mimetype: str
    #: Reverses the encoding (e.g. decompresses).
    decoder: t.Callable[[bytes], bytes]
    #: Applies the encoding (e.g. compresses).
    encoder: t.Callable[[bytes], bytes]


class ContentEncodings:
    """Registry of the transfer encodings known to the engine."""

    # wbits=31 selects the gzip container format (header + trailing CRC)
    # for both compression and decompression.
    # NOTE(review): the `wbits` keyword on zlib.compress exists only on
    # Python >= 3.11 — confirm against the project's minimum version.
    GZIP = ContentEncoding(
        mimetype="application/gzip",
        decoder=functools.partial(zlib.decompress, wbits=31),
        encoder=functools.partial(zlib.compress, wbits=31),
    )

    all = [GZIP]

    # Lookup table keyed by mimetype. The loop variable is named `ce` so it
    # does not shadow the module-level `typing as t` alias.
    by_mimetype = {ce.mimetype: ce for ce in all}


@dataclass(frozen=True)
class EncodedContent:
    """A payload of bytes plus the metadata needed to decode it.

    ``content_type`` is the mimetype of the fully decoded payload and
    ``content_encoding`` lists the transfer encodings that were applied on
    top of it, in application order (mirroring HTTP's Content-Encoding
    header semantics).
    """

    #: Encoded payload bytes.
    content: bytes
    #: Mimetype of the decoded payload (e.g. "application/json").
    content_type: str
    #: Encodings applied to the payload, in the order they were applied.
    content_encoding: list[str]

    def decode(self) -> object:
        """Undo every content encoding, then parse the content type.

        Returns the deserialized Python object.

        Raises:
            ValueError: if the content type or any encoding is not
                registered in ContentTypes / ContentEncodings.
        """
        parser = ContentTypes.by_mimetype.get(self.content_type)
        if not parser:
            raise ValueError(f"Unknown content type: {self.content_type}")

        # Validate every encoding up-front so an unknown one fails before
        # any decoding work is done.
        decoders = []
        for encoding in self.content_encoding:
            typ = ContentEncodings.by_mimetype.get(encoding)
            if not typ:
                raise ValueError(f"Unknown content encoding: {encoding}")
            decoders.append(typ.decoder)

        data = self.content
        # Encodings were applied left-to-right at encode time, so they must
        # be undone right-to-left: the last one applied is the first removed.
        for decoder in reversed(decoders):
            data = decoder(data)
        return parser.decoder(data)

    @classmethod
    def encode(
        cls,
        obj: object,
        *,
        content_type: "ContentType",
        content_encoding: t.Optional[list["ContentEncoding"]] = None,
    ) -> "EncodedContent":
        """Serialize *obj* and apply each encoding, in order.

        Args:
            obj: the object to serialize.
            content_type: serializer used to turn *obj* into bytes.
            content_encoding: optional encodings layered, in order, on top
                of the serialized bytes. Defaults to applying none.
        """
        content_encoding = content_encoding or []
        data = content_type.encoder(obj)
        for encoding in content_encoding:
            data = encoding.encoder(data)

        return cls(
            content=data,
            content_type=content_type.mimetype,
            # `e`, not `t`: avoid shadowing the `typing as t` module alias.
            content_encoding=[e.mimetype for e in content_encoding],
        )
4 changes: 4 additions & 0 deletions src/saturn_engine/core/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import datetime
import uuid

from .encoded_content import EncodedContent
from .types import MessageId


@dataclasses.dataclass
class TopicMessage:
#: Raw content used to pass to executors without any decoding in the worker.
body: t.Optional[EncodedContent]

#: Message arguments used to call the pipeline.
args: dict[str, t.Optional[t.Any]]

Expand Down
35 changes: 35 additions & 0 deletions tests/core/test_encoded_content.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import typing as t

import itertools

import pytest

from saturn_engine.core.encoded_content import ContentEncoding
from saturn_engine.core.encoded_content import ContentEncodings
from saturn_engine.core.encoded_content import ContentType
from saturn_engine.core.encoded_content import ContentTypes
from saturn_engine.core.encoded_content import EncodedContent


@pytest.mark.parametrize(
    "content_type,content_encoding",
    [
        (ct, enc)
        for ct in ContentTypes.all
        for enc in [None] + [[x] for x in ContentEncodings.all]
    ],
)
def test_encode_decode(
    content_type: ContentType,
    content_encoding: t.Optional[list[ContentEncoding]],
) -> None:
    """Round-trip every content type with and without each encoding."""
    payload = {"x": "0" * 1000}
    encoded = EncodedContent.encode(
        payload, content_type=content_type, content_encoding=content_encoding
    )
    assert isinstance(encoded.content, bytes)
    if content_encoding:
        # The repetitive payload must shrink when a compression encoding
        # is in play.
        assert len(encoded.content) < 1000

    assert encoded.decode() == payload

0 comments on commit b767d24

Please sign in to comment.