To solve your problem, you can use a custom aggregate function over a window.
First, you need to create your custom aggregate function. An aggregate function is defined by an accumulator (a buffer) that is initialized (zero function), updated when treating a new row (reduce function) or when encountering another accumulator (merge function), and returned at the end (finish function).
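To make that contract concrete, here is a hypothetical minimal aggregator that simply sums Ints (SumAgg is an illustrative name, not part of the solution), showing where each of those four functions fits:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Illustrative only: Aggregator[IN, BUF, OUT] with Int everywhere.
object SumAgg extends Aggregator[Int, Int, Int] {
  override def zero: Int = 0                                      // initial buffer value
  override def reduce(buffer: Int, row: Int): Int = buffer + row  // fold one row into the buffer
  override def merge(b1: Int, b2: Int): Int = b1 + b2             // combine two buffers
  override def finish(buffer: Int): Int = buffer                  // final result
  override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}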
In your case, the accumulator should keep two pieces of information:
- The current category of employees
- The sum of the salaries of the previous employees belonging to the current category
To store this information, you can use a Tuple (Int, Int), where the first element is the current category and the second element is the sum of the salaries of the previous employees in that category:
- You initialize this tuple with (0, 0).
- When you encounter a new row: if the sum of the previous salaries plus the current row's salary exceeds 80, you increment the category and reset the running sum to the current row's salary; otherwise, you add the current row's salary to the running sum.
- As you will be using a window function, rows are treated sequentially, so you don't need to implement merging with another accumulator.
- And at the end, as you only want the category, you return only the first element of the accumulator.
So we get the following aggregator implementation:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
// Aggregator[IN, BUF, OUT]: the input is a salary (Int), the buffer is
// (category, running sum of salaries), and the output is the category.
object Labeler extends Aggregator[Int, (Int, Int), Int] {
  // Start in category 0 with an empty running sum.
  override def zero: (Int, Int) = (0, 0)

  // Fold one row into the buffer: move to the next category when the
  // running sum would exceed 80, otherwise accumulate the salary.
  override def reduce(catAndSum: (Int, Int), salary: Int): (Int, Int) = {
    if (catAndSum._2 + salary > 80)
      (catAndSum._1 + 1, salary)
    else
      (catAndSum._1, catAndSum._2 + salary)
  }

  // Never called when the aggregator is applied over a window,
  // because rows are treated sequentially.
  override def merge(catAndSum1: (Int, Int), catAndSum2: (Int, Int)): (Int, Int) =
    throw new NotImplementedError("should be used only over a window function")

  // Only the category is returned.
  override def finish(catAndSum: (Int, Int)): Int = catAndSum._1

  override def bufferEncoder: Encoder[(Int, Int)] = Encoders.tuple(Encoders.scalaInt, Encoders.scalaInt)
  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
Once you have your aggregator, you transform it into a Spark aggregate function using the udaf function.
You then create a window over the whole dataframe, ordered by salary, and apply your Spark aggregate function over this window:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, udaf}
val labeler = udaf(Labeler)
val window = Window.orderBy("salary")
val result = dataframe.withColumn("category", labeler(col("salary")).over(window))
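For reference, here is a minimal sketch of how an input dataframe matching your example could be built (assuming an active SparkSession named spark; the column names are taken from the output below):

import spark.implicits._

val dataframe = Seq(
  ("Emp1", 10), ("Emp2", 20), ("Emp3", 30), ("Emp4", 35),
  ("Emp5", 36), ("Emp6", 50), ("Emp7", 70)
).toDF("employee", "salary")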
Using your example as the input dataframe, you get the following result dataframe:
+--------+------+--------+
|employee|salary|category|
+--------+------+--------+
|Emp1 |10 |0 |
|Emp2 |20 |0 |
|Emp3 |30 |0 |
|Emp4 |35 |1 |
|Emp5 |36 |1 |
|Emp6 |50 |2 |
|Emp7 |70 |3 |
+--------+------+--------+
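One caveat: a window with an orderBy but no partitionBy moves all rows to a single partition, so this approach is fine for small datasets but won't scale. If your real data has a natural grouping column, you could partition the window instead (here department is a hypothetical column, just for illustration):

// "department" is an assumed column, not part of your example
val partitionedWindow = Window.partitionBy("department").orderBy("salary")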