Flink SQL results cannot be printed directly when the query contains aggregation operators
Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[category], orderBy=[sales DESC], select=[category, sales])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:355)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTrai
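Reason:
Table-to-stream conversion normally goes through toAppendStream (here, an AppendStreamTableSink), which can only accept inserted rows. An aggregation, such as the ROW_NUMBER rank in the trace above, revises results it has already emitted, which produces update and delete changes that an append stream cannot carry. The result therefore has to go to a retract stream or an upsert stream instead.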
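To make the difference concrete, here is a minimal sketch in plain Java (no Flink dependency) of how a retract stream could encode a running SUM ... GROUP BY userId. The class name RetractStreamSketch, the Change type, and the sample input are hypothetical and only illustrate the idea; this is not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RetractStreamSketch {

    /** One changelog message: add=true inserts a row, add=false retracts a row emitted earlier. */
    static final class Change {
        final boolean add;
        final String userId;
        final int boughtSum;

        Change(boolean add, String userId, int boughtSum) {
            this.add = add;
            this.userId = userId;
            this.boughtSum = boughtSum;
        }

        @Override
        public String toString() {
            return "(" + add + "," + userId + "," + boughtSum + ")";
        }
    }

    /** Emulates SELECT userId, SUM(amount) FROM orders GROUP BY userId, one input row at a time. */
    static List<Change> aggregate(String[] userIds, int[] amounts) {
        Map<String, Integer> sums = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (int i = 0; i < userIds.length; i++) {
            Integer previous = sums.get(userIds[i]);
            if (previous != null) {
                // The sum emitted earlier for this key is now stale: retract it first.
                out.add(new Change(false, userIds[i], previous));
            }
            int updated = (previous == null ? 0 : previous) + amounts[i];
            sums.put(userIds[i], updated);
            // Then emit the new sum as an add message.
            out.add(new Change(true, userIds[i], updated));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical orders: u1 buys 10, u2 buys 5, u1 buys 7.
        List<Change> changes = aggregate(new String[] {"u1", "u2", "u1"}, new int[] {10, 5, 7});
        for (Change c : changes) {
            System.out.println(c);
        }
    }
}
```

The second order for u1 yields two messages, a retraction of the old sum followed by the new sum. An append-only stream has no way to express the retraction, which is why the planner rejects the query.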
Solution:
Use StreamTableEnvironment.toRetractStream() for the output.
For example:
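// Join query (no aggregation); named joinTable so it does not clash with the variable declared further down.
Table joinTable = tEnv.sqlQuery(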
"SELECT\n" +
" userName,\n" +
" product,\n" +
" amount\n" +
"FROM\n" +
" orders,\n" +
" user_table\n" +
"WHERE\n" +
" orders.userId = user_table.userId");
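// Aggregation query: every new order changes the sum already emitted for its userId.
Table table = tEnv.sqlQuery("SELECT userId, SUM(amount) AS boughtSum " +
        "FROM orders GROUP BY userId");
// Each element is a Tuple2<Boolean, T>: true marks an added row, false a retracted one.
tEnv.toRetractStream(table, TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
        .print();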