Finish dedup benchmark.
arthurp committed Oct 10, 2017
1 parent aa74f3a commit 5476426
Showing 6 changed files with 331 additions and 17 deletions.
6 changes: 4 additions & 2 deletions OrcTests/src/orc/test/item/scalabenchmarks/Util.scala
@@ -50,7 +50,9 @@ class ControllableThread[+T](op: => T) extends Thread {
 
   def value(): Option[T] = {
     join()
-    assert(result != null)
-    result
+    synchronized {
+      assert(result != null)
+      result
+    }
   }
 }
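The change above makes ControllableThread.value() read the result field under the thread object's monitor instead of reading it bare after join(). As a stand-alone illustration of that locked result handoff, here is a minimal sketch (a hypothetical ResultThread, not the project's ControllableThread, whose other members are not shown in this diff):

class ResultThread[T](op: => T) extends Thread {
  private var result: Option[T] = None

  override def run(): Unit = {
    val v = op
    synchronized { result = Some(v) }   // publish the result under the object's lock
  }

  def value(): Option[T] = {
    join()                              // wait for run() to finish
    synchronized {                      // read under the same lock the writer used
      assert(result.isDefined)
      result
    }
  }
}

object ResultThreadExample {
  def main(args: Array[String]): Unit = {
    val t = new ResultThread(6 * 7)
    t.start()
    println(t.value())                  // Some(42)
  }
}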
20 changes: 7 additions & 13 deletions OrcTests/src/orc/test/item/scalabenchmarks/dedup/Dedup.scala
@@ -1,19 +1,13 @@
 package orc.test.item.scalabenchmarks.dedup
 
-import orc.test.item.scalabenchmarks.BenchmarkApplication
-import orc.test.item.scalabenchmarks.Util
-import java.io.DataOutputStream
-import java.io.FileOutputStream
-import java.io.FileInputStream
-import java.io.IOException
-import scala.annotation.tailrec
+import java.io.{ DataOutputStream, FileInputStream, FileOutputStream, IOException }
 import java.security.MessageDigest
-import java.util.concurrent.ConcurrentHashMap
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.concurrent.Await
+import java.util.concurrent.{ ConcurrentHashMap, ForkJoinPool }
+
+import scala.concurrent.{ Await, Promise }
 import scala.concurrent.duration.Duration
-import java.util.concurrent.ForkJoinPool
 
+import orc.test.item.scalabenchmarks.{ BenchmarkApplication, Util }
+
 object Dedup extends BenchmarkApplication {
   val threadPool = new ForkJoinPool()
@@ -105,7 +99,7 @@ object Dedup extends BenchmarkApplication {
       (fineChunk, fineID) <- segment(0, roughChunk)
     } yield (roughChunk, roughID, fineChunk, fineID, compress(fineChunk, dedupMap))
 
-    for ((roughChunk, roughID, fineChunk, fineID, cchunk) <- cchunks) {
+    for ((roughChunk, roughID, fineChunk, fineID, cchunk) <- cchunks if cchunk.uncompressedSize != 0) {
       cchunk.outputChunkID = id
       id += 1
       writeChunk(out, cchunk, alreadyOutput.containsKey(cchunk.uncompressedSHA1))
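The added guard in Dedup.scala appears to skip the zero-length sentinel chunk that the segmenting step emits at the end of each rough chunk, so sentinels are never written to the output. A tiny stand-alone illustration of filtering with a for-comprehension guard (hypothetical names and data):

object GuardExample {
  // (size, payload) pairs; a zero size stands in for the empty sentinel chunk.
  val chunks = Seq((4, "abcd"), (0, ""), (3, "efg"))

  def main(args: Array[String]): Unit = {
    for ((size, payload) <- chunks if size != 0) {
      println(s"writing $payload")   // the (0, "") sentinel is never written
    }
  }
}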
91 changes: 91 additions & 0 deletions OrcTests/src/orc/test/item/scalabenchmarks/dedup/DedupBoundedQueue.scala
@@ -0,0 +1,91 @@
package orc.test.item.scalabenchmarks.dedup

import java.io.{ DataOutputStream, FileInputStream, FileOutputStream }
import java.util.concurrent.ConcurrentHashMap

import orc.test.item.scalabenchmarks.{ BenchmarkApplication, Util }
import Util.thread
import java.util.concurrent.ArrayBlockingQueue
import scala.annotation.tailrec

object DedupBoundedQueue extends BenchmarkApplication {
  import Dedup._

  def dedup(inFn: String, outFn: String): Unit = {
    val dedupMap = new ConcurrentHashMap[ArrayKey, CompressedChunk]()
    val roughChunks = new ArrayBlockingQueue[(Chunk, Int)](1024)
    val fineChunks = new ArrayBlockingQueue[(Chunk, Int, Int)](2 * 1025)
    val compressedChunks = new ArrayBlockingQueue[(CompressedChunk, Int, Int)](2 * 1024)

    val in = new FileInputStream(inFn)

    val readThread = thread {
      for (p <- readSegments(largeChunkMin, in)) {
        roughChunks.put(p)
      }
    }
    val segmentThreads = for (_ <- 0 until 8) yield thread {
      while (true) {
        val (roughChunk, roughID) = roughChunks.take()
        for ((fineChunk, fineID) <- segment(0, roughChunk)) {
          fineChunks.put((fineChunk, roughID, fineID))
        }
      }
    }
    val compressThreads = for (_ <- 0 until 8) yield thread {
      while (true) {
        val (fineChunk, roughID, fineID) = fineChunks.take()
        compressedChunks.put((compress(fineChunk, dedupMap), roughID, fineID))
      }
    }

    val out = new DataOutputStream(new FileOutputStream(outFn))
    val alreadyOutput = new ConcurrentHashMap[ArrayKey, Boolean]()
    val outputPool = collection.mutable.HashMap[(Int, Int), CompressedChunk]()

    @tailrec
    def doOutput(roughID: Int, fineID: Int, id: Int): Unit = {
      outputPool.get((roughID, fineID)) match {
        case Some(cchunk) if cchunk.uncompressedSize == 0 && fineID == 0 => {
          outputPool -= ((roughID, fineID))
        }
        case Some(cchunk) if cchunk.uncompressedSize == 0 => {
          outputPool -= ((roughID, fineID))
          doOutput(roughID + 1, 0, id)
        }
        case Some(cchunk) => {
          cchunk.outputChunkID = id
          writeChunk(out, cchunk, alreadyOutput.containsKey(cchunk.uncompressedSHA1))
          alreadyOutput.put(cchunk.uncompressedSHA1, true)
          //print(s"$id: ($roughID, $fineID) $roughChunk (${roughChunk.size}), $fineChunk (${fineChunk.size})\r")
          outputPool -= ((roughID, fineID))
          doOutput(roughID, fineID + 1, id + 1)
        }
        case None => {
          val (cchunk, rID, fID) = compressedChunks.take()
          outputPool += (rID, fID) -> cchunk
          doOutput(roughID, fineID, id)
        }
      }
    }

    doOutput(0, 0, 0)

    readThread.join()
    segmentThreads foreach { _.terminate() }
    compressThreads foreach { _.terminate() }
  }

  def main(args: Array[String]): Unit = {
    if (args.size == 0) {
      dedup("test.in", "test.out")
    } else if (args.size == 1) {
      val n = args(0).toInt
      for (_ <- 0 until n) {
        Util.timeIt {
          dedup("test.in", "test.out")
        }
      }
    }
  }
}
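DedupBoundedQueue arranges the reader, segmenter, compressor, and writer stages around fixed-capacity ArrayBlockingQueues, so a slow stage back-pressures the stages feeding it. A stripped-down sketch of that pattern with generic stages and a sentinel for end-of-input (not the benchmark's chunk types):

import java.util.concurrent.ArrayBlockingQueue

object BoundedPipelineSketch {
  def main(args: Array[String]): Unit = {
    val stage1 = new ArrayBlockingQueue[Int](4)       // small capacity => back-pressure
    val stage2 = new ArrayBlockingQueue[String](4)

    val producer = new Thread(() => {
      for (i <- 1 to 20) stage1.put(i)                // put blocks while the queue is full
      stage1.put(-1)                                  // sentinel marking end of input
    })

    val worker = new Thread(() => {
      var item = stage1.take()                        // take blocks while the queue is empty
      while (item != -1) {
        stage2.put(s"processed $item")
        item = stage1.take()
      }
      stage2.put("done")
    })

    val consumer = new Thread(() => {
      var out = stage2.take()
      while (out != "done") {
        println(out)
        out = stage2.take()
      }
    })

    Seq(producer, worker, consumer).foreach(_.start())
    Seq(producer, worker, consumer).foreach(_.join())
  }
}

Because put and take block at the capacity limits, the amount of data in flight between stages stays bounded no matter how fast the producer runs.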
76 changes: 76 additions & 0 deletions OrcTests/src/orc/test/item/scalabenchmarks/dedup/DedupNestedPar.scala
@@ -0,0 +1,76 @@
package orc.test.item.scalabenchmarks.dedup

import java.io.{ DataOutputStream, FileInputStream, FileOutputStream }
import java.util.concurrent.ConcurrentHashMap

import orc.test.item.scalabenchmarks.{ BenchmarkApplication, Util }
import java.util.concurrent.ArrayBlockingQueue
import Util.thread
import scala.annotation.tailrec

object DedupNestedPar extends BenchmarkApplication {
  import Dedup._

  def dedup(inFn: String, outFn: String): Unit = {
    val dedupMap = new ConcurrentHashMap[ArrayKey, CompressedChunk]()
    val compressedChunks = new ArrayBlockingQueue[(CompressedChunk, Int, Int)](2 * 1024)

    val in = new FileInputStream(inFn)

    val loopThread = thread {
      for {
        (roughChunk, roughID) <- readSegments(largeChunkMin, in).par
        (fineChunk, fineID) <- segment(0, roughChunk).par
      } {
        compressedChunks.put((compress(fineChunk, dedupMap), roughID, fineID))
      }
    }

    val out = new DataOutputStream(new FileOutputStream(outFn))
    val alreadyOutput = new ConcurrentHashMap[ArrayKey, Boolean]()
    val outputPool = collection.mutable.HashMap[(Int, Int), CompressedChunk]()

    @tailrec
    def doOutput(roughID: Int, fineID: Int, id: Int): Unit = {
      outputPool.get((roughID, fineID)) match {
        case Some(cchunk) if cchunk.uncompressedSize == 0 && fineID == 0 => {
          outputPool -= ((roughID, fineID))
        }
        case Some(cchunk) if cchunk.uncompressedSize == 0 => {
          outputPool -= ((roughID, fineID))
          doOutput(roughID + 1, 0, id)
        }
        case Some(cchunk) => {
          cchunk.outputChunkID = id
          writeChunk(out, cchunk, alreadyOutput.containsKey(cchunk.uncompressedSHA1))
          alreadyOutput.put(cchunk.uncompressedSHA1, true)
          //print(s"$id: ($roughID, $fineID) $roughChunk (${roughChunk.size}), $fineChunk (${fineChunk.size})\r")
          outputPool -= ((roughID, fineID))
          doOutput(roughID, fineID + 1, id + 1)
        }
        case None => {
          val (cchunk, rID, fID) = compressedChunks.take()
          outputPool += (rID, fID) -> cchunk
          doOutput(roughID, fineID, id)
        }
      }
    }

    doOutput(0, 0, 0)

    loopThread.join()
  }

  def main(args: Array[String]): Unit = {
    if (args.size == 0) {
      dedup("test.in", "test.out")
    } else if (args.size == 1) {
      val n = args(0).toInt
      for (_ <- 0 until n) {
        Util.timeIt {
          dedup("test.in", "test.out")
        }
      }
    }
  }
}
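DedupNestedPar drops the explicit worker threads and instead nests parallel collections: the outer generator parallelizes over rough chunks, the inner one over fine chunks, and only the writer drains the blocking queue sequentially. A small sketch of nested parallel generators (written against Scala 2.12-style .par; Scala 2.13+ needs the scala-parallel-collections module):

object NestedParSketch {
  def main(args: Array[String]): Unit = {
    val results = new java.util.concurrent.ConcurrentLinkedQueue[String]()

    // Both generators are parallel collections, so inner items belonging to
    // different outer items can be processed at the same time.
    for {
      outer <- (1 to 4).par
      inner <- (1 to 3).par
    } {
      results.add(s"chunk $outer.$inner on ${Thread.currentThread.getName}")
    }

    var r = results.poll()
    while (r != null) { println(r); r = results.poll() }
  }
}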
151 changes: 151 additions & 0 deletions OrcTests/test_data/performance/dedup/dedup-boundedchannel.orc
@@ -0,0 +1,151 @@
{-
An implementation of the PARSEC 3.0 dedup benchmark.

This implementation uses Scala/Java classes for core computations but structures the entire computation
using Orc combinators. Unlike dedup.orc, it connects the pipeline stages through explicit bounded channels,
which makes its structure closer to the PARSEC implementation.

-}

include "benchmark.inc"

import class Rabin = "orc.test.item.scalabenchmarks.dedup.Rabin"
import class Chunk = "orc.test.item.scalabenchmarks.dedup.Chunk"
import class ArrayKey = "orc.test.item.scalabenchmarks.dedup.ArrayKey"
import class Map = "java.util.concurrent.ConcurrentHashMap"
import class FileInputStream = "java.io.FileInputStream"
import class FileOutputStream = "java.io.FileOutputStream"
import class DataOutputStream = "java.io.DataOutputStream"
import class MessageDigest = "java.security.MessageDigest"
import class Integer = "java.lang.Integer"

def logHalt(name, f) =
  val c = Channel() #
  (f() >v> Println(name + " published: " + v) >> c.put(v) >> stop ; Println(name + " halted") >> c.close() >> stop) |
    repeat(c.get)

class CompressedChunk {
  val compressedDataCell = Cell()

  val outputChunkID = Cell()

  def compress(chunk) = compressedDataCell := chunk.deflate()

  def compressedData() = compressedDataCell?.buffer()

  val uncompressedSHA1
  val uncompressedSize
}
def CompressedChunk(s, n) = new CompressedChunk { val uncompressedSHA1 = s # val uncompressedSize = n }

val rabin = Rabin()
val largeChunkMin = 2 * 1024 * 1024
val readChunkSize = 128 * 1024 * 1024

def sha1(chunk) = ArrayKey(
  val m = MessageDigest.getInstance("SHA-1")
  m.update(chunk.buffer(), chunk.start(), chunk.size()) >>
    m.digest())

{-- Read data from an InputStream and publish chunks of it that are at least minimumSegmentSize long.
-}
def readSegements(minimumSegmentSize, in, callback) =
  def process(currentChunk, i) =
    val splitPoint = rabin.segment(currentChunk, minimumSegmentSize)
    if splitPoint = currentChunk.size() then
      -- TODO: PERFORMANCE: This repeatedly reallocates a 128MB buffer. Even the JVM GC cannot handle that well, probably.
      Chunk.readFromInputStream(in, readChunkSize) >data>
        process(currentChunk.append(data), i) ;
      callback(currentChunk, i) >> callback(Chunk.empty(), i + 1)
    else
      callback(currentChunk.slice(0, splitPoint), i) >>
        process(currentChunk.slice(splitPoint, currentChunk.size()), i+1)
  process(Chunk.empty(), 0)


{-- Publish some number of subchunks of chunk where each chunk is at least minimumSegmentSize long.
-}
def segment(minimumSegmentSize, chunk, callback) =
  def process(chunk, i) if (chunk.size() = 0) = callback(Chunk.empty(), i)
  def process(chunk, i) =
    val splitPoint = rabin.segment(chunk, minimumSegmentSize) #
    callback(chunk.slice(0, splitPoint), i) >>
      process(chunk.slice(splitPoint, chunk.size()), i + 1)
  process(chunk, 0)

{-- Compress a chunk with deduplication by publishing an existing compressed chunk if an identical one exists.
-}
def compress(chunk, dedupPool, id) =
  val hash = sha1(chunk)
  val old = dedupPool.putIfAbsent(hash, CompressedChunk(hash, chunk.size()))
  val compChunk = old >> dedupPool.get(hash)
  Ift(old = null) >> --Println("Compressing CompressedChunk: " + compChunk.uncompressedSHA1) >>
    compChunk.compress(chunk) >> stop |
    compChunk

def writeChunk(out, cchunk, isAlreadyOutput) =
  if isAlreadyOutput then
    --Println("R chunk: " + (roughID, fineID) + cchunk.uncompressedSHA1) >>
    out.writeBytes("R") >>
    out.writeLong(cchunk.outputChunkID?)
  else
    --Println("D chunk: " + (roughID, fineID) + cchunk.uncompressedSHA1) >>
    out.writeBytes("D") >>
    out.writeLong(cchunk.compressedData().length?) >>
    out.write(cchunk.compressedData())

{-- Read sequential elements from the pool and write to the provided OutputStream.
-}
def write(out, outputPool) =
  val alreadyOutput = Map()
  def process((roughID, fineID), id) =
    val cchunk = outputPool.get((roughID, fineID))
    if cchunk = null then
      --Println("Pool: " + (roughID, fineID) + " " + outputPool) >>
      Rwait(100) >> process((roughID, fineID), id)
    else if cchunk.uncompressedSize = 0 then
      if fineID = 0 then
        Println("Done") >>
        signal
      else
        process((roughID + 1, 0), id)
    else
      cchunk.outputChunkID := id >> stop |
      outputPool.remove((roughID, fineID)) >> stop |
      writeChunk(out, cchunk, alreadyOutput.containsKey(cchunk.uncompressedSHA1)) >>
        alreadyOutput.put(cchunk.uncompressedSHA1, true) >>
        process((roughID, fineID + 1), id + 1)
  process((0, 0), 0)

{-- Connect the various stages using branch combinators
-}
def dedup(in, out) =
  val dedupPool = Map()
  val outputPool = Map()
  val roughChunks = BoundedChannel(64)
  val fineChunks = BoundedChannel(1024)

  def fineSegment(roughChunk, roughID) =
    segment(0, roughChunk, { fineChunks.put((_, roughID, _)) })
  def fineSegmentThread() =
    repeat({ roughChunks.get() >(roughChunk, roughID)> fineSegment(roughChunk, roughID) }) >> stop

  def compressThread() =
    repeat({ fineChunks.get() >(fineChunk, roughID, fineID)>
      compress(fineChunk, dedupPool, (roughID, fineID)) >compressedChunk>
      outputPool.put((roughID, fineID), compressedChunk) }) >> stop

  readSegements(largeChunkMin, in, lambda(c, i) = roughChunks.put((c, i))) >> stop |
  signals(8) >> fineSegmentThread() |
  signals(8) >> compressThread() |
  write(out, outputPool) >>
    roughChunks.close() >>
    fineChunks.close()


benchmark({
  val (in, out) = (FileInputStream("test.in"), DataOutputStream(FileOutputStream("test.out")))
  dedup(in, out) >>
    in.close() >>
    out.close()
})
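Both the Orc compress above and the Scala Dedup.compress deduplicate on a chunk's SHA-1 digest: the first caller to register a digest in the shared ConcurrentHashMap performs the compression and later callers reuse the stored entry. The benchmark does this with putIfAbsent plus a write-once cell; the sketch below gets similar at-most-once compression with computeIfAbsent, and its sha1Key and deflate helpers are simplified stand-ins rather than the benchmark's ArrayKey and CompressedChunk classes:

import java.security.MessageDigest
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.Deflater

object DedupCompressSketch {
  // Simplified stand-in for ArrayKey: hex-encode the SHA-1 digest.
  def sha1Key(data: Array[Byte]): String =
    MessageDigest.getInstance("SHA-1").digest(data).map("%02x".format(_)).mkString

  // Simplified stand-in for Chunk.deflate().
  def deflate(data: Array[Byte]): Array[Byte] = {
    val d = new Deflater()
    d.setInput(data)
    d.finish()
    val out = new java.io.ByteArrayOutputStream()
    val buf = new Array[Byte](1024)
    while (!d.finished()) {
      val n = d.deflate(buf)
      out.write(buf, 0, n)
    }
    d.end()
    out.toByteArray
  }

  val pool = new ConcurrentHashMap[String, Array[Byte]]()

  // computeIfAbsent compresses each distinct chunk at most once;
  // repeated chunks are served from the pool.
  def compressDedup(data: Array[Byte]): Array[Byte] =
    pool.computeIfAbsent(sha1Key(data), (_: String) => deflate(data))

  def main(args: Array[String]): Unit = {
    val chunk = "hello hello hello hello".getBytes("UTF-8")
    println(compressDedup(chunk).length)   // compressed here
    println(compressDedup(chunk).length)   // reused from the pool
  }
}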
4 changes: 2 additions & 2 deletions OrcTests/test_data/performance/dedup/dedup.orc
@@ -2,8 +2,8 @@
 An implementation of the PARSEC 3.0 dedup benchmark.
 
 This implementation uses Scala/Java classes for core computations but structures the entire computation
-using Orc combinators. It does not use explicit channels making it totally different from the PARSEC or
-Scala implementation.
+using Orc combinators. It does not use explicit channels making it totally different from the PARSEC
+implementation.
 
 -}

