
Customizing Flink's Aggregate Function (Step-by-Step Tutorial)

In Flink jobs, common operations include map and flatMap. After the data has been transformed, keyBy is typically applied and a window is opened to compute results. So which operators perform the calculation inside a window?

There are two types of window operators.

Incremental aggregation operators (reduce and aggregate) update a running result as each element arrives, while full-window aggregation operators (apply and process) buffer every element of the window and compute only when the window fires. Today we focus on the commonly used incremental aggregation operator, aggregate.
The AggregateFunction passed to aggregate has three type parameters, <IN, ACC, OUT>: the input (data source) type, the accumulator type, and the output type.
The optional window function passed alongside it has four type parameters: <IN, OUT, KEY, W extends Window>.
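To make the difference between the two types concrete, here is a minimal plain-Scala sketch with no Flink dependency. The trait SimpleAgg is a hypothetical stand-in that only mirrors the method shape of AggregateFunction[IN, ACC, OUT]; it is not Flink's API. The incremental path keeps a single accumulator between elements, while the full-window path holds every element until the window fires.

```scala
// Hypothetical trait that only mirrors the method shape of Flink's AggregateFunction[IN, ACC, OUT]
trait SimpleAgg[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, acc: ACC): ACC
  def getResult(acc: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Incremental style (reduce / aggregate): only one accumulator is kept while elements arrive
def incremental[IN, ACC, OUT](elements: Seq[IN], agg: SimpleAgg[IN, ACC, OUT]): OUT =
  agg.getResult(elements.foldLeft(agg.createAccumulator())((acc, e) => agg.add(e, acc)))

// Full-window style (apply / process): every element is buffered and handed over at once
def fullWindow[IN, OUT](elements: Seq[IN], process: Iterable[IN] => OUT): OUT = {
  val buffer = scala.collection.mutable.ArrayBuffer.empty[IN]
  buffer ++= elements // the whole window content stays in state until the window fires
  process(buffer)
}

// Counting pv events with each style
val countAgg = new SimpleAgg[String, Long, Long] {
  def createAccumulator(): Long = 0L
  def add(value: String, acc: Long): Long = acc + 1
  def getResult(acc: Long): Long = acc
  def merge(a: Long, b: Long): Long = a + b
}

val behaviors = Seq("pv", "pv", "pv")
val viaIncremental = incremental(behaviors, countAgg)
val viaFullWindow = fullWindow[String, Long](behaviors, _.size.toLong)
println(s"$viaIncremental $viaFullWindow") // 3 3
```

Both styles give the same count here; the difference is how much state is held per window while it is open.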

Step 1: convert the DataStream into a WindowedStream


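        import org.apache.flink.api.common.serialization.SimpleStringSchema
        import org.apache.flink.streaming.api.scala._
        import org.apache.flink.streaming.api.windowing.time.Time
        import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

        // Input and output types; field names not used in this article (userId, categoryId, windowEnd, count) are assumed
        case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
        case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

        // Reading data from Kafka (env and the Kafka consumer properties are assumed to be set up earlier)
        val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
            .map(data => {
                val dataArray = data.split(",")
                UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
            })
            // timestamps in the data are in seconds; Flink expects milliseconds
            // (before Flink 1.12, also call env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime))
            .assignAscendingTimestamps(_.timestamp * 1000L)

        // Performing window aggregation on data
        val aggStream: DataStream[ItemViewCount] = inputStream
            .filter(_.behavior == "pv") // keep only page-view (pv) events
            .keyBy(_.itemId)
            .timeWindow(Time.hours(1), Time.minutes(5)) // 1-hour sliding window, advancing every 5 minutes
            .aggregate(new CountAgg(), new WindowCountResult()) // count the pv events of each item in each window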
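timeWindow(Time.hours(1), Time.minutes(5)) creates sliding windows: each window spans one hour and a new one starts every five minutes, so every pv event is counted in 60 / 5 = 12 overlapping windows. The sketch below models that assignment in plain Scala, assuming a window offset of zero; the Window case class and the slidingWindows function are hypothetical and only loosely follow the start-alignment logic of Flink's sliding event-time assigner.

```scala
final case class Window(start: Long, end: Long) // covers [start, end) in epoch milliseconds

// Hypothetical helper: all sliding windows that contain `timestamp` (window offset assumed to be 0)
def slidingWindows(timestamp: Long, size: Long, slide: Long): List[Window] = {
  // Latest window start at or before the timestamp, aligned to a multiple of `slide`
  val lastStart = timestamp - Math.floorMod(timestamp, slide)
  // Step back one slide at a time while the window [start, start + size) still contains the timestamp
  Iterator.iterate(lastStart)(_ - slide)
    .takeWhile(start => start > timestamp - size)
    .map(start => Window(start, start + size))
    .toList
}

val oneHour = 60 * 60 * 1000L
val fiveMinutes = 5 * 60 * 1000L
val eventTime = 1511658000L * 1000L // hypothetical event: seconds from the data, multiplied by 1000 as in Step 1
val windows = slidingWindows(eventTime, oneHour, fiveMinutes)
println(windows.size) // 12
```

This is why aggregate is a good fit here: with 12 live windows per key, keeping one Long per window is much cheaper than buffering every event 12 times.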

Step 2: define a custom aggregate function

import org.apache.flink.api.common.functions.AggregateFunction

// A custom pre-aggregation function: every incoming element adds one to the count
// Type parameters: input UserBehavior, accumulator Long, output Long
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
    // Initialize the accumulator for a new window
    override def createAccumulator(): Long = 0L
    // Called once per element; here it is the simplest +1 operation
    override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
    // Called when the window fires; the accumulator itself is the result
    override def getResult(accumulator: Long): Long = accumulator
    // Combines two partial accumulators (e.g. when session windows merge); for a count, add them
    override def merge(a: Long, b: Long): Long = a + b
}
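The four methods are called in a fixed order: createAccumulator once per key and window, add once per element, merge only when two partial accumulators have to be combined, and getResult when the window fires. The sketch below walks through that order in plain Scala; Event and CountAggModel are hypothetical names that copy CountAgg's logic without extending Flink's interface.

```scala
// Plain-Scala copy of CountAgg's logic (no Flink dependency); Event and CountAggModel are hypothetical
final case class Event(itemId: Long, behavior: String)

object CountAggModel {
  def createAccumulator(): Long = 0L
  def add(value: Event, accumulator: Long): Long = accumulator + 1
  def getResult(accumulator: Long): Long = accumulator
  def merge(a: Long, b: Long): Long = a + b
}

// Fold a batch of elements into a fresh accumulator: createAccumulator once, then add per element
def accumulate(events: Seq[Event]): Long =
  events.foldLeft(CountAggModel.createAccumulator())((acc, e) => CountAggModel.add(e, acc))

val events = Seq.fill(4)(Event(42L, "pv"))

// Case 1: a single window sees all four elements
val acc = accumulate(events)

// Case 2: the elements were counted in two partial accumulators that are then merged
val (firstPart, secondPart) = events.splitAt(1)
val merged = CountAggModel.merge(accumulate(firstPart), accumulate(secondPart))

// When the window fires, getResult turns the accumulator into the emitted value
println(CountAggModel.getResult(acc))    // 4
println(CountAggModel.getResult(merged)) // 4
```

Whichever path is taken, merge must give the same answer as adding every element to one accumulator, which is why a plain a + b is correct for a count.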

Step 3: define a custom window function

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Custom window function; type parameters are [IN, OUT, KEY, W]
class WindowCountResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
    // key: the keyBy key, here the itemId (Long)
    // window: the window being evaluated; time windows are of type TimeWindow
    // input: the values for this key and window; after pre-aggregation it holds exactly one Long, the CountAgg result
    // out: the Collector used to emit ItemViewCount results
    override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
        // Build an ItemViewCount case class from the key, the window end time and the count, then emit it
        out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))
    }
}
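Because aggregate hands only the pre-aggregated result to the window function, the input iterable in WindowCountResult holds exactly one element, which is why input.iterator.next() is safe. The sketch below simulates one window in plain Scala; TimeWindowModel, windowCountResult and the sample item ids are hypothetical, and groupBy stands in for keyBy.

```scala
final case class TimeWindowModel(start: Long, end: Long) // hypothetical stand-in for Flink's TimeWindow
final case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

// Mirrors WindowCountResult.apply: `input` carries exactly one value, the CountAgg result
def windowCountResult(key: Long, window: TimeWindowModel, input: Iterable[Long]): ItemViewCount =
  ItemViewCount(key, window.end, input.iterator.next())

val window = TimeWindowModel(0L, 3600000L)
val pvItemIds = Seq(101L, 102L, 101L, 101L, 103L, 102L) // itemId of each pv event in this window

val results = pvItemIds
  .groupBy(identity) // stands in for keyBy(_.itemId)
  .map { case (itemId, events) =>
    val count = events.size.toLong // what CountAgg would return for this key and window
    windowCountResult(itemId, window, Seq(count)) // the window function sees a one-element Iterable
  }
  .toList
  .sortBy(_.itemId)

results.foreach(println)
```

The window function adds the context the accumulator lacks (the key and the window end), so downstream operators can group results by window, for example to rank the hottest items per hour.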