This repository has been archived by the owner on May 27, 2020. It is now read-only.

Cannot supply samplingRatio when creating a DataFrame #111

Open
glytching opened this issue Apr 1, 2016 · 6 comments

Comments

@glytching

Thanks very much for the spark-mongodb connector, much appreciated.

I'm having an issue when creating a DataFrame from a MongoDB collection.

The elapsed time for creating the DataFrame is 2-3 seconds in this scenario:

  • I am connecting to a remote MongoDB cluster
  • I am creating the DataFrame from a collection with 250 small documents (average document size is 71 bytes)
  • I am not supplying a schema, so I am expecting the Stratio MongoDB DefaultSource to infer the schema
  • I am supplying a sample ratio to (I think!) limit the amount of data which must be read in order to infer the schema

Walking the code I can see that:

  • DataFrameReader.load() creates an org.apache.spark.sql.execution.datasources.ResolvedDataSource, which, in the RelationProvider case, creates a CaseInsensitiveMap from the given options and then invokes com.stratio.datasource.mongodb.DefaultSource
  • The DefaultSource creates a new MongodbRelation but provides no schema, which (see MongodbRelation:58) results in the use of a lazily evaluated schema like so:
MongodbSchema(new MongodbRDD(sqlContext, config, rddPartitioner), config.get[Any](MongodbConfig.SamplingRatio).fold(MongodbConfig.DefaultSamplingRatio)(_.toString.toDouble)).schema()  
  • The key item here is 'config.get[Any](MongodbConfig.SamplingRatio).fold(MongodbConfig.DefaultSamplingRatio)(_.toString.toDouble)', which uses the caller-supplied samplingRatio or falls back to the default. It expects the sampling ratio to be provided under the key "schema_samplingRatio", but because the ResolvedDataSource has already case-insensitised the caller-supplied properties, our sampling ratio is actually stored under the key "schema_samplingratio", so MongodbSchema always uses the default sampling ratio of 1.0 (the sketch below illustrates the mismatch)
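
To illustrate the suspected mismatch, here is a minimal, self-contained sketch (plain java.util.Map handling, not the connector's or Spark's actual code; the class name is made up) of how lower-casing the option keys makes an exact-key lookup fall back to the default:

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class CaseInsensitiveKeySketch {
    public static void main(String[] args) {
        // The caller supplies the sampling ratio under the documented mixed-case key.
        Map<String, String> callerOptions = new HashMap<>();
        callerOptions.put("schema_samplingRatio", "0.1");

        // Simulate the effect of the case-insensitive options map: keys end up lower-cased.
        Map<String, String> lowerCased = new HashMap<>();
        callerOptions.forEach((key, value) -> lowerCased.put(key.toLowerCase(Locale.ROOT), value));

        // An exact lookup under the mixed-case key now misses, so the default of 1.0 wins.
        String ratio = lowerCased.getOrDefault("schema_samplingRatio", "1.0");
        System.out.println(ratio); // prints 1.0, not 0.1
    }
}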

Am I correct in the above diagnosis? If so, what can be done about it? If not, how can I reliably provide my own sampling ratio?

Any help gratefully accepted.

Version details etc:

  • org.scala-lang::scala-library::2.11.6
  • com.stratio.datasource::spark-mongodb_2.11::0.11.1
  • com.stratio::spark-mongodb::0.8.7
  • org.apache.spark::spark-sql_2.11::1.6.1

Here's a test case showing the behaviour in action:

  @Test
  public void canReproduceCallerSuppliedSamplingRatioIssue() {
    SparkConf sparkConf = new SparkConf()
        .setAppName("ProvidingASamplingRatio")
        .setMaster("local[*]");

    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    SQLContext sqlContext = new SQLContext(sc);

    Double samplingRatio = 0.1;
    long start = System.currentTimeMillis();
    DataFrame countriesSlow = sqlContext
        .read()
        .format("com.stratio.datasource.mongodb")
        .options(getOptions("Scratch", "countries", samplingRatio))
        .load();
    // the preceding call takes 2 - 3 *seconds*
    System.out.println(String.format("Loaded countries without a schema in: %sms using caller supplied sampling ratio: %s",
        (System.currentTimeMillis() - start), samplingRatio));
    countriesSlow.show(5);

    StructType countriesSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("code", DataTypes.StringType, true),
        DataTypes.createStructField("region", DataTypes.StringType, true),
        DataTypes.createStructField("name", DataTypes.StringType, true)
    ));

    start = System.currentTimeMillis();
    DataFrame countriesFast = sqlContext
        .read()
        .format("com.stratio.datasource.mongodb")
        .options(getOptions("Scratch", "countries", null))
        .schema(countriesSchema)
        .load();
    // the preceding call takes 2 - 3 *millis*
    System.out.println(String.format("Loaded countries with a schema in: %sms using default sampling ratio",
        (System.currentTimeMillis() - start)));
    countriesFast.show(5);
  }

  private Map getOptions(String databaseName, String collectionName, Double samplingRatio) {
    Map options = new HashMap();
    // see MongodbConfig
    options.put("host", "...");
    options.put("credentials", "...");
    options.put("database", databaseName);
    options.put("collection", collectionName);
    if (samplingRatio != null) {
      options.put("schema_samplingRatio", samplingRatio);
    }
    return options;
  }
@pmadrigal
Contributor

Hi @colmmcdonnell ,

You are right, there is a problem with this, but only when going through the DataFrameReader API. There is a PR, #112, which solves it.

You can use the fromMongodb method instead of DataFrameReader. See our First Steps guide.

Also, you are using an old version (0.8.7); in the next few weeks we are going to release version 0.11.2.

Thanks for your feedback!

@glytching
Author

Hi @pmadrigal

Thanks for your response. A few replies:

  • I think the fromMongodb method is only available in the Scala API, is that correct? I ask because I am using the Java API, and the only way I could find to create a DataFrame was via sqlContext.read().format("com.stratio.datasource.mongodb").options(...).load(). If I am missing something and there is a way to use the fromMongodb call (which I have seen in the Scala API) from Java, could you point me at it?
  • The only version of com.stratio::spark-mongodb that I can find in Maven Central is 0.8.7; are the 0.11.x versions to which you refer publicly available?

Rgds

@pmadrigal
Contributor

@colmmcdonnell

  • You can use fromMongodb in Java. Here is an example:
import org.apache.spark.sql.types.StructType;
import scala.*;
import scala.collection.JavaConverters;
import com.stratio.datasource.mongodb.MongodbContext;
import com.stratio.datasource.mongodb.config.MongodbConfigBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class TestJavaAPI {

    public static void main(String[] args){

        JavaSparkContext sc = new JavaSparkContext("local[2]", "test spark-mongodb java");
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

        List<String> javaHost = new ArrayList<String>();
        javaHost.add("localhost:27017");

        scala.collection.immutable.List<String> host = JavaConverters.asScalaBufferConverter(javaHost)
                .asScala().toList();

        HashMap<String, Object> javaOptions = new HashMap<String, Object>();
        javaOptions.put("host", host);
        javaOptions.put("database", "highschool");
        javaOptions.put("collection", "students");

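        // No explicit schema: Option.apply(null) yields None, so the schema will be inferred from the data.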
        Option<StructType> schema = scala.Option.apply(null);

        scala.collection.immutable.Map<String, Object> options = JavaConverters.mapAsScalaMapConverter(javaOptions)
                .asScala().toMap(Predef.<Tuple2<String, Object>>conforms());

        MongodbConfigBuilder builder = new MongodbConfigBuilder(options);
        MongodbContext mc = new MongodbContext(sqlContext);
        DataFrame df = mc.fromMongoDB(builder.build(), schema);
        df.registerTempTable("students");
        sqlContext.sql("SELECT * FROM students").show();

    }
}

To use this library from Java, some Scala types are needed and, as you can see, the code is less clean. We highly recommend Scala!

  • There is a new groupId for this project, com.stratio.datasource; you can find it here.

Hope this helps!

@glytching
Author

Hi @pmadrigal

Thanks for your response. I can now invoke fromMongoDB from Java (and I am using v0.11.1), but the original issue I had was that this call: sqlContext.read().format("com.stratio.datasource.mongodb").options(...).load() was taking 2-3 seconds on a tiny collection. I had thought that this was because I was unable to supply a samplingRatio. However, using the code you have provided I can now create the DataFrame using fromMongoDB and provide a samplingRatio (I walked through the code and confirmed that the samplingRatio was used), yet the elapsed time for the fromMongoDB call is still 2-3 seconds for this tiny collection.

I am seeing the same unexpectedly long duration whether I use fromMongoDB or sqlContext.read().format("com.stratio.datasource.mongodb").options(...).load(); providing a samplingRatio seems to make no difference. The only thing which reduces the elapsed time to a manageable number is supplying a schema.

Have I misunderstood the meaning/purpose of samplingRatio? Is there any way I can load a DataFrame quickly without supplying a schema? I know that "quickly" is subjective, so to give a concrete example: in my test I am reading from a collection with 250 small documents (average document size is 71 bytes), and I would hope to be able to create a DataFrame for this collection (whether using the DataFrame API or fromMongoDB) in < 5ms. This is easily achieved if I supply a schema, so I am wondering if there is some way to achieve the same performance (or at least sub-second performance) without providing a schema.

Thanks again for your help, much appreciated.

Rgds

Test case showing that the fromMongoDB call shows no improvement even with a small sampling ratio:

  @Test
  public void canReproduceCallerSuppliedSamplingRatioIssue() {
    SparkConf sparkConf = new SparkConf()
        .setAppName("ProvidingASamplingRatio")
        .setMaster("local[*]");

    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    SQLContext sqlContext = new SQLContext(sc);

    List<String> javaHost = new ArrayList<>();
    javaHost.add("...");

    List<MongodbCredentials> javaCredentials = new ArrayList<>();
    javaCredentials.add(new MongodbCredentials(...));

    scala.collection.immutable.List<String> host = JavaConverters.asScalaBufferConverter(javaHost)
        .asScala().toList();
    scala.collection.immutable.List<MongodbCredentials> credentials = JavaConverters.asScalaBufferConverter(javaCredentials)
        .asScala().toList();

    HashMap<String, Object> javaOptions = new HashMap<>();
    javaOptions.put("host", host);
    javaOptions.put("credentials", credentials);
    javaOptions.put("database", "Scratch");
    javaOptions.put("collection", "countries");
    double samplingRatio = 0.1;
    javaOptions.put("schema_samplingRatio", samplingRatio);

    Option<StructType> schema = scala.Option.apply(null);
    scala.collection.immutable.Map<String, Object> options = JavaConverters.mapAsScalaMapConverter(javaOptions)
        .asScala().toMap(Predef.<Tuple2<String, Object>>conforms());

    MongodbConfigBuilder builder = new MongodbConfigBuilder(options);
    MongodbContext mc = new MongodbContext(sqlContext);

    long start = System.currentTimeMillis();
    DataFrame countriesSlow = mc.fromMongoDB(builder.build(), schema);
    // the preceding call takes >3 *seconds*
    System.out.println(String.format("Created dataframe for countries without a schema in: %sms using caller supplied sampling ratio: %s",
        (System.currentTimeMillis() - start), samplingRatio));
    countriesSlow.show(5);

    schema = scala.Option.apply(DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("code", DataTypes.StringType, true),
        DataTypes.createStructField("region", DataTypes.StringType, true),
        DataTypes.createStructField("name", DataTypes.StringType, true)
    )));

    start = System.currentTimeMillis();
    DataFrame countriesFast = mc.fromMongoDB(builder.build(), schema);
    // the preceding call takes 1 - 2 *millis*
    System.out.println(String.format("Created dataframe for countries with a schema in: %sms",
        (System.currentTimeMillis() - start)));
    countriesFast.show(5);
  }

@pmadrigal
Contributor

Hi @colmmcdonnell ,

As you know, by supplying a schema we avoid inferring it, and so avoid the time that inference takes.

To create a DataFrame, a schema is needed. To infer it, we need to get data from Mongo, create an RDD, iterate over the records getting the partial schema of each one, and choose a final schema valid for all of the records. All of this is what takes the time you see in your example.
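
If it helps, here is a rough conceptual sketch of that per-record inference-and-merge pass, using plain Java collections as stand-ins (this is not the connector's actual code, which works on an RDD and Spark StructTypes; the class, data, and merge rule are made up for illustration):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaInferenceSketch {
    public static void main(String[] args) {
        // Stand-ins for sampled MongoDB documents (field name -> value).
        Map<String, Object> doc1 = new LinkedHashMap<>();
        doc1.put("code", "IE");
        doc1.put("name", "Ireland");
        Map<String, Object> doc2 = new LinkedHashMap<>();
        doc2.put("code", "FR");
        doc2.put("name", "France");
        doc2.put("region", "Europe");
        List<Map<String, Object>> sampledDocs = Arrays.asList(doc1, doc2);

        // Final schema: field name -> inferred type name, built up document by document.
        Map<String, String> inferred = new LinkedHashMap<>();
        for (Map<String, Object> doc : sampledDocs) {                 // one pass over the sampled data
            for (Map.Entry<String, Object> field : doc.entrySet()) {
                String type = field.getValue().getClass().getSimpleName();
                inferred.merge(field.getKey(), type, (a, b) -> a.equals(b) ? a : "String"); // crude widening on conflict
            }
        }
        System.out.println(inferred); // {code=String, name=String, region=String}
    }
}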

On the other hand, SamplingRatio is a config property that allows us to scan only a part of the collection when we infer its schema. If the collection is small, like in your case, reducing the ratio won't make much difference, but with a big collection the time will be considerably reduced.
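
To put some assumed numbers on that (the large-collection size below is purely illustrative): the ratio only shrinks how many documents are scanned, not the fixed per-job costs such as connecting and building the RDD, which is presumably why a 250-document collection barely benefits:

public class SamplingRatioSketch {
    public static void main(String[] args) {
        double ratio = 0.1;
        long smallCollection = 250;          // the collection in this issue
        long largeCollection = 10_000_000L;  // assumed size, purely for illustration

        System.out.println((long) (smallCollection * ratio)); // ~25 documents scanned instead of 250
        System.out.println((long) (largeCollection * ratio)); // ~1,000,000 instead of 10,000,000
    }
}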

Note that with SamplingRatio set to 1.0 you ensure that the schema is correct, because the whole collection has been scanned.

Hope I have clarified the question.

Thanks for your feedback!

@glytching
Author

Hi @pmadrigal

Thanks for your help, I think I have enough detail to take it from here.

Rgds
