Skip to content

Commit

Permalink
[proxima-beam-core] #206 enhance StreamElementCoder with sequential ID
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Apr 27, 2021
1 parent e9c3662 commit 206a6e4
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ public void encode(StreamElement value, OutputStream outStream) throws IOExcepti

final DataOutput output = new DataOutputStream(outStream);
output.writeUTF(value.getEntityDescriptor().getName());
output.writeUTF(value.getUuid());
if (value.hasSequentialId()) {
output.writeUTF("");
output.writeLong(value.getSequentialId());
} else {
output.writeUTF(value.getUuid());
}
output.writeUTF(value.getKey());
final Type type;
if (value.isDelete()) {
Expand Down Expand Up @@ -101,6 +106,7 @@ public StreamElement decode(InputStream inStream) throws IOException {
() -> new IOException(String.format("Unable to find entity [%s].", entityName)));

final String uuid = input.readUTF();
final long sequentialId = uuid.length() == 0 ? input.readLong() : -1L;
final String key = input.readUTF();
final int typeOrdinal = input.readInt();
final Type type = Type.values()[typeOrdinal];
Expand All @@ -122,18 +128,34 @@ public StreamElement decode(InputStream inStream) throws IOException {
final long stamp = input.readLong();

byte[] value = readBytes(input);
switch (type) {
case DELETE_WILDCARD:
return StreamElement.deleteWildcard(
entityDescriptor, attributeDescriptor, uuid, key, stamp);
case DELETE:
return StreamElement.delete(
entityDescriptor, attributeDescriptor, uuid, key, attribute, stamp);
case UPDATE:
return StreamElement.upsert(
entityDescriptor, attributeDescriptor, uuid, key, attribute, stamp, value);
default:
throw new IllegalStateException("Unknown type " + type);
if (sequentialId == -1L) {
switch (type) {
case DELETE_WILDCARD:
return StreamElement.deleteWildcard(
entityDescriptor, attributeDescriptor, uuid, key, stamp);
case DELETE:
return StreamElement.delete(
entityDescriptor, attributeDescriptor, uuid, key, attribute, stamp);
case UPDATE:
return StreamElement.upsert(
entityDescriptor, attributeDescriptor, uuid, key, attribute, stamp, value);
default:
throw new IllegalStateException("Unknown type " + type);
}
} else {
switch (type) {
case DELETE_WILDCARD:
return StreamElement.deleteWildcard(
entityDescriptor, attributeDescriptor, sequentialId, key, stamp);
case DELETE:
return StreamElement.delete(
entityDescriptor, attributeDescriptor, sequentialId, key, attribute, stamp);
case UPDATE:
return StreamElement.upsert(
entityDescriptor, attributeDescriptor, sequentialId, key, attribute, stamp, value);
default:
throw new IllegalStateException("Unknown type " + type);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
*/
package cz.o2.proxima.beam.core.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.TestUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.beam.sdk.coders.Coder;
import org.junit.Test;
Expand All @@ -34,27 +39,82 @@ public class StreamElementCoderTest {

private final Repository repo = Repository.of(ConfigFactory.load("test-reference.conf"));
private final Coder<StreamElement> coder = StreamElementCoder.of(repo);
private final EntityDescriptor gateway = repo.getEntity("gateway");
private final AttributeDescriptor<Object> armed = gateway.getAttribute("armed");
private final AttributeDescriptor<Object> device = gateway.getAttribute("device.*");

@Test
public void testCoderSerializable() throws IOException, ClassNotFoundException {
TestUtils.assertSerializable(coder);
}

@Test
public void testStreamElement() throws IOException {
List<StreamElement> elements =
Arrays.asList(
StreamElement.upsert(
gateway,
armed,
UUID.randomUUID().toString(),
"key",
armed.getName(),
System.currentTimeMillis(),
new byte[] {1, 2, 3}),
StreamElement.delete(
gateway,
armed,
UUID.randomUUID().toString(),
"key",
armed.getName(),
System.currentTimeMillis()),
StreamElement.deleteWildcard(
gateway, device, UUID.randomUUID().toString(), "key", System.currentTimeMillis()));

elements.forEach(
e -> assertEquals(e, ExceptionUtils.uncheckedFactory(() -> decode(encode(e)))));
}

@Test
public void testStreamElementWithSequentialId() throws IOException {
List<StreamElement> elements =
Arrays.asList(
StreamElement.upsert(
gateway,
armed,
1L,
"key",
armed.getName(),
System.currentTimeMillis(),
new byte[] {1, 2, 3}),
StreamElement.delete(
gateway, armed, 2L, "key", armed.getName(), System.currentTimeMillis()));

elements.forEach(
e -> assertEquals(e, ExceptionUtils.uncheckedFactory(() -> decode(encode(e)))));
}

@Test
public void testCoderOutputReasonableSize() throws IOException {
EntityDescriptor gateway = repo.getEntity("gateway");
AttributeDescriptor<Object> armed = gateway.getAttribute("armed");
StreamElement element =
StreamElement.upsert(
gateway,
armed,
"key",
UUID.randomUUID().toString(),
"key",
armed.getName(),
System.currentTimeMillis(),
new byte[] {1, 2, 3});
assertTrue(encode(element).length < 100);
}

private byte[] encode(StreamElement element) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
coder.encode(element, baos);
assertTrue(baos.toByteArray().length < 100);
return baos.toByteArray();
}

private StreamElement decode(byte[] bytes) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
return coder.decode(bais);
}
}

0 comments on commit 206a6e4

Please sign in to comment.