Cannot print directly when Flink SQL contains aggregation operators
Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[category], orderBy=[sales DESC], select=[category, sales])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:355)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTrai
Reason:
By default, toAppendStream is used to convert a table to a stream. An aggregation (or a Top-N Rank node, as in the trace above) revises results it has already emitted, which produces update and delete changes. An append-only stream cannot represent those changes, so the table has to be converted to a retract stream or an upsert stream instead.
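To make the difference concrete, here is a minimal plain-Java sketch (no Flink dependency) of the changelog that a running SUM ... GROUP BY would have to emit. The class name RetractSketch, the method changelog, and the sample orders are hypothetical and only for illustration; the output merely mimics the (flag, row) shape of a retract stream, where true means "add" and false means "retract".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractSketch {

    /**
     * Simulates the messages a grouped running sum would emit.
     * Each input row is {userId, amount}. When a user's sum changes,
     * the previous result is retracted (false) before the new one is added (true).
     */
    public static List<String> changelog(String[][] orders) {
        Map<String, Integer> sums = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] order : orders) {
            String userId = order[0];
            int amount = Integer.parseInt(order[1]);
            Integer previous = sums.get(userId);
            if (previous != null) {
                // The old sum is no longer valid: an append-only sink has no way to say this.
                out.add("(false,(" + userId + "," + previous + "))");
            }
            int updated = (previous == null ? 0 : previous) + amount;
            sums.put(userId, updated);
            out.add("(true,(" + userId + "," + updated + "))");
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: two orders for u1, one for u2.
        String[][] orders = {{"u1", "10"}, {"u2", "5"}, {"u1", "7"}};
        changelog(orders).forEach(System.out::println);
    }
}
```

The second order for u1 produces two messages: a retraction of the old sum and an insertion of the new one. The retraction is the kind of change that an append-only stream cannot carry.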
Solution:
Use StreamTableEnvironment.toRetractStream() for output.
For example:
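import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;

// Join query, shown for context only; it is not the table printed below.
Table joined = tEnv.sqlQuery(
"SELECT\n" +
" userName,\n" +
" product,\n" +
" amount\n" +
"FROM\n" +
" orders,\n" +
" user_table\n" +
"WHERE\n" +
" orders.userId = user_table.userId");
// Aggregation query: every new order changes a user's sum, so it needs a retract stream.
Table totals = tEnv.sqlQuery("SELECT userId, SUM(amount) AS boughtSum " +
"FROM orders GROUP BY userId");
// Each element is a Tuple2<Boolean, T>: true = add, false = retract.
// The TypeHint assumes userId is a String and amount is an Integer.
tEnv.toRetractStream(totals, TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
})).print();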