Commit

Bug fixed
esmaeilmirvakili committed Aug 11, 2020
1 parent cd34107 commit 3741bcb
Showing 1 changed file with 54 additions and 50 deletions.
104 changes: 54 additions & 50 deletions scripts/radossim.py
@@ -43,7 +43,8 @@ def kvAndAioThread(env, srcQ, latencyModel, batchManagement, data=None, useCoDel
batch = []
with srcQ.get() as get:
req = yield get
- ((_, reqSize, _), _) = req
+ ((priority, reqSize, arrivalOSD), arrivalKV) = req
+ req = ((priority, reqSize, arrivalOSD), env.now)
batchReqSize += reqSize
batch.append(req)
if useCoDel:
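Note on the change above: instead of discarding the unpacked fields, the request is re-packed with env.now as its KV-side arrival stamp, so priority, size and OSD arrival time survive while arrivalKV now reflects when the KV/AIO thread actually picked the request up. A minimal illustration of the re-stamping with invented values (not code from radossim.py):

def restamp(req, now):
    # Keep (priority, reqSize, arrivalOSD); replace only the KV arrival stamp.
    ((priority, reqSize, arrivalOSD), _oldArrivalKV) = req
    return ((priority, reqSize, arrivalOSD), now)

req = ((0, 4096, 10.0), 12.0)            # arrived at the OSD at t=10, queued at t=12
req = restamp(req, now=15.0)             # dequeued by the KV/AIO thread at t=15
((_, _, arrivalOSD), arrivalKV) = req
print(arrivalKV - arrivalOSD)            # 5.0 -> delay before KV processing starts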
@@ -56,11 +57,13 @@ def kvAndAioThread(env, srcQ, latencyModel, batchManagement, data=None, useCoDel
for i in range(batchSize):
with srcQ.get() as get:
req = yield get
- ((_, reqSize, _), _) = req
+ ((priority, reqSize, arrivalOSD), arrivalKV) = req
+ req = ((priority, reqSize, arrivalOSD), env.now)
batchReqSize += reqSize
batch.append(req)
aioSubmit = env.now
- yield env.timeout(latencyModel.submitAIO(batchReqSize))
+ timeout = latencyModel.submitAIO(batchReqSize)
+ yield env.timeout(timeout)
aioDone = env.now
kvBatch = []
for req in batch:
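This hunk applies the same re-stamping to the drained requests and splits the AIO submission into computing the modelled delay and waiting for it, so aioSubmit and aioDone bracket exactly that delay. The simpy pattern being followed is the usual one; the sketch below is illustrative, with invented names and timings:

import simpy

def timed_op(env, delay):
    start = env.now
    yield env.timeout(delay)             # stands in for latencyModel.submitAIO(...)
    done = env.now
    print(f"modelled operation took {done - start} time units")

env = simpy.Environment()
env.process(timed_op(env, delay=3))
env.run()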
@@ -78,45 +81,45 @@ def kvAndAioThread(env, srcQ, latencyModel, batchManagement, data=None, useCoDel
req = (req, kvQDispatch, kvCommit)
data.append(req)
if useCoDel:
- batchManagement.manageBatch(kvBatch, batchReqSize, aioSubmit, kvCommit)
+ batchManagement.manageBatch(kvBatch, batchReqSize, kvQDispatch, kvCommit)
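With this change, batch management is fed the KV dispatch timestamp rather than the earlier AIO submit time, so whatever latency it derives from its third argument now covers only the KV stage. The internals of BatchManagement.manageBatch are not part of this diff; the sketch below is a hypothetical illustration of the per-request bookkeeping the (dispatch, commit) pair enables, using the (header, arrivalKV) tuple layout built above and invented timestamps:

def per_request_latencies(batch, dispatchTime, commitTime):
    # Hypothetical helper, not the real manageBatch.
    lats = []
    for ((priority, reqSize, arrivalOSD), arrivalKV) in batch:
        queueLat = dispatchTime - arrivalKV      # waiting in the KV queue
        commitLat = commitTime - arrivalKV       # queueing plus batched commit
        lats.append((queueLat, commitLat))
    return lats

batch = [((0, 4096, 1.0), 2.0), ((0, 8192, 1.5), 2.5)]
print(per_request_latencies(batch, dispatchTime=3.0, commitTime=6.0))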


# Batch incoming requests and process
def kvThreadOld(env, srcQ, latencyModel, targetLat=5000, measInterval=100000):
bm = BatchManagement(srcQ, targetLat, measInterval)
while True:
# Create batch
batch = []
# Build request of entire batch (see Issue #6)
batchReqSize = 0
# Wait until there is something in the srcQ
with srcQ.get() as get:
bsTxn = yield get
batch.append(bsTxn)
# Unpack transaction
((priority, reqSize, arrivalOSD), arrivalKV) = bsTxn
batchReqSize += reqSize
# Batch everything that is now in srcQ or up to bm.batchSize
# initial batch size is governed by srcQ.capacity
# but then updated by BatchManagement
if bm.batchSize == float("inf"):
batchSize = len(srcQ.items)
else:
batchSize = int(min(bm.batchSize - 1, len(srcQ.items)))
# Do batch
for i in range(batchSize):
with srcQ.get() as get:
bsTxn = yield get
batch.append(bsTxn)
# Unpack transaction
((priority, reqSize, arrivalOSD), arrivalKV) = bsTxn
batchReqSize += reqSize
# Process batch
kvQDispatch = env.now
yield env.timeout(latencyModel.applyWrite(batchReqSize))
kvCommit = env.now
# Diagnose and manage batching
bm.manageBatch(batch, batchReqSize, kvQDispatch, kvCommit)
# def kvThreadOld(env, srcQ, latencyModel, targetLat=5000, measInterval=100000):
# bm = BatchManagement(srcQ, targetLat, measInterval)
# while True:
# # Create batch
# batch = []
# # Build request of entire batch (see Issue #6)
# batchReqSize = 0
# # Wait until there is something in the srcQ
# with srcQ.get() as get:
# bsTxn = yield get
# batch.append(bsTxn)
# # Unpack transaction
# ((priority, reqSize, arrivalOSD), arrivalKV) = bsTxn
# batchReqSize += reqSize
# # Batch everything that is now in srcQ or up to bm.batchSize
# # initial batch size is governed by srcQ.capacity
# # but then updated by BatchManagement
# if bm.batchSize == float("inf"):
# batchSize = len(srcQ.items)
# else:
# batchSize = int(min(bm.batchSize - 1, len(srcQ.items)))
# # Do batch
# for i in range(batchSize):
# with srcQ.get() as get:
# bsTxn = yield get
# batch.append(bsTxn)
# # Unpack transaction
# ((priority, reqSize, arrivalOSD), arrivalKV) = bsTxn
# batchReqSize += reqSize
# # Process batch
# kvQDispatch = env.now
# yield env.timeout(latencyModel.applyWrite(batchReqSize))
# kvCommit = env.now
# # Diagnose and manage batching
# bm.manageBatch(batch, batchReqSize, kvQDispatch, kvCommit)
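The uncommented kvThreadOld above is what this commit deletes; the commented-out copy is what remains in the file, leaving kvAndAioThread as the only consumer. Both follow the same batching pattern: block for one request, then opportunistically drain whatever else is already queued, bounded by the current batch size. A self-contained simpy sketch of that pattern, with invented names, sizes and timings:

import simpy

def batcher(env, srcQ, batchLimit=8, serviceTime=5):
    # Block for one request, drain up to batchLimit - 1 more that are already
    # waiting, then model one batched commit for all of them.
    while True:
        req = yield srcQ.get()
        batch = [req]
        drain = min(batchLimit - 1, len(srcQ.items))
        for _ in range(drain):
            more = yield srcQ.get()
            batch.append(more)
        yield env.timeout(serviceTime)
        print(f"t={env.now}: committed a batch of {len(batch)}")

def producer(env, srcQ, n=20):
    for i in range(n):
        yield env.timeout(1)
        yield srcQ.put(i)

env = simpy.Environment()
q = simpy.Store(env)
env.process(producer(env, q))
env.process(batcher(env, q))
env.run(until=40)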


# Manage batch sizing
@@ -195,6 +198,9 @@ def fightBufferbloat(self, currQLat, currentTime):
else:
if self.maxQueueLen < len(self.queue.items):
self.maxQueueLen = len(self.queue.items)
+ if self.batchSize != float("inf"):
+ self.batchSizeLog.append(self.batchSize)
+ self.timeLog.append(self.queue._env.now)
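These appends now run on every call to fightBufferbloat, i.e. at every latency measurement, instead of only when batchSizing changes the batch size (the old appends are deleted further down), which yields an evenly sampled batch-size trace for plotting. For context, a CoDel-style controller with this method's signature typically keeps the minimum observed queue latency per interval and compares it against the target; the sketch below is hypothetical, since the real internals are not shown in this diff:

def fight_bufferbloat(state, currQLat, currentTime, targetLat=300, measInterval=1000):
    # Hypothetical CoDel-style sketch, not the real BatchManagement.fightBufferbloat.
    state.setdefault("minLat", None)
    state.setdefault("intervalStart", currentTime)
    if state["minLat"] is None or currQLat < state["minLat"]:
        state["minLat"] = currQLat
    if currentTime - state["intervalStart"] >= measInterval:
        tooLarge = state["minLat"] > targetLat   # even the best latency missed the target
        state["minLat"] = None
        state["intervalStart"] = currentTime
        return tooLarge                          # caller would feed this to batchSizing()
    return None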

def batchSizing(self, isTooLarge):
if isTooLarge:
@@ -206,14 +212,12 @@ def batchSizing(self, isTooLarge):
if self.batchSize == 0:
self.batchSize = 1
print("new batch size is", self.batchSize)
- #elif self.batchSize != float("inf"):
- elif self.batchSize < self.maxQueueLen * 2:
+ # elif self.batchSize != float("inf"):
+ elif self.batchSize < 1.5 * self.maxQueueLen:
# elif self.batchSize < 200:
# print('batch size', self.batchSize, 'gets larger')
self.batchSize = self.batchUpSize(self.batchSize)
# print('new batch size is', self.batchSize)
- if self.batchSize != float("inf"):
- self.batchSizeLog.append(self.batchSize)
- self.timeLog.append(self.queue._env.now)
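Two things change in batchSizing: the growth cap tightens from 2 * maxQueueLen to 1.5 * maxQueueLen, so the batch size can overshoot the largest observed queue length by less, and the logging moves into fightBufferbloat as shown above. A hypothetical, self-contained sketch of the halve-on-pressure / grow-otherwise behaviour these lines implement (the real batchUpSize is not shown in this diff, so doubling is an assumption):

def batch_sizing(batchSize, maxQueueLen, isTooLarge):
    # Hypothetical sketch, not the real BatchManagement.batchSizing.
    if isTooLarge:
        if batchSize == float("inf"):
            batchSize = maxQueueLen              # first shrink: pin to a finite value
        batchSize = max(1, batchSize // 2)       # multiplicative decrease
    elif batchSize < 1.5 * maxQueueLen:          # the new, tighter growth cap
        batchSize = batchSize * 2                # assumed batchUpSize behaviour
    return batchSize

size = float("inf")
for verdict in [True, False, False, True]:       # invented controller verdicts
    size = batch_sizing(size, maxQueueLen=48, isTooLarge=verdict)
    print(size)                                  # 24, 48, 96, 48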

def printLats(self, freq=1000):
if self.count % freq == 0:
@@ -276,7 +280,7 @@ def __init__(self):
# OSD queue(s)
# Add capacity parameter for max queue lengths
queueDepth = 48
- osdQ1 = simpy.PriorityStore(env, capacity=queueDepth)
+ osdQ1 = simpy.PriorityStore(env)
# osdQ2 = simpy.PriorityStore(env, capacity=queueDepth)
# osdQ = simpy.Store(env) # infinite capacity
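Dropping capacity=queueDepth makes the OSD priority queue effectively unbounded: a simpy PriorityStore created without a capacity defaults to infinite capacity, so put() always succeeds immediately, whereas a bounded store makes producers wait once it is full. A small self-contained illustration (invented names, unrelated to the rest of the script):

import simpy

env = simpy.Environment()
bounded = simpy.PriorityStore(env, capacity=2)   # behaves like the old osdQ1
unbounded = simpy.PriorityStore(env)             # behaves like the new osdQ1

def producer(env, store, name):
    for i in range(4):
        yield store.put(i)                       # waits forever on a full bounded store
        print(f"t={env.now} {name}: queued {i} (len={len(store.items)})")

env.process(producer(env, bounded, "bounded"))
env.process(producer(env, unbounded, "unbounded"))
env.run()                                        # bounded stops at 2 items, unbounded takes all 4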

@@ -338,10 +342,10 @@ def __init__(self):
# ax.set_ylabel('Likelihood of occurrence')
# ax.plot(queuLenMonitor.logTimeList, queuLenMonitor.queueLenList)
# plt.show()
def Average(lst):
return sum(lst) / len(lst)
print(f'Batch size: {max(bm.batchSizeLog)}')
print(f'Time size: {len(bm.timeLog)}')
# def Average(lst):
# return sum(lst) / len(lst)
# print(f'Batch size: {max(bm.batchSizeLog)}')
# print(f'Time size: {len(bm.timeLog)}')
fig, ax = plt.subplots(figsize=(8, 4))
ax.grid(True)
ax.set_xlabel('time')
@@ -371,7 +375,7 @@ def Average(lst):
help='Use CoDel algorithm for batch sizing?'
)
args = parser.parse_args()
- targetLat = 500
+ targetLat = 300
measInterval = 1000
time = 60 * 1_000_000  # 60 s of simulated time (assuming microsecond units)
if args.useCoDel:
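targetLat drops from 500 to 300 (presumably microseconds, in line with the other latency constants in the script), i.e. the controller now tolerates less standing queue delay before shrinking batches. With the hypothetical CoDel-style check sketched earlier, a measured per-interval minimum latency of 400 would have been acceptable before and triggers shrinking now:

minLatInInterval = 400                           # invented measurement
for targetLat in (500, 300):
    print(targetLat, "-> too large?", minLatInInterval > targetLat)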
