diff --git a/doc/src/site/sphinx/First_Steps.rst b/doc/src/site/sphinx/First_Steps.rst index a7e0b8a..979ef04 100644 --- a/doc/src/site/sphinx/First_Steps.rst +++ b/doc/src/site/sphinx/First_Steps.rst @@ -105,6 +105,8 @@ Configuration parameters +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ | readPreference | "nearest" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ +| readPreferenceTags | "taga:1,tagb:2" | No | ++-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ | language | "en" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ | connectTimeout | "10000" | No | diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/client/MongodbClientFactory.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/client/MongodbClientFactory.scala index 03482de..3731d10 100644 --- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/client/MongodbClientFactory.scala +++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/client/MongodbClientFactory.scala @@ -174,7 +174,7 @@ object MongodbClientFactory { val builder = new MongoClientOptions.Builder() .readPreference(extractValue[String](clientOptions, ProviderReadPreference) match { - case Some(preference) => parseReadPreference(preference) + case Some(preference) => parseReadPreference(preference, extractValue[String](clientOptions, ReadPreferenceTags).getOrElse(DefaultReadPreferenceTags)) case None => DefaultReadPreference }) .connectTimeout(extractValue[String](clientOptions, ConnectTimeout).map(_.toInt).getOrElse(DefaultConnectTimeout)) diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala index 27a4b26..e3bf910 100644 --- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala +++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala @@ -17,7 +17,8 @@ package com.stratio.datasource.mongodb.config import com.mongodb.casbah.Imports._ -import com.mongodb.{MongoClientOptions => JavaMongoClientOptions} +import com.mongodb.{MongoClientOptions => JavaMongoClientOptions, TagSet, Tag} +import scala.collection.JavaConversions._ import com.stratio.datasource.util.Config._ /** @@ -31,6 +32,7 @@ object MongodbConfig { val Collection = "collection" val SSLOptions = "sslOptions" val ReadPreference = "readPreference" + val ReadPreferenceTags = "readPreferenceTags" val ConnectTimeout = "connectTimeout" val ConnectionsPerHost = "connectionsPerHost" val MaxWaitTime = "maxWaitTime" @@ -51,6 +53,7 @@ object MongodbConfig { // List of parameters for mongoClientOptions val ListMongoClientOptions = List( ReadPreference, + ReadPreferenceTags, ConnectionsPerHost, ConnectTimeout, MaxWaitTime, @@ -68,6 +71,7 @@ object MongodbConfig { // Default MongoDB values val DefaultMongoClientOptions = new JavaMongoClientOptions.Builder().build() val DefaultReadPreference = com.mongodb.casbah.ReadPreference.Nearest + val DefaultReadPreferenceTags = "" val DefaultConnectTimeout = DefaultMongoClientOptions.getConnectTimeout val DefaultConnectionsPerHost = DefaultMongoClientOptions.getConnectionsPerHost val DefaultMaxWaitTime = DefaultMongoClientOptions.getMaxWaitTime @@ -129,17 +133,26 @@ object MongodbConfig { } } + /** + * Parse one key to the associated readPreferenceTags + * @param readPreferenceTags string key for identify the correct object + * @return TagSet object + */ + def parseReadPreferenceTags(readPreferenceTags: String): TagSet = { + return new TagSet( readPreferenceTags.split(",").map(_.split(":")).filter(_.length == 2).map( kv => new Tag(kv(0),kv(1)) ).toList ) + } + /** * Parse one key to the associated readPreference * @param readPreference string key for identify the correct object * @return readPreference object */ - def parseReadPreference(readPreference: String): ReadPreference = { + def parseReadPreference(readPreference: String, readPreferenceTags: String): ReadPreference = { readPreference.toUpperCase match { case "PRIMARY" => com.mongodb.casbah.ReadPreference.Primary - case "SECONDARY" => com.mongodb.casbah.ReadPreference.Secondary - case "NEAREST" => com.mongodb.casbah.ReadPreference.Nearest - case "PRIMARYPREFERRED" => com.mongodb.casbah.ReadPreference.primaryPreferred + case "SECONDARY" => com.mongodb.casbah.ReadPreference.secondary(parseReadPreferenceTags(readPreferenceTags)) + case "NEAREST" => com.mongodb.casbah.ReadPreference.nearest(parseReadPreferenceTags(readPreferenceTags)) + case "PRIMARYPREFERRED" => com.mongodb.casbah.ReadPreference.primaryPreferred(parseReadPreferenceTags(readPreferenceTags)) case "SECONDARYPREFERRED" => com.mongodb.casbah.ReadPreference.SecondaryPreferred case _ => com.mongodb.casbah.ReadPreference.Nearest }