Switch from Java to Scala Spark API #57
Comments
Would this change result in new dependencies?

New Cabal or Java packages? We already depend on Scala libs indirectly via Spark (Scala is the implementation language for Spark), so I don't think we would have any new dependencies.

Indeed, we would probably just make the dependency on Scala (and its standard library, I guess?) direct as opposed to transitive.

Only the standard library. Nothing else.

@mboes Just a little note on your initial post here before I forget, but it's really a detail more than anything else: it would be nice to avoid using inline-java in sparkle for now, for the sake of supporting GHC 7.10.

Didn't mean to imply that inline-java would be required. It was merely a more convenient notation for the sake of discussion than using

@mboes How much effort is required for this change? Would many more tests be required to verify that this is done correctly?

@mboes @facundominguez Can you give an estimate of how much effort is required for this change?

@robinbb 1-2 days, I'd say.

@mboes Thank you for the estimate. Next question: would this create a build-time dependency on Scala that is not there now?

@robinbb It would turn an indirect dependency on the Scala standard library (via Spark) into a direct one. So no new dependencies overall.

@mboes Understood. Seems like a worthy change to make.
Per @mboes' request, here's a summary of my attempt to add a binding to

```java
<U> Dataset<U> mapPartitions(MapPartitionsFunction<T,U> f, Encoder<U> encoder)
```

Since we work with dataframes, we need this instantiated at `Row`. It is my understanding that in Scala this encoder would have been implicit, and nobody needs to worry about it. But in the Java world we need to provide it explicitly, and nobody seems to say how. There are some mysterious references to `RowEncoder`, along the lines of:

```java
Encoder<Row> enc = RowEncoder.apply(ds.schema());
Dataset<Row> split = ds.mapPartitions(iter -> new Split(iter), enc);
```

(where `Split` is an iterator over the transformed partition). My first attempt was:

```haskell
newtype RowEncoder = RowEncoder (J ('Class "org.apache.spark.sql.Encoder"))
instance Coercible RowEncoder ('Class "org.apache.spark.sql.Encoder")

getRowEncoder :: StructType -> IO RowEncoder
getRowEncoder st =
    callStatic (sing :: Sing "org.apache.spark.sql.catalyst.encoders.RowEncoder") "apply" [coerce st]
```

but that yielded the dreaded method-not-found error. Switching the newtype to

```haskell
newtype RowEncoder = RowEncoder (J ('Class "org.apache.spark.sql.catalyst.encoders.ExpressionEncoder"))
instance Coercible RowEncoder ('Class "org.apache.spark.sql.catalyst.encoders.ExpressionEncoder")
```

now makes the following work:

```haskell
mapPartitionsDF :: Closure (Iterator Row -> Iterator Row)
                -> DataFrame -> IO DataFrame
mapPartitionsDF fun df = do
    RowEncoder enc <- getRowEncoder =<< schema df
    let enc' :: J ('Class "org.apache.spark.sql.Encoder")
        enc' = unsafeCast enc
    jfun <- reflect (HaskellMapPartitionsFunction fun)
    call df "mapPartitions" [coerce jfun, coerce enc']
```

This now finally works. Well, actually, I'm still getting an exception, but I think it's at least finding and invoking the method.
I just tested a proof of concept of this idea: https://gist.github.com/mboes/1f31da7e1859371ce5ab74b51397c492. It seems to work!
Note that once we move to the Dataset API we'll have a potentially more challenging implicit to deal with: the `Encoder`.

Do you mean the `Encoder`?
Using inline-java slows down the compilation of sparkle, but it is safer, because we then get the benefit of *both* type checkers (Java and Haskell). In fact the extra safety isn't just theoretical: this patch also includes a fix to the binding for `treeAggregate`, which was supplying arguments in the wrong order. This is preliminary work ahead of implementing #57, which we can do with confidence once the type checkers have our back. This patch only switches over RDD for now. The rest can come later.
We currently bind the Java API for Spark. @alpmestan likely remembers the rationale better than me. I assume it was mostly a choice by default, and because the Java API was more straightforward to bind, since it doesn't refer to "non-standard", language-specific types such as `scala.Tuple2` and `scala.Function1`. And more importantly, because the Java API doesn't expose Scala-specific implicit arguments in function signatures, whereas these implicits become entirely explicit, like any other arguments, when calling said functions from other JVM languages.
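For concreteness, here is a rough sketch, in plain Java against Spark's Scala `RDD` class, of what "implicits become explicit arguments" looks like from another JVM language. The names `Doubler`, `ScalaApiFromJava` and `doubleAll` are made up for illustration; `RDD.map`, `AbstractFunction1` and `ClassTag$.MODULE$` are the real Scala/Spark entry points:

```java
import java.io.Serializable;

import org.apache.spark.rdd.RDD;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;

// The Scala signature is `def map[U: ClassTag](f: T => U): RDD[U]`; seen from
// Java, the implicit ClassTag[U] is just an ordinary second parameter.
class Doubler extends AbstractFunction1<Integer, Integer> implements Serializable {
    @Override
    public Integer apply(Integer x) {
        return 2 * x;
    }
}

class ScalaApiFromJava {
    static RDD<Integer> doubleAll(RDD<Integer> xs) {
        ClassTag<Integer> tag = ClassTag$.MODULE$.apply(Integer.class);
        return xs.map(new Doubler(), tag); // the implicit, supplied explicitly
    }
}
```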
However, there are downsides to the Java API:

* `FlatMapFunction` is not a subtype of `Function`. There is also `ForeachFunction`, `PairFunction`, etc. Since these are unrelated, it means that we can't straightforwardly write a uniform `Reflect (Closure (a -> b)) JFun1` instance (see the sketch after this list). The Scala API OTOH only has `Function1`, `Function2`, etc.:
  * there are no void methods: all methods return a value;
  * pairs are packed into tuple objects, the way one would expect in Haskell.
* In Scala the hierarchy is `HadoopRDD < RDD < Object`; in Java we have `JavaHadoopRDD < JavaPairRDD < Object`. Both `JavaHadoopRDD` and `JavaPairRDD` reimplement wrappers as implementations of the Java-specific `JavaRDDLike` interface.
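A rough sketch of the first downside, with made-up wrapper names and Spark 2.x-style interfaces assumed: because the Java functional interfaces share no common supertype, each one needs its own wrapper around a closure.

```java
import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

// One wrapper per unrelated interface; a single generic wrapper around a
// Haskell closure cannot serve Function, FlatMapFunction, ForeachFunction, etc.
class WrapAsFunction implements Function<String, String> {
    @Override
    public String call(String s) {
        return s;
    }
}

class WrapAsFlatMapFunction implements FlatMapFunction<String, String> {
    @Override
    public Iterator<String> call(String s) {
        return Collections.singletonList(s).iterator();
    }
}
// The Scala API instead takes plain scala.Function1/Function2 everywhere,
// so one wrapper per arity would do.
```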
If we move to the Scala API, we could tighten the codomain of `Uncurry`: the `Proc` code for void methods would no longer be needed. But really the main goal is that all function objects would be treated uniformly according to their arities, hence allowing us to bind `RDD.mapPartitions`, `RDD.foreach`, etc. without requiring overlapping instances or newtype wrappers around the closures.

As a side effect, we'd have slightly less overhead, since we'd be calling into the Scala methods directly, rather than trampolining through their respective Java wrappers first.
So how do we deal with implicits? The only implicits in the API are evidence for `Ordering` constraints and `ClassTag`. Our bindings do know, at runtime, the ground type they're instantiated at for any given call site, so we can generate `ClassTag` evidence on demand, inside the binding. Same goes for `Ordering` evidence. For `ClassTag`, creating evidence goes something like this:
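A minimal sketch, written in Java for concreteness (the helper name `classTagFor` is made up); the Haskell binding would make the equivalent call through the JVM bridge, going via the `ClassTag` companion object:

```java
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

class ClassTagEvidence {
    // Given the runtime Class the binding is instantiated at, mint the
    // ClassTag evidence that the Scala method expects as its (now explicit)
    // trailing argument.
    static <T> ClassTag<T> classTagFor(Class<T> cls) {
        return ClassTag$.MODULE$.apply(cls);
    }
}
```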
Other than that, this isn't a huge change: pretty much only the `RDD.hs` and `Closure.hs` source files would need to change. The other bindings would stay largely intact.

cc @alpmestan @robinbb @dcoutts