Skip to content

Commit

Permalink
Switch Thor to RoxieMM and use new swapping paradigm
Browse files Browse the repository at this point in the history
+group/local sort merged
 - group sort now spills
 - an overflowing local sort, used to spill last mem's worth unnecessarily
+global sort, either all in mem, or all on disk.
 - refactored out minisort and overflow intercept handling for clarity
+local join, used to spill both sides unconditionally
+globaljoin, each side either all in mem or disk. It still unnecessarily
spills first side to disk before gathering other side (TODO)
+Spillable streams, consuming or shared, retain rows in mem. until need to spill. Used by various things.
+Unified the array classes, will be easier to add spilling elsewhere now, e.g. extend hashdedup.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed May 4, 2012
1 parent 9242b6f commit a95931f
Show file tree
Hide file tree
Showing 72 changed files with 3,233 additions and 3,736 deletions.
1 change: 1 addition & 0 deletions thorlcr/activities/activitymasters_lcr.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ set ( SRCS
filter/thfilter.cpp
firstn/thfirstn.cpp
funnel/thfunnel.cpp
group/thgroup.cpp
hashdistrib/thhashdistrib.cpp
indexread/thindexread.cpp
indexwrite/thindexwrite.cpp
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/activities/activityslaves_lcr.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ include_directories (
./../../common/commonext
./../activities
./../../rtl/eclrtl
./../../roxie/roxiemem
)

HPCC_ADD_LIBRARY( activityslaves_lcr SHARED ${SRCS} )
Expand All @@ -132,6 +133,7 @@ target_link_libraries ( activityslaves_lcr
jhtree
graph_lcr
graphslave_lcr
roxiemem
)


2 changes: 1 addition & 1 deletion thorlcr/activities/aggregate/thaggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "jlib.hpp"
#include "mpbase.hpp"
#include "mputil.hpp"

#include "thmem.hpp"
#include "thaggregate.ipp"
#include "thexception.hpp"
#define NO_BWD_COMPAT_MAXSIZE
Expand Down
9 changes: 4 additions & 5 deletions thorlcr/activities/aggregate/thaggregateslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ class AggregateSlaveBase : public CSlaveActivity, public CThorDataLink
if (1 == numPartialResults)
return firstRow;

CThorRowArray partialResults;
partialResults.reserve(numPartialResults);
CThorExpandingRowArray partialResults(*this, true, false, true, numPartialResults);
partialResults.setRow(0, firstRow);
--numPartialResults;

Expand All @@ -76,7 +75,7 @@ class AggregateSlaveBase : public CSlaveActivity, public CThorDataLink
msg.read(sz);
if (sz)
{
assertex(NULL == partialResults.item(sender-1));
assertex(NULL == partialResults.query(sender-1));
CThorStreamDeserializerSource mds(sz, msg.readDirect(sz));
RtlDynamicRowBuilder rowBuilder(queryRowAllocator());
size32_t sz = queryRowDeserializer()->deserialize(rowBuilder, mds);
Expand All @@ -89,13 +88,13 @@ class AggregateSlaveBase : public CSlaveActivity, public CThorDataLink
unsigned p=0;
for (;p<numPartialResults; p++)
{
const void *row = partialResults.item(p);
const void *row = partialResults.query(p);
if (row)
{
if (first)
{
first = false;
sz = cloneRow(rowBuilder, partialResults.item(p), queryRowMetaData());
sz = cloneRow(rowBuilder, row, queryRowMetaData());
}
else
sz = helper->mergeAggregate(rowBuilder, row);
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/catch/thcatchslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class CSkipCatchSlaveActivity : public CCatchSlaveActivityBase
try
{
gathered = true;
Owned<IRowWriterMultiReader> overflowBuf = createOverflowableBuffer(queryRowInterfaces(input), CATCH_BUFFER_SIZE);
Owned<IRowWriterMultiReader> overflowBuf = createOverflowableBuffer(*this, queryRowInterfaces(input), true);
running = true;
while (running)
{
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/choosesets/thchoosesetsslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class InputCounter : public CSimpleInterface, implements IThorDataLink
unsigned __int64 queryTotalCycles() const;

ChooseSetsPlusActivity & activity;
IThorRowAllocator *queryRowAllocator();
IEngineRowAllocator *queryRowAllocator();
};


Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/diskread/thdiskread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "eclhelper.hpp"
#include "jlzw.hpp"

#include "thmem.hpp"
#include "thdiskread.ipp"

class CDiskReadMasterVF : public CDiskReadMasterBase
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/activities/diskread/thdiskreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ class CDiskGroupAggregateSlave
ActivityTimer s(totalCycles, timeActivities, NULL);
CDiskReadSlaveActivityRecord::start();
gathered = eoi = false;
localAggTable.setown(new CThorRowAggregator(*this, *helper, *helper, queryLargeMemSize()/10, container.queryOwnerId()==0));
localAggTable.setown(new CThorRowAggregator(*this, *helper, *helper));
localAggTable->start(queryRowAllocator());
dataLinkStart("DISKGROUPAGGREGATE", container.queryId());
}
Expand Down Expand Up @@ -990,7 +990,7 @@ class CDiskGroupAggregateSlave
{
BooleanOnOff onOff(merging);
bool ordered = 0 != (TDRorderedmerge & helper->getFlags());
localAggTable.setown(mergeLocalAggs(*this, *helper, *helper, localAggTable, mpTag, queryLargeMemSize()/10, container.queryOwnerId()==0, ordered));
localAggTable.setown(mergeLocalAggs(*this, *helper, *helper, localAggTable, mpTag, ordered));
}
}
Owned<AggregateRowBuilder> next = localAggTable->nextResult();
Expand Down
5 changes: 2 additions & 3 deletions thorlcr/activities/fetch/thfetchslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "jhtree.hpp"
#include "thsortu.hpp"
#include "thactivityutil.ipp"
#include "thmem.hpp"
#include "thormisc.hpp"
#include "thbufdef.hpp"
#include "thexception.hpp"
Expand Down Expand Up @@ -271,7 +270,7 @@ class CFetchSlaveBase : public CSlaveActivity, public CThorDataLink, implements
unsigned offsetMapSz;
MemoryBuffer offsetMapBytes;
Owned<IExpander> eexp;
Owned<IThorRowAllocator> keyRowAllocator;
Owned<IEngineRowAllocator> keyRowAllocator;

protected:
Owned<IRowInterfaces> fetchDiskRowIf;
Expand Down Expand Up @@ -343,7 +342,7 @@ class CFetchSlaveBase : public CSlaveActivity, public CThorDataLink, implements
{
IOutputMetaData *keyRowMeta = QUERYINTERFACE(fetchBaseHelper->queryExtractedSize(), IOutputMetaData);
assertex(keyRowMeta);
keyRowAllocator.setown(createThorRowAllocator(keyRowMeta, queryActivityId()));
keyRowAllocator.setown(queryJob().getRowAllocator(keyRowMeta, queryActivityId()));
}
appendOutputLinked(this);
}
Expand Down
66 changes: 32 additions & 34 deletions thorlcr/activities/filter/thfilterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,16 @@ class CFilterProjectSlaveActivity : public CFilterSlaveActivityBase

class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorSteppable
{
unsigned nextIndex;
CThorRowArray group;
IHThorFilterGroupArg *helper;
Owned<IThorRowLoader> groupLoader;
Owned<IRowStream> groupStream;

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CFilterGroupSlaveActivity(CGraphElementBase *container) : CFilterSlaveActivityBase(container), CThorSteppable(this)
{
groupLoader.setown(createThorRowLoader(*this, NULL, false, rc_allMem));
}
void init(MemoryBuffer &data, MemoryBuffer &slaveData)
{
Expand All @@ -259,31 +260,32 @@ class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorS
{
ActivityTimer s(totalCycles, timeActivities, NULL);
abortSoon = !helper->canMatchAny();
nextIndex = 0;
CFilterSlaveActivityBase::start("FILTERGROUP");
}
CATCH_NEXTROW()
{
ActivityTimer t(totalCycles, timeActivities, NULL);
while (!abortSoon)
{
if (group.ordinality())
if (groupStream)
{
if (nextIndex < group.ordinality())
OwnedConstThorRow row = groupStream->nextRow();
if (row)
{
OwnedConstThorRow itm = group.itemClear(nextIndex++);
dataLinkIncrement();
return itm.getClear();
return row.getClear();
}
nextIndex = 0;
group.clear();
groupStream.clear();
return NULL;
}
unsigned num = group.load(*input, false);
if (num)
CThorExpandingRowArray rows(*this);
groupStream.setown(groupLoader->loadGroup(input, abortSoon, &rows));
if (rows.ordinality())
{
if (!helper->isValid(num, (const void **)group.base()))
group.clear(); // read next group
// JCSMORE - if isValid would take a stream, group wouldn't need to be in mem.
if (!helper->isValid(rows.ordinality(), rows.getRowArray()))
groupStream.clear();
// read next group
}
else
abortSoon = true; // eof
Expand All @@ -301,19 +303,20 @@ class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorS
if (abortSoon)
return NULL;

if (group.ordinality())
if (groupStream)
{
while (nextIndex < group.ordinality())
{
OwnedConstThorRow ret = group.itemClear(nextIndex++);
if (stepCompare->docompare(ret, seek, numFields) >= 0)
loop
{
OwnedConstThorRow row = groupStream->nextRow();
if (!row)
break;
if (stepCompare->docompare(row, seek, numFields) >= 0)
{
dataLinkIncrement();
return ret.getClear();
return row.getClear();
}
}
nextIndex = 0;
group.clear();
groupStream.clear();
//nextRowGE never returns an end of group marker. JCSMORE - Is this right?
}

Expand All @@ -338,18 +341,13 @@ class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorS
ret.setown(input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra));
#endif

OwnedConstThorRow ret = input->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
while (ret)
{
group.append(ret.getClear());
ret.setown(input->nextRow());
}

unsigned num = group.ordinality();
if (num)
CThorExpandingRowArray rows(*this);
groupStream.setown(groupLoader->loadGroup(input, abortSoon, &rows));
if (rows.ordinality())
{
if (!helper->isValid(num, (const void **)group.base()))
group.clear();
// JCSMORE - if isValid would take a stream, group wouldn't need to be in mem.
if (!helper->isValid(rows.ordinality(), rows.getRowArray()))
groupStream.clear();
}
else
abortSoon = true; // eof
Expand All @@ -363,12 +361,12 @@ class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorS
void resetEOF()
{
abortSoon = false;
group.clear();
groupStream.clear();
input->resetEOF();
}
void stop()
{
group.clear();
groupStream.clear();
stopInput(input);
dataLinkStop();
}
Expand Down
19 changes: 9 additions & 10 deletions thorlcr/activities/funnel/thfunnelslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
CriticalBlock b2(fullCrit); // exclusivity for totSize / full
if (stopped) return;
rows.enqueue(row);
totSize += thorRowMemoryFootprint(row);
totSize += thorRowMemoryFootprint(serializer, row);
while (totSize > FUNNEL_MIN_BUFF_SIZE)
{
full = true;
Expand Down Expand Up @@ -251,7 +251,7 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
rows.stop();
return NULL;
}
size32_t sz = thorRowMemoryFootprint(row.get());
size32_t sz = thorRowMemoryFootprint(serializer, row.get());
{
CriticalBlock b(fullCrit);
assertex(totSize>=sz);
Expand Down Expand Up @@ -528,16 +528,15 @@ class CombineSlaveActivity : public CSlaveActivity, public CThorDataLink
bool grouped;
bool eogNext;
MemoryBuffer recbuf;
CThorRowArray rows;
CThorExpandingRowArray rows;

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);


CombineSlaveActivity(CGraphElementBase *_container)
: CSlaveActivity(_container), CThorDataLink(this)
: CSlaveActivity(_container), CThorDataLink(this), rows(*this)
{
rows.setSizing(true,true);
grouped = container.queryGrouped();
}
void init()
Expand Down Expand Up @@ -587,7 +586,7 @@ class CombineSlaveActivity : public CSlaveActivity, public CThorDataLink
err = true;
break;
}
rows.append((void *)row.getClear());
rows.append(row.getClear());
}
else {
if (i&&!eog) {
Expand All @@ -599,20 +598,20 @@ class CombineSlaveActivity : public CSlaveActivity, public CThorDataLink
}
if (err) {
eog = true;
rows.clear();
rows.kill();
throw MakeActivityException(this, -1, "mismatched input row count for Combine");
}
if (eog)
break;
RtlDynamicRowBuilder row(queryRowAllocator());
size32_t sizeGot = helper->transform(row, rows.ordinality(), (const void * *)rows.base());
rows.clear();
size32_t sizeGot = helper->transform(row, rows.ordinality(), rows.getRowArray());
rows.kill();
if (sizeGot) {
dataLinkIncrement();
return row.finalizeRowClear(sizeGot);
}
}
rows.clear();
rows.kill();
return NULL;
}
bool isGrouped()
Expand Down
43 changes: 43 additions & 0 deletions thorlcr/activities/group/thgroup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
############################################################################## */


#include "thactivitymaster.ipp"


// Master-side activity that owns a message-passing tag for its slaves.
// The tag is allocated from the owning job when the activity is constructed
// and is written into the initialization data sent to each slave.
// Instantiated by createGroupActivityMaster() when the container is not
// local-or-grouped.
class CGroupActivityMaster : public CMasterActivity
{
public:
    CGroupActivityMaster(CMasterGraphElement *info) : CMasterActivity(info)
    {
        // Reserve a tag from the job; it is handed out in serializeSlaveData().
        mpTag = container.queryJob().allocateMPTag();
    }

    // Appends the tag, as an int, to the data destined for a slave.
    // The 'slave' index is not consulted: every slave receives the same tag.
    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
    {
        const int serializedTag = static_cast<int>(mpTag);
        dst.append(serializedTag);
    }
};

// Factory for the master side of the group activity.
// Returns a CGroupActivityMaster (which allocates and distributes an MP tag)
// unless the container reports local-or-grouped, in which case a plain
// CMasterActivity is returned.
// NOTE(review): presumably the local/grouped case needs no cross-slave
// communication and therefore no tag - confirm against the slave activity.
CActivityBase *createGroupActivityMaster(CMasterGraphElement *container)
{
    const bool needsTag = !container->queryLocalOrGrouped();
    if (needsTag)
        return new CGroupActivityMaster(container);
    return new CMasterActivity(container);
}

Loading

0 comments on commit a95931f

Please sign in to comment.