Apache Spark UDF: Over-Optimization Issue

What is an Apache Spark UDF?

A User-Defined Function (UDF) is a Spark SQL feature for defining new Column-based functions that extend the vocabulary of Spark SQL's DSL for transforming Datasets and DataFrames.

Defining a UDF:


You define a new UDF by writing an ordinary function (in Scala or Python, depending on the underlying language) and passing it to a UDF constructor such as spark.udf.register. For example:
 
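// `spark` is the SparkSession (predefined in spark-shell)
import spark.implicits._
// Plain Scala function to wrap in a UDF
val squared = (s: Long) => s * s
// Registers it under the SQL name "square" and returns a UserDefinedFunction for the DSL
val squared_udf = spark.udf.register("square", squared)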

Once it is registered, it can be used as:
 
val df = spark.range(10)
df.withColumn("s", squared_udf($"id")).show()
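Outside spark-shell there is no predefined `spark`, so here is a minimal self-contained sketch of the same steps. It assumes spark-sql (Spark 3.x) is on the classpath and uses a local master; the object `SquareUdfExample` and its `squares` helper are hypothetical names for illustration. It also shows that the registered name "square" can be called from a SQL expression string, not only through the returned UserDefinedFunction.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; assumes spark-sql (Spark 3.x) on the classpath.
object SquareUdfExample {
  /** Squares 0 until n through the registered UDF, via the DSL and via SQL. */
  def squares(spark: SparkSession, n: Long): Seq[Long] = {
    import spark.implicits._

    val squared = (s: Long) => s * s
    // register() returns a UserDefinedFunction and also exposes "square" to SQL
    val squaredUdf = spark.udf.register("square", squared)

    val df = spark.range(n)
    val viaDsl = df.withColumn("s", squaredUdf($"id")).select("s").as[Long].collect().toSeq
    val viaSql = df.selectExpr("square(id) AS s").as[Long].collect().toSeq
    // Both call paths should produce the same column
    require(viaDsl == viaSql, s"DSL and SQL results differ: $viaDsl vs $viaSql")
    viaDsl
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for experimenting on one machine
    val spark = SparkSession.builder().master("local[*]").appName("square-udf").getOrCreate()
    try println(squares(spark, 10).mkString(", "))
    finally spark.stop()
  }
}
```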

UDF Optimization

UDFs are black boxes: Spark does not optimize the code written inside a UDF. Look at the execution plan:
 
val df1 = df.withColumn("s",squared_udf($"id"))
df1.explain()

In the execution plan, the UDF appears as SpecializedUDF:square. Spark treats it as an opaque function call, so the code inside the UDF is not part of the optimization.
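Instead of reading explain() output by eye, the UDF calls in a plan can also be counted programmatically. This is a sketch under stated assumptions: it relies on Catalyst internals (`queryExecution.optimizedPlan` and the `ScalaUDF` expression class), which are not a stable public API and may change between Spark versions, and the `UdfPlanInspector` object is a hypothetical name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical helper; relies on Catalyst internals (ScalaUDF), not a stable public API.
object UdfPlanInspector {
  /** Number of Scala UDF calls in the optimized logical plan, summed over all operators. */
  def udfCount(df: DataFrame): Int =
    df.queryExecution.optimizedPlan
      .collect { case node => node.expressions.map(_.collect { case u: ScalaUDF => u }.size).sum }
      .sum

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    try {
      val square = udf((s: Long) => s * s)
      val df1 = spark.range(10).withColumn("s", square(col("id")))
      // Same plan that explain(true) prints under "Optimized Logical Plan"
      println(df1.queryExecution.optimizedPlan)
      println(s"UDF calls in plan: ${udfCount(df1)}")
    } finally spark.stop()
  }
}
```

Applying `udfCount` to the DataFrames in the next section is one way to see whether the optimizer has copied the UDF call into more than one place.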

UDF Optimization: Issue

Spark's handling of UDFs causes no problem as long as the UDF is cheap and its output column is not used in any other column projection or anywhere else in the pipeline. Let's continue with the example above and see what the execution plan looks like when column s is used to project column half:
 
import org.apache.spark.sql.functions._
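df.withColumn("s", squared_udf($"id")).withColumn("half", expr("s/2")).explain()

In the resulting plan, the UDF is called at every place where column s is used: the optimizer merges the two projections and substitutes the UDF call for s, so half recomputes the UDF instead of reading s. The same happens when the column is used for filtering, because the filter is pushed below the projection with s rewritten into the UDF call. Let's check the plan below.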
 
df.withColumn("s", squared_udf($"id")).filter("s < 10").explain()
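The plans show where the UDF appears; to see how often its body actually runs, one option is to count invocations with a LongAccumulator. This is a minimal sketch, assuming a local SparkSession and Spark 3.x; `UdfCallCounter` and `countCalls` are hypothetical names. The exact counts depend on the Spark version and its optimizer rules, and accumulators updated inside transformations can over-count if tasks are retried, so treat the printed numbers as observations rather than guarantees. A count noticeably above the row count means the UDF ran more than once per row.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, expr, udf}

// Hypothetical helper for measuring how often a UDF body actually runs.
object UdfCallCounter {
  /** Runs `pipeline` over spark.range(n) with a self-counting square UDF and returns the call count. */
  def countCalls(spark: SparkSession, n: Long)(
      pipeline: (DataFrame, UserDefinedFunction) => DataFrame): Long = {
    val calls = spark.sparkContext.longAccumulator("squareCalls")
    // Same logic as `squared`, plus one accumulator increment per invocation
    val countedSquare = udf { (s: Long) => calls.add(1); s * s }
    pipeline(spark.range(n).toDF(), countedSquare).collect() // the action triggers execution
    calls.value
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    try {
      val n = 10L
      val projected = countCalls(spark, n) { (df, sq) =>
        df.withColumn("s", sq(col("id"))).withColumn("half", expr("s/2"))
      }
      val filtered = countCalls(spark, n) { (df, sq) =>
        df.withColumn("s", sq(col("id"))).filter("s < 10")
      }
      println(s"rows=$n, calls with projection=$projected, calls with filter=$filtered")
    } finally spark.stop()
  }
}
```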

UDF Optimization Issue: Solution

One way to avoid this issue is to cache the DataFrame as soon as the UDF is applied. Look at the following:
 
df.withColumn("s", squared_udf($"id")).cache().withColumn("half", expr("s/2")).explain()

When using filters:

df.withColumn("s", squared_udf($"id")).cache().filter("s < 10").explain()
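In both new execution plans, the UDF is evaluated first and its result is stored in an InMemoryRelation. Operators above the InMemoryRelation read s from the cached data, so the UDF is not called again. Keep in mind that cache() is lazy: the data is materialized by the first action and holds executor storage until it is evicted or unpersist() is called.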
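To confirm the effect of caching at run time rather than from the plan, the accumulator idea can be reused: count UDF calls after the first action (which fills the cache) and again after a second query over the cached DataFrame. This is a sketch assuming a local SparkSession, Spark 3.x and the default storage level; `CachedUdfCallCounter` and `callsAroundCache` are hypothetical names. If the second query reads s from the InMemoryRelation, the count should not change between the two actions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, udf}

// Hypothetical check that a cached UDF column is not recomputed by later operators.
object CachedUdfCallCounter {
  /** Returns the UDF call count after the first action and after a second action on the cache. */
  def callsAroundCache(spark: SparkSession, n: Long): (Long, Long) = {
    val calls = spark.sparkContext.longAccumulator("cachedSquareCalls")
    val countedSquare = udf { (s: Long) => calls.add(1); s * s }

    val cached = spark.range(n).withColumn("s", countedSquare(col("id"))).cache()
    try {
      // cache() is lazy: this first action evaluates the UDF and fills the InMemoryRelation
      cached.withColumn("half", expr("s/2")).collect()
      val afterFirst: Long = calls.value
      // This filter reads s from the cached data instead of calling the UDF
      cached.filter("s < 10").collect()
      val afterSecond: Long = calls.value
      (afterFirst, afterSecond)
    } finally cached.unpersist()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    try {
      val (first, second) = callsAroundCache(spark, 10L)
      println(s"calls after first action=$first, after filtering the cache=$second")
    } finally spark.stop()
  }
}
```

Calling unpersist() at the end releases the cached blocks once the reuse is over, which matters when this pattern is applied to large DataFrames.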
