Apache Spark and Functional Programming

Vishal Chawla
4 min readMay 24, 2021

Apache Spark is (at the time of writing) the most popular “big” data processing library. While the DataFrame API has a really nice DSL for expressing pipelined transformations, it suffers from a lack of type safety and is a little too imperative, IMO. The transformations tend to get hairy and cumbersome to read and reason about, especially when there are multiple intermediate steps or operators.

Here I am presenting a way to decompose such pipelines (I use this term in the context of a single Spark application with multiple transformations) into smaller steps: good for testing, and better for readability. While most of my recent work is based on Spark Streaming, I use batch processing for a few things:

  • offline reports for customers that crunch a bunch of data from S3/Parquet (topic for another day), calculate thousands of new metrics, produce scorecards, etc.
  • applying imperative rules to sensor data to flag anomalies and aggregating these over a window. I will use this as the motivating example here.
  • reprocessing data after failures or late arrivals in the streaming pipeline

I receive IoT sensor data. This telemetry data must first be aggregated into groups. A group can be a piece of physical equipment (such as a compressor) or something logical/geographical (such as a building or campus) that contains hundreds of such pieces of equipment. On the other end we have “rules” that are customer-driven and are expressed against a specific group type (such as building, compressor, chiller, etc.). These rules are nothing but patterns (think RETE; see my other post here) that are evaluated against observed data. If a rule matches, it generates a violation. Each rule expresses a physical condition, such as: the temperature of air supplied to the room cannot be lower than the temperature of the chilled water if the cooling mode is OFF for any 15-minute window. This rule is applicable to a building (a group as defined above) because it reasons about metrics that physically belong to different pieces of equipment. This is a gross simplification, just enough to illustrate the shape of a rule.

Model

import java.sql.Timestamp

case class Metric(metricid: String, event_time: Timestamp, metricvalue: Double)
case class GroupType(template: String, equipment: String)
object GroupType {
  val building = GroupType("building", "building")
  val ahu = GroupType("AHU", "AHU")
}
case class Rule(id: String, model: String, assertion: String, applicableTo: GroupType)
case class Group(id: Long, groupType: GroupType, metrics: List[Metric])
case class GroupMetadata(id: Long, meta: Map[String, String])
// groupId is a Long so it lines up with Group.id in the joins below
case class MetricGroupMapping(id: String, groupId: Long)
case class GroupAnomaly(groupid: Long, matchedRule: Rule)
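
To make the shapes concrete, here is a hypothetical instantiation of the model for the rule described earlier; the ids, metric names, and assertion syntax are all made up for illustration:

// Illustrative values only; the assertion syntax is hypothetical
val supplyAirTemp = Metric("supply_air_temp_01", Timestamp.valueOf("2021-05-24 10:00:00"), 18.5)
val chilledWaterTemp = Metric("chw_temp_01", Timestamp.valueOf("2021-05-24 10:00:00"), 7.0)

val buildingGroup = Group(42L, GroupType.building, List(supplyAirTemp, chilledWaterTemp))

// "Supply air temperature cannot be lower than chilled water temperature
//  when cooling mode is OFF, over any 15-minute window."
val noFreeCoolingRule = Rule(
  id           = "rule-001",
  model        = "hvac",
  assertion    = "coolingMode == OFF && supply_air_temp < chw_temp over 15m",
  applicableTo = GroupType.building
)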

There are other pieces to this, but I am keeping it basic to get the point across. The inputs to the Spark application are the following (a sketch of how they might be loaded follows the list):

  • Dataset[Metric]
  • Dataset[MetricGroupMapping], since a metric could belong to multiple groups
  • Dataset[Group]
  • Dataset[GroupMetadata]
  • Dataset[Rule]
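
As a concrete (and hedged) illustration, these could be loaded along these lines; the Parquet paths are placeholders and the real sources may differ:

import org.apache.spark.sql.{Dataset, SparkSession}

def loadInputs(spark: SparkSession): (Dataset[Metric], Dataset[MetricGroupMapping], Dataset[Group], Dataset[GroupMetadata], Dataset[Rule]) = {
  import spark.implicits._
  // Paths are illustrative placeholders
  (spark.read.parquet("s3://bucket/metrics").as[Metric],
   spark.read.parquet("s3://bucket/metric_group_mapping").as[MetricGroupMapping],
   spark.read.parquet("s3://bucket/groups").as[Group],
   spark.read.parquet("s3://bucket/group_metadata").as[GroupMetadata],
   spark.read.parquet("s3://bucket/rules").as[Rule])
}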

The Spark application needs to join the metrics to their groups and the group metadata so that the raw metrics are augmented, then discover the rules applicable to each group based on its groupType, and finally apply those rules. The job of actually evaluating the rules is left to the RETE network.

import org.apache.spark.sql.{Dataset, SparkSession}

class RuleApplication(spark: SparkSession) {
  import spark.implicits._

  def applyRuleLogic(metrics: Dataset[Metric],
                     metricGroupMapping: Dataset[MetricGroupMapping],
                     groups: Dataset[Group],
                     metadata: Dataset[GroupMetadata],
                     rules: Dataset[Rule]): Dataset[GroupAnomaly] = {
    metrics
      .joinWith(metricGroupMapping, metrics("metricid") === metricGroupMapping("id"), "inner")
      .joinWith(groups, $"_2.groupId" === groups("id"), "inner") // groupId lives on the mapping side (_2)
      .joinWith(metadata, $"_2.id" === metadata("id"))           // _2 is the Group at this point
      .map { case (((metric, metricGroup), group), groupMetadata) => (metric, metricGroup, group, groupMetadata) }
      .joinWith(rules, $"_3.groupType" === rules("applicableTo"))
      .map(r =>
        // execute the rules and create anomalies...
        GroupAnomaly(r._1._3.id, r._2))
  }
}

Again, I am leaving out many low-level details, but you will agree this is ugly.

Refinement 1

As any developer will tell you, breaking a large chunk of logic into smaller, bite-sized pieces is a good idea, and that is especially true with FP. So this turned into:

import org.apache.spark.sql.{Dataset, SparkSession}

class RuleApplicationV2(spark: SparkSession) {
  import RuleApplicationV2._

  def applyRuleLogic(metrics: Dataset[Metric],
                     metricGroupMapping: Dataset[MetricGroupMapping],
                     groups: Dataset[Group],
                     metadata: Dataset[GroupMetadata],
                     rules: Dataset[Rule]) = {
    val metricsMappedToGroups = metricsToGroupsJoin(metrics, metricGroupMapping)(spark)
    val attachGroup = mappedMetricsToGroupJoin(metricsMappedToGroups, groups)
    // ...and so on for the metadata and rules joins
    attachGroup
  }
}

object RuleApplicationV2 {

  private def metricsToGroupsJoin(metrics: Dataset[Metric],
                                  metricGroupMapping: Dataset[MetricGroupMapping])
                                 (spark: SparkSession): Dataset[(Metric, MetricGroupMapping)] = {
    import spark.implicits._
    metrics.joinWith(metricGroupMapping, metrics("metricid") === metricGroupMapping("id"), "inner")
  }

  def mappedMetricsToGroupJoin(metricsMappedToGroups: Dataset[(Metric, MetricGroupMapping)],
                               groups: Dataset[Group]): Dataset[((Metric, MetricGroupMapping), Group)] = ???
}

We defined type aliases for the tuples being passed around (shown below); this makes the function signatures more compact. But it is still not entirely satisfying. The types align except for the pesky SparkSession, so function composition might be possible, maybe?
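
The names MetricWithGroup and MetricWithItsGroup reappear in the next listing; the definitions below are my reconstruction of what they stand for, and the wrapper object name is made up:

import org.apache.spark.sql.Dataset

object PipelineTypes {
  // Aliases for the nested tuple Datasets threaded between the steps
  type MetricWithGroup    = Dataset[(Metric, MetricGroupMapping)]
  type MetricWithItsGroup = Dataset[((Metric, MetricGroupMapping), Group)]
}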

Refinement 2 (Enter Cats)

Cat(egories) are like potato chips: you can never have just one.

Refinement 1 shows that the smaller functions would compose if we did not have to pass SparkSession around. What if we could create a pipeline of functions and magically materialize the SparkSession dependency only when we actually run the pipeline, instead of having to pass it to every step? If this doesn’t sound like the Reader monad, I don’t know what does.

I’ve been using Cats already, and it provides the Reader monad, which is exactly what I reached for.
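
Before wiring it into Spark, here is a minimal, Spark-free sketch of the idea (the Config type and step names are made up for illustration): each step returns a Reader that needs a Config to produce its value, flatMap chains the steps, and the dependency is supplied exactly once at the end.

import cats.data.Reader

final case class Config(prefix: String)

// Each step describes a computation that needs a Config but does not run yet
val step1: Reader[Config, String] = Reader(cfg => s"${cfg.prefix}-metrics")
def step2(name: String): Reader[Config, Int] = Reader(cfg => name.length + cfg.prefix.length)

// flatMap composes the steps without ever mentioning Config explicitly
val pipeline: Reader[Config, Int] = step1.flatMap(step2)

// The dependency is injected only when the pipeline is run
val result: Int = pipeline.run(Config("iot"))

Applied to our pipeline, each join step returns a Reader[SparkSession, ...]: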

import cats.data.Reader
import org.apache.spark.sql.{Dataset, SparkSession}

class RuleApplicationV3(spark: SparkSession) {
  import RuleApplicationV3._

  def applyRuleLogic(metrics: Dataset[Metric],
                     metricGroupMapping: Dataset[MetricGroupMapping],
                     groups: Dataset[Group],
                     metadata: Dataset[GroupMetadata],
                     rules: Dataset[Rule]) = {
    metricsToGroupsJoin(metrics, metricGroupMapping)
      .flatMap(mappedMetricsToGroupJoin(_, groups))
      // .flatMap(...) further joins with metadata and rules elided
      .run(spark)
  }
}

object RuleApplicationV3 {
  type MetricWithGroup    = Dataset[(Metric, MetricGroupMapping)]
  type MetricWithItsGroup = Dataset[((Metric, MetricGroupMapping), Group)]

  private def metricsToGroupsJoin(metrics: Dataset[Metric],
                                  metricGroupMapping: Dataset[MetricGroupMapping]): Reader[SparkSession, MetricWithGroup] =
    Reader { spark =>
      import spark.implicits._
      metrics.joinWith(metricGroupMapping, metrics("metricid") === metricGroupMapping("id"), "inner")
    }

  def mappedMetricsToGroupJoin(metricsMappedToGroups: MetricWithGroup,
                               groups: Dataset[Group]): Reader[SparkSession, MetricWithItsGroup] =
    Reader { spark =>
      import spark.implicits._
      metricsMappedToGroups.joinWith(groups, $"_2.groupId" === groups("id"), "inner")
    }
}

Each function now returns a Reader which “reads” a SparkSession to produce the specified type. This creates a pipeline of functions, and the SparkSession dependency is provided only when it is needed, i.e. when the pipeline is triggered. Much better from a readability POV, and easier to reason about.
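
This also delivers the testability promised earlier: with a local SparkSession and a few in-memory Datasets, the composed pipeline (or any individual Reader step) can be exercised directly. A hedged sketch, with made-up values:

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("rule-application-test").getOrCreate()
import spark.implicits._

// Tiny, made-up inputs
val metrics  = Seq(Metric("supply_air_temp_01", Timestamp.valueOf("2021-05-24 10:00:00"), 18.5)).toDS()
val mapping  = Seq(MetricGroupMapping("supply_air_temp_01", 42L)).toDS()
val groups   = Seq(Group(42L, GroupType.building, Nil)).toDS()
val metadata = Seq(GroupMetadata(42L, Map("site" -> "campus-1"))).toDS()
val rules    = Seq(Rule("rule-001", "hvac", "supply_air_temp < chw_temp over 15m", GroupType.building)).toDS()

// Runs the composed Reader pipeline; the SparkSession is supplied exactly once
val result = new RuleApplicationV3(spark).applyRuleLogic(metrics, mapping, groups, metadata, rules)
result.show(truncate = false)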

Other Approaches

  • an implicit SparkSession, which works but is somewhat less satisfying to me (a minimal sketch follows below)
  • I didn’t really try it, but Frameless looks interesting
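
For comparison, a minimal sketch of the implicit variant (assuming the same model and join as before): the SparkSession is threaded through an implicit parameter instead of a Reader, so callers just need one in implicit scope.

import org.apache.spark.sql.{Dataset, SparkSession}

object RuleApplicationImplicit {

  def metricsToGroupsJoin(metrics: Dataset[Metric],
                          metricGroupMapping: Dataset[MetricGroupMapping])
                         (implicit spark: SparkSession): Dataset[(Metric, MetricGroupMapping)] = {
    import spark.implicits._
    metrics.joinWith(metricGroupMapping, metrics("metricid") === metricGroupMapping("id"), "inner")
  }
}

// Usage: declare the session implicit once and the plumbing disappears,
// at the cost of a hidden dependency in every signature.
// implicit val spark: SparkSession = SparkSession.builder().getOrCreate()
// val joined = RuleApplicationImplicit.metricsToGroupsJoin(metrics, mapping)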
