Commit bbe781f

refactor, change dir structure
davies committed Oct 23, 2011
1 parent 4765749 commit bbe781f
Showing 28 changed files with 10,104 additions and 47 deletions.
1 change: 1 addition & 0 deletions AUTHORS
@@ -0,0 +1 @@
Davies Liu <davies.liu AT gmail.com>
14 changes: 14 additions & 0 deletions README
@@ -0,0 +1,14 @@
Dpark is a Python clone of Spark, a MapReduce-like computing
framework supporting iterative computation.

Word Count Example:

from dpark import DparkContext
ctx = DparkContext()
file = ctx.textFile("/tmp/words.txt")
words = file.flatMap(lambda x:x.split()).map(lambda x:(x,1))
wc = words.reduceByKey(lambda x,y:x+y).collectAsMap()
print wc

This script can run locally or on a Mesos cluster without
any modification, just with different command-line arguments.
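
For a runnable counterpart, examples/demo.py (moved into examples/ by this commit) exercises the same API against an in-memory collection. A minimal sketch along those lines (Python 2, assuming the examples/ layout introduced here; expected output shown in comments):

import sys
sys.path.append('../')        # as in examples/demo.py, so the dpark package is importable
from dpark import DparkContext

ctx = DparkContext()

# distribute an in-memory range over 4 splits and run two simple actions
nums = ctx.parallelize(range(100), 4)
print nums.count()                      # 100
print nums.reduce(lambda x, y: x + y)   # 4950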
1 change: 1 addition & 0 deletions dpark/__init__.py
@@ -0,0 +1 @@
from context import DparkContext
File renamed without changes.
File renamed without changes.
6 changes: 3 additions & 3 deletions cache.py → dpark/cache.py
@@ -247,9 +247,9 @@ def test():
pool.join()
assert cache.get('a') == 'b'

from context import SparkContext
sc = SparkContext("local")
nums = sc.parallelize(range(100), 10)
from context import DparkContext
dc = DparkContext("local")
nums = dc.parallelize(range(100), 10)
cache = mmapCache
tracker = CacheTracker(True, cache)
tracker.registerRDD(nums.id, len(nums.splits))
2 changes: 1 addition & 1 deletion context.py → dpark/context.py
@@ -6,7 +6,7 @@
from env import env
from broadcast import Broadcast

class SparkContext:
class DparkContext:
nextRddId = 0
nextShuffleId = 0

File renamed without changes.
4 changes: 2 additions & 2 deletions env.py → dpark/env.py
@@ -1,7 +1,7 @@
import os, logging
import threading

class SparkEnv:
class DparkEnv:
environ = {}
@classmethod
def register(cls, name, value):
@@ -43,4 +43,4 @@ def stop(self):
self.shuffleFetcher.stop()
self.started = False

env = SparkEnv()
env = DparkEnv()
File renamed without changes.
File renamed without changes.
File renamed without changes.
58 changes: 29 additions & 29 deletions rdd.py → dpark/rdd.py
@@ -17,9 +17,9 @@ def __init__(self, idx):
self.index = idx

class RDD:
def __init__(self, sc):
self.sc = sc
self.id = sc.newRddId()
def __init__(self, ctx):
self.ctx = ctx
self.id = ctx.newRddId()
self._splits = []
self.dependencies = []
self.aggregator = None
@@ -68,7 +68,7 @@ def sample(self, withReplacement, faction, seed):
return SampleRDD(self, withReplacement, faction, seed)

def union(self, rdd):
return UnionRDD(self.sc, [self, rdd])
return UnionRDD(self.ctx, [self, rdd])

def glom(self):
return GlommedRDD(self)
@@ -78,7 +78,7 @@ def cartesion(self, other):

def groupBy(self, f, numSplits=None):
if numSplits is None:
numSplits = min(self.sc.defaultMinSplits, len(self.splits))
numSplits = min(self.ctx.defaultMinSplits, len(self.splits))
return self.map(lambda x: (f(x), x)).groupByKey(numSplits)

def pipe(self, command):
@@ -93,10 +93,10 @@ def foreach(self, f):
def mf(it):
for i in it:
f(i)
return self.sc.runJob(self, mf)
return self.ctx.runJob(self, mf)

def collect(self):
return sum(self.sc.runJob(self, lambda x:list(x)), [])
return sum(self.ctx.runJob(self, lambda x:list(x)), [])

def __iter__(self):
for i in self.collect():
@@ -108,15 +108,15 @@ def reducePartition(it):
return [reduce(f, it)]
else:
return []
options = self.sc.runJob(self, reducePartition)
options = self.ctx.runJob(self, reducePartition)
return reduce(f, sum(options, []))

def uniq(self):
g = self.map(lambda x:(x,None)).reduceByKey(lambda x,y:None)
return g.map(lambda (x,y):x)

def count(self):
result = self.sc.runJob(self, lambda x: ilen(x))
result = self.ctx.runJob(self, lambda x: ilen(x))
return sum(result)

def toList(self):
@@ -127,7 +127,7 @@ def take(self, n):
r = []
p = 0
while len(r) < n and p < len(self.splits):
res = self.sc.runJob(self, lambda x: islice(x, n - len(r)), [p], True)
res = self.ctx.runJob(self, lambda x: islice(x, n - len(r)), [p], True)
if res[0]:
r.extend(res[0])
else:
@@ -159,7 +159,7 @@ def mergeMaps(m1, m2):

def combineByKey(self, createCombiner, mergeValue, mergeCombiners, numSplits=None):
if numSplits is None:
numSplits = min(self.sc.defaultMinSplits, len(self.splits))
numSplits = min(self.ctx.defaultMinSplits, len(self.splits))
aggregator = Aggregator(createCombiner, mergeValue, mergeCombiners)
partitioner = HashPartitioner(numSplits)
return ShuffledRDD(self, aggregator, partitioner)
@@ -236,7 +236,7 @@ def flatMapValue(self, f):
return FlatMappedValuesRDD(self, f)

def groupWith(self, *others):
part = self.partitioner or HashPartitioner(self.sc.defaultParallelism)
part = self.partitioner or HashPartitioner(self.ctx.defaultParallelism)
return CoGroupedRDD([self]+list(others), part)

def lookup(self, key):
@@ -246,14 +246,14 @@ def process(it):
for k,v in it:
if k == key:
return v
return self.sc.runJob(self, process, [index], False)[0]
return self.ctx.runJob(self, process, [index], False)[0]
else:
raise Exception("lookup() called on an RDD without a partitioner")


class MappedRDD(RDD):
def __init__(self, prev, func=lambda x:x):
RDD.__init__(self, prev.sc)
RDD.__init__(self, prev.ctx)
self.prev = prev
self.func = func
self.dependencies = [OneToOneDependency(prev)]
@@ -292,7 +292,7 @@ def compute(self, split):

class GlommedRDD(RDD):
def __init__(self, prev):
RDD.__init__(self, prev.sc)
RDD.__init__(self, prev.ctx)
self.prev = prev
self.splits = self.prev.splits
self.dependencies = [OneToOneDependency(prev)]
@@ -306,7 +306,7 @@ def compute(self, split):

class PipedRDD(RDD):
def __init__(self, prev, command):
RDD.__init__(self, prev.sc)
RDD.__init__(self, prev.ctx)
self.prev = prev
self.command = command
self.dependencies = [OneToOneDependency(prev)]
@@ -350,12 +350,12 @@ def __hash__(self):

class ShuffledRDD(RDD):
def __init__(self, parent, aggregator, part):
RDD.__init__(self, parent.sc)
RDD.__init__(self, parent.ctx)
self.parent = parent
self.aggregator = aggregator
self._partitioner = part
self._splits = [ShuffledRDDSplit(i) for i in range(part.numPartitions)]
self.dependencies = [ShuffleDependency(self.sc.newShuffleId(),
self.dependencies = [ShuffleDependency(self.ctx.newShuffleId(),
parent, aggregator, part)]

def __str__(self):
@@ -382,7 +382,7 @@ def __init__(self, idx, s1, s2):

class CartesionRDD(RDD):
def __init__(self, rdd1, rdd2):
RDD.__init__(self, rdd1.sc)
RDD.__init__(self, rdd1.ctx)
self.rdd1 = rdd1
self.rdd2 = rdd2
self.numSplitsInRdd2 = n = len(rdd2.splits)
@@ -429,13 +429,13 @@ def mergeCombiners(self, c, v):

class CoGroupedRDD(RDD):
def __init__(self, rdds, partitioner):
RDD.__init__(self, rdds[0].sc)
RDD.__init__(self, rdds[0].ctx)
self.rdds = rdds
self.aggregator = CoGroupAggregator()
self.partitioner = partitioner
self.dependencies = dep = [rdd.partitioner == partitioner
and OneToOneDependency(rdd)
or ShuffleDependency(self.sc.newShuffleId(),
or ShuffleDependency(self.ctx.newShuffleId(),
rdd, self.aggregator, partitioner)
for i,rdd in enumerate(rdds)]
self.splits = [CoGroupSplit(j,
@@ -469,7 +469,7 @@ def __init__(self, prev, seed):

class SampleRDD(RDD):
def __init__(self, prev, withReplacement, frac, seed):
RDD.__init__(self, prev.sc)
RDD.__init__(self, prev.ctx)
self.prev = prev
raise NotImplementedError
# TODO
@@ -481,8 +481,8 @@ def __init__(self, idx, rdd, split):
self.split = split

class UnionRDD(RDD):
def __init__(self, sc, rdds):
RDD.__init__(self, sc)
def __init__(self, ctx, rdds):
RDD.__init__(self, ctx)
self.rdds = rdds
self.splits = [UnionSplit(0, rdd, split)
for rdd in rdds for split in rdd.splits]
@@ -522,8 +522,8 @@ def __equal__(self, other):


class ParallelCollection(RDD):
def __init__(self, sc, data, numSlices):
RDD.__init__(self, sc)
def __init__(self, ctx, data, numSlices):
RDD.__init__(self, ctx)
self.size = len(data)
slices = self.slice(data, numSlices)
self._splits = [ParallelCollectionSplit(self.id, i, slices[i])
@@ -559,8 +559,8 @@ def slice(cls, data, numSlices):


class TextFileRDD(RDD):
def __init__(self, sc, path, numSplits=None, splitSize=None):
RDD.__init__(self, sc)
def __init__(self, ctx, path, numSplits=None, splitSize=None):
RDD.__init__(self, ctx)
self.path = path
if not os.path.exists(path):
raise IOError("not exists")
@@ -603,7 +603,7 @@ def compute(self, split):

class OutputTextFileRDD(RDD):
def __init__(self, rdd, path):
RDD.__init__(self, rdd.sc)
RDD.__init__(self, rdd.ctx)
self.rdd = rdd
self.path = os.path.abspath(path)
if os.path.exists(path):
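
The rdd.py changes above are a mechanical rename of the context attribute from sc to ctx; the public RDD methods keep their signatures. A minimal sketch of the combiner triple behind reduceByKey-style aggregation, written against the renamed DparkContext in local mode (hypothetical data, not part of the commit):

from dpark import DparkContext

ctx = DparkContext("local")
pairs = ctx.parallelize([("a", 1), ("b", 2), ("a", 3)], 2)

# createCombiner starts a combiner from the first value seen for a key,
# mergeValue folds later values in on the same split,
# mergeCombiners merges partial results across splits.
summed = pairs.combineByKey(lambda v: v,
                            lambda c, v: c + v,
                            lambda c1, c2: c1 + c2, 2)
print dict(summed.collect())            # expected: {'a': 4, 'b': 2}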
3 changes: 2 additions & 1 deletion schedule.py → dpark/schedule.py
@@ -430,7 +430,8 @@ def getFrameworkName(self, driver):
return self.name

def getExecutorInfo(self, driver):
path = os.path.abspath('./executor')
dir = os.path.dirname(os.path.abspath(__file__))
path = os.path.join(dir, 'executor')
info = mesos_pb2.ExecutorInfo()
info.executor_id.value = "default"
info.uri = path
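
The schedule.py change stops resolving the Mesos executor relative to the current working directory and anchors it to the dpark package instead. A minimal standalone sketch of that resolution (illustrative only, not from the commit):

import os

# locate the executor helper next to this module, so the scheduler
# finds it no matter which directory the job was launched from
module_dir = os.path.dirname(os.path.abspath(__file__))
executor_path = os.path.join(module_dir, 'executor')
print executor_path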
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
10 changes: 6 additions & 4 deletions cos.py → examples/cos.py
@@ -1,7 +1,9 @@
import sys
sys.path.append('../')
import logging
from context import SparkContext
from dpark import DparkContext

spark = SparkContext()
dpark = DparkContext()

name = 'rating.txt'

@@ -11,7 +13,7 @@ def parse(line):
if r == 'None':
r = defaults[f]
return (sid, (uid, float(r)))
rating = spark.textFile(name, numSplits=2).map(parse).groupByKey(2)#.cache()
rating = dpark.textFile(name, numSplits=2).map(parse).groupByKey(2)#.cache()
#print 'us', rating.first()
print rating.count()

@@ -32,10 +34,10 @@ def vsum(a, b):
s += r * d.get(u, 0)
return s

# should replace this function with a C extension for best performance.
def cos((l1, l2)):
l1 = list(l1)
l2 = list(l2)
print len(l1), len(l2)
d2 = dict(l2)
u2 = reverse(l2)
for sid1, us1 in l1:
9 changes: 5 additions & 4 deletions demo.py → examples/demo.py
@@ -1,16 +1,17 @@
import math
import random
import os
import os, sys
from pprint import pprint
from context import SparkContext
sys.path.append('../')
from dpark import DparkContext
#import logging
#logging.basicConfig(level=logging.ERROR,
# format="%(process)d:%(threadName)s:%(levelname)s %(message)s")

spark = SparkContext()
ctx = DparkContext()

# range
nums = spark.parallelize(range(100), 4)
nums = ctx.parallelize(range(100), 4)
print nums.count()
print nums.reduce(lambda x,y:x+y)
raise
6 changes: 6 additions & 0 deletions examples/point.txt
@@ -0,0 +1,6 @@
150 1 0
155 1 0
140 1 0
180 1 1
170 1 1
200 1 1
