In Flink, common transformations include map and flatMap. After the data has been partitioned with keyBy, a window can be opened over each key to compute results. Which operators perform these window computations?
They fall into two types.
Incremental aggregation, which includes the reduce and aggregate operators, updates a running result as each element arrives. Full-window aggregation, which includes the apply and process operators, buffers all elements of the window and processes them together when the window fires. This article focuses on aggregate, the most commonly used incremental aggregation operator.
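The practical difference between the two types is what the window keeps in state. The sketch below is plain Scala with no Flink dependency, and the object and method names are hypothetical. The incremental version folds each element into a single Long as it arrives, while the full-window version buffers every element and only computes when the window fires.

```scala
// Plain-Scala illustration (no Flink): two ways to count the elements of one window.
object AggregationStyles {
  // Incremental (reduce/aggregate style): each element updates one accumulator as it
  // arrives, so the window state is a single Long no matter how many elements there are.
  def incrementalCount[T](arrivals: Seq[T]): Long =
    arrivals.foldLeft(0L)((acc, _) => acc + 1)

  // Full-window (apply/process style): every element is buffered in state, and the
  // function sees the whole buffer only when the window fires.
  def fullWindowCount[T](arrivals: Seq[T]): Long = {
    val buffer = scala.collection.mutable.ArrayBuffer.empty[T]
    arrivals.foreach(e => buffer += e) // state grows with every element
    onWindowFire(buffer.toList)
  }

  // Called once per window with all buffered elements.
  private def onWindowFire[T](all: List[T]): Long = all.size.toLong
}
```

Both return the same count; the incremental style is preferred when the result can be built element by element, because its state stays constant in size.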
The function passed to aggregate (an AggregateFunction) has three type parameters: <input (data source) type, accumulator type, output type>.
The window function (WindowFunction) that can be passed alongside it has four type parameters: <IN, OUT, KEY, W extends Window>.
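To see why the aggregate function needs three separate type parameters, here is a plain-Scala sketch with no Flink dependency. `SimpleAggregate` is a hypothetical trait that only mirrors the method names of Flink's AggregateFunction (createAccumulator, add, getResult, merge), and the averaging example is an illustration added here rather than part of this article's job: its accumulator is a (sum, count) pair, which differs from both the Long input and the Double output.

```scala
// Hypothetical trait mirroring the four methods of Flink's AggregateFunction[IN, ACC, OUT].
// It is NOT the Flink interface itself.
trait SimpleAggregate[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, accumulator: ACC): ACC
  def getResult(accumulator: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Average of Long values: IN = Long, ACC = (sum, count), OUT = Double.
object AverageAgg extends SimpleAggregate[Long, (Long, Long), Double] {
  def createAccumulator(): (Long, Long) = (0L, 0L)
  def add(value: Long, accumulator: (Long, Long)): (Long, Long) =
    (accumulator._1 + value, accumulator._2 + 1)
  def getResult(accumulator: (Long, Long)): Double =
    if (accumulator._2 == 0L) 0.0 else accumulator._1.toDouble / accumulator._2
  def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = (a._1 + b._1, a._2 + b._2)
}

// Drives an aggregate over a collection roughly the way a window would.
object RunAggregate {
  def run[IN, ACC, OUT](agg: SimpleAggregate[IN, ACC, OUT], values: Seq[IN]): OUT =
    agg.getResult(values.foldLeft(agg.createAccumulator())((acc, v) => agg.add(v, acc)))
}
```

For example, `RunAggregate.run(AverageAgg, Seq(1L, 2L, 3L, 4L))` returns 2.5.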
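Step 1: convert the DataStream into a WindowedStream and aggregate it
// Imports assumed for this snippet (Flink Scala DataStream API plus the Kafka connector)
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
// Sample classes used in this example (field names are illustrative)
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
// env (StreamExecutionEnvironment) and properties (Kafka consumer Properties) are assumed to be defined earlier.
// On Flink versions before 1.12, also call env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// so that timeWindow uses event time instead of processing time.
// Read data from Kafka and parse each comma-separated line into a UserBehavior
val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
  .map(data => {
    val dataArray = data.split(",")
    UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
  })
  .assignAscendingTimestamps(_.timestamp * 1000L) // convert the seconds-based timestamp to milliseconds
// Perform window aggregation on the data
val aggStream: DataStream[ItemViewCount] = inputStream
  .filter(_.behavior == "pv") // keep only page-view (pv) events
  .keyBy(_.itemId)
  .timeWindow(Time.hours(1), Time.minutes(5)) // sliding window: 1 hour long, sliding every 5 minutes
  .aggregate(new CountAgg(), new WindowCountResult()) // count each item's views in every window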
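timeWindow(Time.hours(1), Time.minutes(5)) creates a sliding window, so each event belongs to several overlapping windows (newer Flink versions deprecate timeWindow in favor of window(SlidingEventTimeWindows.of(...)) for event time). The plain-Scala sketch below approximates how a sliding assigner picks those windows, assuming a zero offset and non-negative millisecond timestamps; `SlidingWindows` and `windowsFor` are hypothetical names, not Flink API.

```scala
// Plain-Scala approximation of sliding-window assignment (offset 0, timestamps >= 0).
object SlidingWindows {
  // Start of the latest slide boundary at or before `timestamp`.
  def lastStart(timestamp: Long, slide: Long): Long = timestamp - (timestamp % slide)

  // All [start, end) windows of length `size`, advancing every `slide`, that contain `timestamp`.
  def windowsFor(timestamp: Long, size: Long, slide: Long): List[(Long, Long)] =
    Iterator
      .iterate(lastStart(timestamp, slide))(_ - slide) // walk back one slide at a time
      .takeWhile(_ > timestamp - size)                 // stop once the window no longer covers timestamp
      .map(start => (start, start + size))
      .toList
}
```

With a 1-hour size (3600000 ms) and a 5-minute slide (300000 ms), every event lands in 3600000 / 300000 = 12 windows, so each pv event is counted once in each of 12 different windows.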
Step 2: define a custom aggregate function (AggregateFunction)
import org.apache.flink.api.common.functions.AggregateFunction

// Custom pre-aggregation function: AggregateFunction[IN, ACC, OUT] = [UserBehavior, Long, Long]
// It counts records by adding one to the accumulator for every element
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
  // add() is called for each element; here it is the simplest possible +1
  override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
  // createAccumulator() initializes the accumulator value
  override def createAccumulator(): Long = 0L
  // getResult() returns the final value; here it is simply the accumulator
  override def getResult(accumulator: Long): Long = accumulator
  // merge() combines two partial accumulators, which Flink needs when windows are merged
  // (for example, session windows); here the two counts are simply added
  override def merge(a: Long, b: Long): Long = a + b
}
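Flink drives an AggregateFunction through a fixed lifecycle: createAccumulator once per key and window, add for every element, getResult when the window fires, and merge only when two accumulators must be combined. The sketch below replays that lifecycle in plain Scala, without Flink; `CountAggSketch` is a hypothetical stand-in with the same method bodies as CountAgg, and the UserBehavior field names are assumed.

```scala
// Same shape as the UserBehavior sample class used in the article (field names assumed).
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

// Plain-Scala stand-in for CountAgg with the same four methods (no Flink dependency).
object CountAggSketch {
  def createAccumulator(): Long = 0L
  def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
  def getResult(accumulator: Long): Long = accumulator
  def merge(a: Long, b: Long): Long = a + b

  // Replay the lifecycle for one key and window: create, add each element, read the result.
  def countAll(events: Seq[UserBehavior]): Long =
    getResult(events.foldLeft(createAccumulator())((acc, e) => add(e, acc)))
}
```

Counting two halves of the same input separately and combining them with merge gives the same result as counting everything at once, which is the property merge must preserve.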
Step 3: define a custom window function (WindowFunction)
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Custom window function: WindowFunction[IN, OUT, KEY, W]
//   IN  = Long          -> the output type of CountAgg from the previous step
//   OUT = ItemViewCount -> the type of records emitted downstream
//   KEY = Long          -> the key type, i.e. the itemId used in keyBy
//   W   = TimeWindow    -> the window type produced by timeWindow(...)
class WindowCountResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  // key: the current itemId; window: the current window (getEnd returns its end timestamp)
  // input: the pre-aggregated results; with aggregate(...) it holds exactly one element, the count
  // out: the Collector used to emit ItemViewCount records
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    // Build one ItemViewCount from the key, the window end time and the count, then emit it
    out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))
  }
}
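To see how the three steps fit together, the sketch below simulates the whole pipeline locally in plain Scala: keep only pv events, key by itemId, assign each event to its sliding windows, count per (item, window) the way CountAgg does, then wrap each count into an ItemViewCount the way WindowCountResult does. It is a single-threaded illustration with hypothetical names (`LocalPipelineSketch`, `windowEnds`, `run`), not how Flink actually executes the job; watermarks, window firing and late data are not modeled, and the case class field names are assumed.

```scala
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object LocalPipelineSketch {
  // Exclusive end of every sliding window containing timestampMs (offset 0, timestamps >= 0).
  def windowEnds(timestampMs: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = timestampMs - (timestampMs % slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > timestampMs - size).map(_ + size).toList
  }

  // filter pv -> key by itemId -> assign windows -> count per (item, window) -> ItemViewCount
  def run(events: Seq[UserBehavior], size: Long, slide: Long): List[ItemViewCount] = {
    // (itemId, windowEnd) -> accumulator, playing the role of CountAgg's per-window state
    val counts = scala.collection.mutable.Map.empty[(Long, Long), Long]
    for {
      e <- events if e.behavior == "pv"
      windowEnd <- windowEnds(e.timestamp * 1000L, size, slide) // seconds -> milliseconds
    } {
      val k = (e.itemId, windowEnd)
      counts(k) = counts.getOrElse(k, 0L) + 1 // the CountAgg.add step
    }
    // The WindowCountResult step: one ItemViewCount per key and window from the single count
    val results = counts.toList.map { case ((item, wEnd), c) => ItemViewCount(item, wEnd, c) }
    results.sortBy(r => (r.windowEnd, r.itemId))
  }
}
```

Because aggregate(...) hands the window function only the pre-aggregated count, WindowCountResult never sees the raw events, which keeps the per-window state to a single Long.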