Skip to content

Commit

Permalink
Merge pull request #648 from apache/compressed_iterator
Browse files Browse the repository at this point in the history
wrap(byte[])
  • Loading branch information
AlexanderSaydakov authored Feb 5, 2025
2 parents 2dd1d5e + bde0b6f commit 41d7183
Show file tree
Hide file tree
Showing 7 changed files with 512 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datasketches.theta;

/*
* This is to uncompress serial version 4 sketch incrementally
*/
class BytesCompactCompressedHashIterator implements HashIterator {
private byte[] bytes;
private int offset;
private int entryBits;
private int numEntries;
private int index;
private long previous;
private int offsetBits;
private long[] buffer;
private boolean isBlockMode;

BytesCompactCompressedHashIterator(
final byte[] bytes,
final int offset,
final int entryBits,
final int numEntries
) {
this.bytes = bytes;
this.offset = offset;
this.entryBits = entryBits;
this.numEntries = numEntries;
index = -1;
previous = 0;
offsetBits = 0;
buffer = new long[8];
isBlockMode = numEntries >= 8;
}

@Override
public long get() {
return buffer[index & 7];
}

@Override
public boolean next() {
if (++index == numEntries) { return false; }
if (isBlockMode) {
if ((index & 7) == 0) {
if (numEntries - index >= 8) {
unpack8();
} else {
isBlockMode = false;
unpack1();
}
}
} else {
unpack1();
}
return true;
}

private void unpack1() {
final int i = index & 7;
BitPacking.unpackBits(buffer, i, entryBits, bytes, offset, offsetBits);
offset += (offsetBits + entryBits) >>> 3;
offsetBits = (offsetBits + entryBits) & 7;
buffer[i] += previous;
previous = buffer[i];
}

private void unpack8() {
BitPacking.unpackBitsBlock8(buffer, 0, bytes, offset, entryBits);
offset += entryBits;
for (int i = 0; i < 8; i++) {
buffer[i] += previous;
previous = buffer[i];
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datasketches.theta;

import org.apache.datasketches.common.ByteArrayUtil;

/*
* This is to iterate over serial version 3 sketch representation
*/
class BytesCompactHashIterator implements HashIterator {
final private byte[] bytes;
final private int offset;
final private int numEntries;
private int index;

BytesCompactHashIterator(
final byte[] bytes,
final int offset,
final int numEntries
) {
this.bytes = bytes;
this.offset = offset;
this.numEntries = numEntries;
index = -1;
}

@Override
public long get() {
return ByteArrayUtil.getLongLE(bytes, offset + index * Long.BYTES);
}

@Override
public boolean next() {
return ++index < numEntries;
}
}
54 changes: 54 additions & 0 deletions src/main/java/org/apache/datasketches/theta/CompactSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

package org.apache.datasketches.theta;

import static org.apache.datasketches.common.ByteArrayUtil.getShortLE;
import static org.apache.datasketches.common.Family.idToFamily;
import static org.apache.datasketches.theta.PreambleUtil.COMPACT_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.EMPTY_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.FLAGS_BYTE;
import static org.apache.datasketches.theta.PreambleUtil.ORDERED_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.PREAMBLE_LONGS_BYTE;
import static org.apache.datasketches.theta.PreambleUtil.READ_ONLY_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.SEED_HASH_SHORT;
import static org.apache.datasketches.theta.PreambleUtil.extractFamilyID;
import static org.apache.datasketches.theta.PreambleUtil.extractFlags;
import static org.apache.datasketches.theta.PreambleUtil.extractPreLongs;
Expand Down Expand Up @@ -224,6 +228,56 @@ else if (serVer == 2) {
"Corrupted: Serialization Version " + serVer + " not recognized.");
}

public static CompactSketch wrap(final byte[] bytes) {
return wrap(bytes, ThetaUtil.DEFAULT_UPDATE_SEED, false);
}

public static CompactSketch wrap(final byte[] bytes, final long expectedSeed) {
return wrap(bytes, expectedSeed, true);
}

private static CompactSketch wrap(final byte[] bytes, final long seed, final boolean enforceSeed) {
final int serVer = bytes[PreambleUtil.SER_VER_BYTE];
final int familyId = bytes[PreambleUtil.FAMILY_BYTE];
final Family family = Family.idToFamily(familyId);
if (family != Family.COMPACT) {
throw new IllegalArgumentException("Corrupted: " + family + " is not Compact!");
}
final short seedHash = ThetaUtil.computeSeedHash(seed);
if (serVer == 4) {
return WrappedCompactCompressedSketch.wrapInstance(bytes, seedHash);
} else if (serVer == 3) {
final int flags = bytes[FLAGS_BYTE];
if ((flags & EMPTY_FLAG_MASK) > 0) {
return EmptyCompactSketch.getHeapInstance(Memory.wrap(bytes));
}
final int preLongs = bytes[PREAMBLE_LONGS_BYTE];
if (otherCheckForSingleItem(preLongs, serVer, familyId, flags)) {
return SingleItemSketch.heapify(Memory.wrap(bytes), enforceSeed ? seedHash : getShortLE(bytes, SEED_HASH_SHORT));
}
//not empty & not singleItem
final boolean compactFlag = (flags & COMPACT_FLAG_MASK) > 0;
if (!compactFlag) {
throw new SketchesArgumentException(
"Corrupted: COMPACT family sketch image must have compact flag set");
}
final boolean readOnly = (flags & READ_ONLY_FLAG_MASK) > 0;
if (!readOnly) {
throw new SketchesArgumentException(
"Corrupted: COMPACT family sketch image must have Read-Only flag set");
}
return WrappedCompactSketch.wrapInstance(bytes,
enforceSeed ? seedHash : getShortLE(bytes, SEED_HASH_SHORT));
} else if (serVer == 1) {
return ForwardCompatibility.heapify1to3(Memory.wrap(bytes), seedHash);
} else if (serVer == 2) {
return ForwardCompatibility.heapify2to3(Memory.wrap(bytes),
enforceSeed ? seedHash : getShortLE(bytes, SEED_HASH_SHORT));
}
throw new SketchesArgumentException(
"Corrupted: Serialization Version " + serVer + " not recognized.");
}

//Sketch Overrides

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

/**
* An off-heap (Direct), compact, read-only sketch. The internal hash array can be either ordered
* or unordered.
* or unordered. It is not empty, not a single item.
*
* <p>This sketch can only be associated with a Serialization Version 3 format binary image.</p>
*
Expand All @@ -57,7 +57,7 @@ class DirectCompactSketch extends CompactSketch {
}

/**
* Wraps the given Memory, which must be a SerVer 3, ordered, CompactSketch image.
* Wraps the given Memory, which must be a SerVer 3, CompactSketch image.
* Must check the validity of the Memory before calling. The order bit must be set properly.
* @param srcMem <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>
* @param seedHash The update seedHash.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datasketches.theta;

import static org.apache.datasketches.theta.PreambleUtil.wholeBytesToHoldBits;
import static org.apache.datasketches.theta.PreambleUtil.ENTRY_BITS_BYTE_V4;
import static org.apache.datasketches.theta.PreambleUtil.NUM_ENTRIES_BYTES_BYTE_V4;
import static org.apache.datasketches.theta.PreambleUtil.PREAMBLE_LONGS_BYTE;

import org.apache.datasketches.common.ByteArrayUtil;
import org.apache.datasketches.thetacommon.ThetaUtil;

/**
* Wrapper around a serialized compact compressed read-only sketch. It is not empty, not a single item.
*
* <p>This sketch can only be associated with a Serialization Version 4 format binary image.</p>
*/
class WrappedCompactCompressedSketch extends WrappedCompactSketch {

/**
* Construct this sketch with the given bytes.
* @param bytes containing serialized compact compressed sketch.
*/
WrappedCompactCompressedSketch(final byte[] bytes) {
super(bytes);
}

/**
* Wraps the given bytes, which must be a SerVer 4 compressed CompactSketch image.
* @param bytes representation of serialized compressed compact sketch.
* @param seedHash The update seedHash.
* <a href="{@docRoot}/resources/dictionary.html#seedHash">See Seed Hash</a>.
* @return this sketch
*/
static WrappedCompactCompressedSketch wrapInstance(final byte[] bytes, final short seedHash) {
ThetaUtil.checkSeedHashes(ByteArrayUtil.getShortLE(bytes, PreambleUtil.SEED_HASH_SHORT), seedHash);
return new WrappedCompactCompressedSketch(bytes);
}

//Sketch Overrides

@Override
public int getCurrentBytes() {
final int preLongs = bytes_[PREAMBLE_LONGS_BYTE];
final int entryBits = bytes_[ENTRY_BITS_BYTE_V4];
final int numEntriesBytes = bytes_[NUM_ENTRIES_BYTES_BYTE_V4];
return preLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(getRetainedEntries() * entryBits);
}

private static final int START_PACKED_DATA_EXACT_MODE = 8;
private static final int START_PACKED_DATA_ESTIMATION_MODE = 16;

@Override
public int getRetainedEntries(final boolean valid) { //compact is always valid
// number of entries is stored using variable length encoding
// most significant bytes with all zeros are not stored
// one byte in the preamble has the number of non-zero bytes used
final int preLongs = bytes_[PREAMBLE_LONGS_BYTE]; // if > 1 then the second long has theta
final int numEntriesBytes = bytes_[NUM_ENTRIES_BYTES_BYTE_V4];
int offsetBytes = preLongs > 1 ? START_PACKED_DATA_ESTIMATION_MODE : START_PACKED_DATA_EXACT_MODE;
int numEntries = 0;
for (int i = 0; i < numEntriesBytes; i++) {
numEntries |= Byte.toUnsignedInt(bytes_[offsetBytes++]) << (i << 3);
}
return numEntries;
}

@Override
public long getThetaLong() {
final int preLongs = bytes_[PREAMBLE_LONGS_BYTE];
return (preLongs > 1) ? ByteArrayUtil.getLongLE(bytes_, 8) : Long.MAX_VALUE;
}

@Override
public boolean isEmpty() {
return false;
}

@Override
public boolean isOrdered() {
return true;
}

@Override
public HashIterator iterator() {
return new BytesCompactCompressedHashIterator(
bytes_,
(bytes_[PREAMBLE_LONGS_BYTE] > 1 ? START_PACKED_DATA_ESTIMATION_MODE : START_PACKED_DATA_EXACT_MODE)
+ bytes_[NUM_ENTRIES_BYTES_BYTE_V4],
bytes_[ENTRY_BITS_BYTE_V4],
getRetainedEntries()
);
}
}
Loading

0 comments on commit 41d7183

Please sign in to comment.