Spark's RDD is Invariant: Write Generic Types to Make It Covariant

What is Spark?

Apache Spark is a library that lets you write code as if it were a simple single-threaded program, while the code is actually executed in a distributed fashion. Spark does the dirty work for you: it distributes the code and its execution, manages remote objects, and optimizes the algorithm. Be aware, however: there is no magic behind Spark.
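
For example, a complete word count reads like plain single-threaded Scala. A minimal sketch, where the application name and the input path ("input.txt") are hypothetical:

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    // Local session for experimenting; on a cluster the master comes from the cluster manager
    val spark = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()

    val counts = spark.sparkContext
      .textFile("input.txt")             // hypothetical input file
      .flatMap(_.split("\\s+"))          // runs in parallel, partition by partition
      .map(word => (word, 1))
      .reduceByKey(_ + _)                // shuffles and aggregates across the cluster

    counts.take(10).foreach(println)
    spark.stop()
  }
}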


Spark distributed data structures

One of the core concepts of Apache Spark is the way it represents distributed data structures. The Resilient Distributed Dataset, a.k.a. RDD, is the type Spark uses to work with data in a distributed way. RDD[T] is an invariant generic type and, in its simplest form, can be seen as a distributed implementation of a List.
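
A quick sketch of the List analogy (assuming a SparkSession named spark is in scope, as in the examples below):

val numbers = List(1, 2, 3, 4, 5)

// Distribute the List across the cluster as an RDD[Int]
val numbersRDD = spark.sparkContext.parallelize(numbers)

// Familiar List-style operations, executed in a distributed way
numbersRDD.map(_ * 2).filter(_ > 4).collect() // Array(6, 8, 10)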


RDDs are “Invariant” in nature

If you’ve written reusable code against Spark’s RDD API, you might have run into headaches related to variance. RDD is an invariant API, meaning that RDD[T] and RDD[U] are unrelated types whenever T and U differ – even if there is a subtyping relation between T and U.
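
The difference is decided at the declaration site. A minimal sketch with toy classes (not Spark's actual source), using the Card and Payment types introduced just below:

class MyInvariant[T]  // MyInvariant[Payment] is unrelated to MyInvariant[Card]
class MyCovariant[+T] // MyCovariant[Payment] is a subtype of MyCovariant[Card]

// Spark declares RDD with no variance annotation, roughly: abstract class RDD[T](...)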

Let's write a Scala trait and a concrete class that extends it:

trait Card { val card_no: Long }

case class Payment(override val card_no: Long, date: String, amount: Double) extends Card

Now let's write a function over RDD[Card], hoping to apply it to an RDD of any subtype of Card:

import org.apache.spark.rdd.RDD
import org.joda.time.DateTime // assuming Joda-Time; java.time would do just as well

def paymentOnCard(r: RDD[Card]) = r.map(p => (p.card_no, DateTime.now))

Let’s apply it to a concrete RDD of some type that is a subtype of Card:

val paymentRDD = spark.sparkContext.parallelize(List(
  Payment(1, "2019", 1000),
  Payment(2, "2019", 2000)
))

paymentOnCard(paymentRDD) // compile-time error: RDD[Payment] does not conform to RDD[Card]

Unfortunately, this function isn't as useful as it looks: because RDD is invariant, the call fails to compile with a type mismatch.
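
For comparison, the same pattern compiles out of the box with Scala's List, which is declared covariant (List[+A]):

def paymentOnCardList(cards: List[Card]) = cards.map(p => (p.card_no, DateTime.now))

val payments = List(Payment(1, "2019", 1000), Payment(2, "2019", 2000))

paymentOnCardList(payments) // compiles: a List[Payment] is a List[Card]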


What Next?

Make the function generic, so that it accepts an RDD of any type T that can be converted to a Card, with an implicit conversion as evidence:

def paymentOnCard[T](r: RDD[T])(implicit convert: T => Card) =
  r.map(c => (convert(c).card_no, DateTime.now))

Applying it to the same paymentRDD as before:

paymentOnCard(paymentRDD) // compiles: Payment <: Card, so the compiler supplies the Payment => Card conversion

By using generics in this way, we achieve covariant-like behavior. What we're doing here is using Scala's implicits to implement a lightweight typeclass – the implicit conversion acts as evidence – so we get typesafe ad hoc polymorphism.
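
To see the typeclass shape more explicitly, here is a sketch of the same function written against a hypothetical AsCard[T] typeclass instead of a raw T => Card conversion:

trait AsCard[T] extends Serializable { // Serializable so instances can ship inside Spark closures
  def toCard(t: T): Card
}

object AsCard {
  // A single instance covers every subtype of Card
  implicit def subtypeAsCard[T <: Card]: AsCard[T] = new AsCard[T] {
    def toCard(t: T): Card = t
  }
}

def paymentOnCardTC[T](r: RDD[T])(implicit ev: AsCard[T]) =
  r.map(c => (ev.toCard(c).card_no, DateTime.now))

paymentOnCardTC(paymentRDD) // compiles via the AsCard[Payment] instance

If subtyping is all you need, the plain upper bound def paymentOnCard[T <: Card](r: RDD[T]) would also work; the evidence-based versions are more general, because a conversion or instance can be provided for types that don't extend Card at all.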


In conclusion:

RDD is invariant in nature, but you can still do useful generic programming with the help of Scala's implicit conversions.

