Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a way to specify a preferred store which should be used for queries #439

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add a way to specify a preferred store which should be used for queries
gartens committed Apr 6, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 8a6a65118aeed8a790e30e2aaa445a315916bc97
2 changes: 2 additions & 0 deletions core/src/main/java/org/polypheny/db/plan/AlgOptTable.java
Original file line number Diff line number Diff line change
@@ -135,6 +135,8 @@ public interface AlgOptTable extends Wrapper {
*/
List<ColumnStrategy> getColumnStrategies();

void setPreferredPlacement( String placement );
String getPreferredPlacement();

default Table getTable() {
return null;
15 changes: 15 additions & 0 deletions core/src/main/java/org/polypheny/db/prepare/AlgOptTableImpl.java
Original file line number Diff line number Diff line change
@@ -100,6 +100,8 @@ public class AlgOptTableImpl extends Prepare.AbstractPreparingTable {
private final transient Function<Class, Expression> expressionFunction;
private final ImmutableList<String> names;

String preferredPlacement;

/**
* Estimate for the row count, or null.
* <p>
@@ -121,6 +123,7 @@ private AlgOptTableImpl(
this.table = table; // may be null
this.expressionFunction = expressionFunction; // may be null
this.rowCount = rowCount; // may be null
this.preferredPlacement = null;
}


@@ -403,6 +406,18 @@ public AccessType getAllowedAccess() {
}


@Override
public void setPreferredPlacement( String placement ) {
preferredPlacement = placement;
}


@Override
public String getPreferredPlacement() {
return preferredPlacement;
}


/**
* Helper for {@link #getColumnStrategies()}.
*/
Original file line number Diff line number Diff line change
@@ -292,7 +292,7 @@ public AlgNode visit( LogicalMatch match ) {

@Override
public AlgNode visit( Scan scan ) {
hashBasis.add( "Scan#" + scan.getTable().getQualifiedName() );
hashBasis.add( "Scan#" + scan.getTable().getQualifiedName() + "@" + scan.getTable().getPreferredPlacement() );
// get available columns for every table scan
this.getAvailableColumns( scan );

11 changes: 11 additions & 0 deletions core/src/test/java/org/polypheny/db/catalog/MockCatalogReader.java
Original file line number Diff line number Diff line change
@@ -390,6 +390,17 @@ public AlgDataType getRowType() {
}


@Override
public void setPreferredPlacement( String placement ) {
}


@Override
public String getPreferredPlacement() {
return null;
}


public static MockTable create( MockCatalogReader catalogReader, MockSchema schema, String name, boolean stream, double rowCount ) {
return create( catalogReader, schema, name, stream, rowCount, null );
}
Original file line number Diff line number Diff line change
@@ -17,10 +17,15 @@
package org.polypheny.db.routing.routers;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
@@ -43,7 +48,10 @@
import org.polypheny.db.algebra.logical.relational.LogicalValues;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.Catalog.Pattern;
import org.polypheny.db.catalog.entity.CatalogAdapter;
import org.polypheny.db.catalog.entity.CatalogColumnPlacement;
import org.polypheny.db.catalog.entity.CatalogTable;
import org.polypheny.db.catalog.exceptions.UnknownAdapterIdRuntimeException;
import org.polypheny.db.plan.AlgOptCluster;
import org.polypheny.db.prepare.AlgOptTableImpl;
import org.polypheny.db.rex.RexBuilder;
@@ -82,6 +90,8 @@ public abstract class AbstractDqlRouter extends BaseRouter implements Router {
*/
protected boolean cancelQuery = false;

// catalogTable.id -> unique placement name
final Map<Long, String> preferencePerTable = new HashMap<>();

/**
* Abstract methods which will determine routing strategy. Not implemented in abstract class.
@@ -113,6 +123,45 @@ protected abstract List<RoutedAlgBuilder> handleNonePartitioning(
AlgOptCluster cluster,
LogicalQueryInformation queryInformation );

/* Copied from BaseRouter.java, with added awarness of placement preference */
public Map<Long, List<CatalogColumnPlacement>> selectPlacementWithPreference( CatalogTable table ) {
// Find the adapter with the most column placements
int adapterIdWithMostPlacements = -1;
int numOfPlacements = 0;
final String preferredPlacement = preferencePerTable.getOrDefault( table.id, null );
for ( Entry<Integer, ImmutableList<Long>> entry : catalog.getColumnPlacementsByAdapter( table.id ).entrySet() ) {
if ( preferredPlacement != null ) {
try {
final CatalogAdapter adapter = catalog.getAdapter( entry.getKey() );
if ( ! preferredPlacement.equals( adapter.uniqueName ) ) {
continue;
}
} catch (UnknownAdapterIdRuntimeException e) {
// getColumnplacementsByAdapter can return id
// values which are not valid
continue;
}
}
if ( entry.getValue().size() > numOfPlacements ) {
adapterIdWithMostPlacements = entry.getKey();
numOfPlacements = entry.getValue().size();
}
}

// Take the adapter with most placements as base and add missing column placements
List<CatalogColumnPlacement> placementList = new LinkedList<>();
for ( long cid : table.fieldIds ) {
if ( catalog.getDataPlacement( adapterIdWithMostPlacements, table.id ).columnPlacementsOnAdapter.contains( cid ) ) {
placementList.add( Catalog.getInstance().getColumnPlacement( adapterIdWithMostPlacements, cid ) );
} else {
placementList.add( Catalog.getInstance().getColumnPlacement( cid ).get( 0 ) );
}
}

return new HashMap<>() {{
put( table.partitionProperty.partitionIds.get( 0 ), placementList );
}};
}

/**
* Abstract router only routes DQL queries.
@@ -225,6 +274,7 @@ protected List<RoutedAlgBuilder> buildSelect( AlgNode node, List<RoutedAlgBuilde
}

CatalogTable catalogTable = catalog.getTable( logicalTable.getTableId() );
preferencePerTable.put( catalogTable.id, table.getPreferredPlacement() );

// Check if table is even horizontal partitioned
if ( catalogTable.partitionProperty.isPartitioned ) {
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ protected List<RoutedAlgBuilder> handleVerticalPartitioningOrReplication( AlgNod
@Override
protected List<RoutedAlgBuilder> handleNonePartitioning( AlgNode node, CatalogTable catalogTable, Statement statement, List<RoutedAlgBuilder> builders, AlgOptCluster cluster, LogicalQueryInformation queryInformation ) {
// Get placements and convert into placement distribution
final Map<Long, List<CatalogColumnPlacement>> placements = selectPlacement( catalogTable );
final Map<Long, List<CatalogColumnPlacement>> placements = selectPlacementWithPreference( catalogTable );

// Only one builder available
builders.get( 0 ).addPhysicalInfo( placements );
12 changes: 10 additions & 2 deletions plugins/sql-language/src/main/codegen/templates/Parser.jj
Original file line number Diff line number Diff line change
@@ -4262,7 +4262,8 @@ SqlIdentifier CompoundIdentifier() :
{
List<String> list = new ArrayList<String>();
List<ParserPos> posList = new ArrayList<ParserPos>();
String p;
String p, at = null;
SqlIdentifier s;
boolean star = false;
}
{
@@ -4286,12 +4287,18 @@ SqlIdentifier CompoundIdentifier() :
posList.add(getPos());
}
)?
(
<ATSYMBOL>
at = Identifier()
)?
{
ParserPos pos = ParserPos.sum(posList);
if (star) {
return SqlIdentifier.star(list, pos, posList);
}
return new SqlIdentifier(list, null, pos, posList);
s = new SqlIdentifier(list, null, pos, posList);
s.preferredPlacement = at;
return s;
}
}

@@ -6057,6 +6064,7 @@ SqlPostfixOperator PostfixRowOperator() :
| < ASSIGNMENT: "ASSIGNMENT" >
| < ASYMMETRIC: "ASYMMETRIC" >
| < AT: "AT" >
| < ATSYMBOL: "@" >
| < ATOMIC: "ATOMIC" >
| < ATTRIBUTE: "ATTRIBUTE" >
| < ATTRIBUTES: "ATTRIBUTES" >
Original file line number Diff line number Diff line change
@@ -64,6 +64,8 @@ public class SqlIdentifier extends SqlNode implements Identifier {
protected ImmutableList<ParserPos> componentPositions;


public String preferredPlacement;

/**
* Creates a compound identifier, for example <code>foo.bar</code>.
*
@@ -315,6 +317,11 @@ public void unparse( SqlWriter writer, int leftPrec, int rightPrec ) {
i++;
}

if ( preferredPlacement != null ) {
writer.print(" @ ");
writer.identifier( preferredPlacement );
}

if ( null != collation ) {
collation.unparse( writer, leftPrec, rightPrec );
}
Original file line number Diff line number Diff line change
@@ -1969,6 +1969,7 @@ private void convertIdentifier( Blackboard bb, SqlIdentifier id, SqlNodeList ext
final String datasetName = datasetStack.isEmpty() ? null : datasetStack.peek();
final boolean[] usedDataset = { false };
AlgOptTable table = SqlValidatorUtil.getAlgOptTable( fromNamespace, catalogReader, datasetName, usedDataset );
table.setPreferredPlacement( id.preferredPlacement );
if ( extendedColumns != null && extendedColumns.size() > 0 ) {
assert table != null;
final ValidatorTable validatorTable = table.unwrap( ValidatorTable.class );