Flink SQL error: cannot print directly when the query contains aggregation operators

When a Flink SQL query contains aggregation operators, printing the result directly fails with the following exception:

Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[category], orderBy=[sales DESC], select=[category, sales])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:355)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTrai 

Reason:

Normally, toAppendStream is used to convert a Table to a DataStream. An append stream can only carry inserts, but an aggregation (such as the ROW_NUMBER rank in the stack trace above) also produces update and delete changes, which an append stream cannot express. Use a retract stream or an upsert stream instead.
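
To make the reason concrete, here is a minimal sketch in plain Java (it does not use Flink) of the changes a running SUM grouped by user has to emit. The class name RetractSumSketch, the method emit, and the sample data are hypothetical and chosen only for illustration. Whenever a key that already has a result receives a new row, the old result is withdrawn before the new one is emitted; an append-only output has no way to express that withdrawal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of the changes a running
// SUM ... GROUP BY has to emit. All names here are hypothetical.
public class RetractSumSketch {

    // Returns one message per change: "+" adds a result, "-" retracts a previous one.
    public static List<String> emit(String[] users, int[] amounts) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < users.length; i++) {
            Integer old = sums.get(users[i]);
            if (old != null) {
                // The sum emitted earlier for this key is no longer valid: retract it.
                out.add("-(" + users[i] + "," + old + ")");
            }
            int updated = (old == null ? 0 : old) + amounts[i];
            sums.put(users[i], updated);
            out.add("+(" + users[i] + "," + updated + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints +(u1,10), +(u2,5), -(u1,10), +(u1,17), one per line.
        emit(new String[] {"u1", "u2", "u1"}, new int[] {10, 5, 7})
                .forEach(System.out::println);
    }
}
```

The third input row is the interesting one: it forces the message -(u1,10), which is the kind of change the exception says the append-only sink cannot consume.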

Solution:

Use StreamTableEnvironment.toRetractStream() for the output.
For example:

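// Imports needed by this snippet:
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;

// tEnv is a StreamTableEnvironment; orders and user_table are assumed to be registered.
// Join query, kept from the original example; it is not printed here.
Table joined = tEnv.sqlQuery(
                        "SELECT\n" +
                        "    userName,\n" +
                        "    product,\n" +
                        "    amount\n" +
                        "FROM\n" +
                        "    orders,\n" +
                        "    user_table\n" +
                        "WHERE\n" +
                        "    orders.userId = user_table.userId");
// Aggregation query: each new order changes a previously emitted sum.
Table aggregated = tEnv.sqlQuery("SELECT userId, SUM(amount) AS boughtSum " +
                                 "FROM orders GROUP BY userId");
// The type hint assumes userId is a STRING and SUM(amount) is an INT.
tEnv.toRetractStream(aggregated, TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
})).print();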
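
Each record emitted by toRetractStream is a Tuple2 whose Boolean field is true for an added row and false for a retracted row. As a rough illustration of how a consumer could fold such records into the current result, here is a plain-Java sketch that does not depend on Flink. The class RetractConsumerSketch, its nested Change class, and the sample values are hypothetical stand-ins for the real Tuple2 records.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency): folding retract-style records
// into the current result per key. All names here are hypothetical.
public class RetractConsumerSketch {

    // Mirrors the shape of a retract record: a flag plus a (userId, boughtSum) payload.
    public static final class Change {
        final boolean isAdd;
        final String userId;
        final int boughtSum;

        public Change(boolean isAdd, String userId, int boughtSum) {
            this.isAdd = isAdd;
            this.userId = userId;
            this.boughtSum = boughtSum;
        }
    }

    public static Map<String, Integer> materialize(List<Change> changes) {
        Map<String, Integer> current = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.isAdd) {
                // An add message carries the new result for its key.
                current.put(c.userId, c.boughtSum);
            } else {
                // A retract message removes the entry only if it still holds the retracted value.
                current.remove(c.userId, c.boughtSum);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change(true, "u1", 10),
                new Change(true, "u2", 5),
                new Change(false, "u1", 10),
                new Change(true, "u1", 17));
        // Prints {u1=17, u2=5}
        System.out.println(materialize(changes));
    }
}
```

In the printed output of the example above, a line whose first field is false therefore does not mean an error; it withdraws a result that was printed earlier for the same key.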
