[Solved] Flink Error: Flink Hadoop is not in the classpath/dependencies

Error background:

When installing Flink on a YARN cluster, the Flink cluster could not be started.

Version:

flink-1.14.6

hadoop-3.2.3

org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:216) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:617) [flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59) [flink-dist_2.12-1.14.6.jar:1.14.6]
Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (hdfs:/flink/ha/default)
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:92) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	... 2 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:532) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:89) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	... 2 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:55) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:528) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:89) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	... 2 more

The reason for the error:
Flink needs two jar dependencies to access HDFS. They are not shipped with the Flink distribution, so you have to add them yourself:

  1. flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar
  2. commons-cli-1.5.0.jar

Solution:

Search the Maven repository for these two jars and download them: https://mvnrepository.com/

Put the jars into Flink's lib directory (/flink/lib), then restart the cluster.
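On recent Flink releases, the officially documented alternative is to expose an existing Hadoop installation instead of copying jars: run export HADOOP_CLASSPATH=$(hadoop classpath) in the environment that starts the cluster.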

[Solved] Flink SQL CDC MySQL to Kafka Connect Error: org.apache.flink.table.api.ValidationException…

Error Messages: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'debezium-json' that implements 'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath.


Check whether any required package is missing from the imports.
Here the flink-json package had not been imported:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

After importing it, the build succeeds and Kafka can be connected normally~
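For reference, a minimal sketch of a table definition that needs this factory (the topic, servers, and schema are placeholder assumptions, not from the original job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DebeziumJsonSinkExample {
    public static void main(String[] args) {
        // 'debezium-json' is the format identifier served by the
        // SerializationFormatFactory that flink-json provides
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                StreamExecutionEnvironment.getExecutionEnvironment());
        tEnv.executeSql(
                "CREATE TABLE kafka_sink (\n" +
                "  id INT,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'my_topic',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'format' = 'debezium-json'\n" +
                ")");
    }
}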

[Solved] org.apache.flink.client.program.ProgramInvocationException: The main method caused an error

After enabling checkpointing and setting the state backend for the Flink job, submitting the job produces the error above. The specific stack trace is as follows:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
  at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
  at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
  at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1761)
  at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
  at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
  at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
  ... 11 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
  at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
  at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
  at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
  at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: xxxxx
  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
  at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
  ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: xxxxx
Caused by: java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
  at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
  at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
  at java.lang.Thread.run(Thread.java:748)


After checking the YARN logs, the cause appears to be a jar conflict. After commenting out the Hadoop dependency in the POM file and resubmitting, the job ran successfully.
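If you would rather keep the Hadoop dependency for compilation, a minimal sketch (assuming hadoop-client is the conflicting artifact) is to mark it provided so it is not bundled into the job jar:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <!-- provided: compile against Hadoop, but do not ship it with the job -->
            <scope>provided</scope>
        </dependency>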

[Solved] Flink TableAPI Error: Class cannot be found

Console error:

"C:\Program Files\Java\jdk1.8.0_201\bin\java.exe" "-javaagent:D:\ideaSoftware\IntelliJ IDEA 2020.2.3\lib\idea_rt.jar=52126:D:\ideaSoftware\IntelliJ IDEA 2020.2.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_201\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_201\jre\lib\rt.jar;D:\Conding_pojoect\MyFlink\target\classes;D:\MyTools\repository\org\apache\flink\flink-clients_2.12\1.9.3\flink-clients_2.12-1.9.3.jar;D:\MyTools\repository\org\apache\flink\flink-core\1.9.3\flink-core-1.9.3.jar;D:\MyTools\repository\org\apache\flink\flink-annotations\1.9.3\flink-annotations-1.9.3.jar;D:\MyTools\repository\org\apache\flink\flink-metrics-core\1.9.3\flink-metrics-core-1.9.3.jar;D:\MyTools\repository\org\apache\flink\flink-shaded-asm-6\6.2.1-7.0\flink-shaded-asm-6-6.2.1-7.0.jar;D:\MyTools\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;D:\MyTools\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;D:\MyTools\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;D:\MyTools\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;D:\MyTools\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\MyTools\repository\org\apache\commons\commons-compress\1.18\commons-compress-1.18.jar;D:\MyTools\repository\org\apache\flink\flink-runtime_2.12\1.9.3\flink-runtime_2.12-1.9.3.jar;D:\MyTools\repository\org\apache\flink\flink-queryable-state-client-java\1.9.3\flink-queryable-state-client-java-1.9.3.jar;D:\MyTools\repository\org\apache\flink\flink-hadoop-fs\1.9.3\flink-hadoop-fs-1.9.3.jar;D:\MyTools\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;D:\MyTools\repository\org\apache\flink\flink-shaded-netty\4.1.32.Final-7.0\flink-shaded-netty-4.1.32.Final-7.0.jar;D:\MyTools\repository\org\apache\flink\flink-shaded-jackson\2.10.1-9.0\flink-shaded-jackson-2.10.1-9.0.jar;D:\MyTools\repository\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;D:\MyTools\repository\org\scala-lang\scala-library\2.12.7\scala-library-2.12.7.jar;D:\MyTools\repository\com\typesafe\akka\akka-actor_2.12\2.5.21\akka-actor_2.12-2.5.21.jar;D:\MyTools\repository\com\typesafe\config\1.3.3\config-1.3.3.jar;D:\MyTools\repository\org\scala-lang\modules\scala-java8-compat_2.12\0.8.0\scala-java8-compat_2.12-0.8.0.jar;D:\MyTools\repository\co
m\typesafe\akka\akka-stream_2.12\2.5.21\akka-stream_2.12-2.5.21.jar;D:\MyTools\repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;D:\MyTools\repository\com\typesafe\ssl-config-core_2.12\0.3.7\ssl-config-core_2.12-0.3.7.jar;D:\MyTools\repository\org\scala-lang\modules\scala-parser-combinators_2.12\1.1.1\scala-parser-combinators_2.12-1.1.1.jar;D:\MyTools\repository\com\typesafe\akka\akka-protobuf_2.12\2.5.21\akka-protobuf_2.12-2.5.21.jar;D:\MyTools\repository\com\typesafe\akka\akka-slf4j_2.12\2.5.21\akka-slf4j_2.12-2.5.21.jar;D:\MyTools\repository\org\clapper\grizzled-slf4j_2.12\1.3.2\grizzled-slf4j_2.12-1.3.2.jar;D:\MyTools\repository\com\github\scopt\scopt_2.12\3.5.0\scopt_2.12-3.5.0.jar;D:\MyTools\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;D:\MyTools\repository\com\twitter\chill_2.12\0.7.6\chill_2.12-0.7.6.jar;D:\MyTools\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;D:\MyTools\repository\org\apache\flink\flink-optimizer_2.12\1.9.3\flink-optimizer_2.12-1.9.3.jar;D:\MyTools\repository\org\apache\flink\flink-java\1.9.3\flink-java-1.9.3.jar;D:\MyTools\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;D:\MyTools\repository\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar;D:\MyTools\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;D:\MyTools\repository\org\apache\flink\force-shading\1.9.3\force-shading-1.9.3.jar;D:\MyTools\repository\org\projectlombok\lombok\1.18.2\lombok-1.18.2.jar;D:\MyTools\repository\org\apache\flink\flink-streaming-java_2.12\1.9.3\flink-streaming-java_2.12-1.9.3.jar;D:\MyTools\repository\org\apache\flink\flink-shaded-guava\18.0-7.0\flink-shaded-guava-18.0-7.0.jar;D:\MyTools\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;D:\MyTools\repository\org\apache\bahir\flink-connector-redis_2.11\1.0\flink-connector-redis_2.11-1.0.jar;D:\MyTools\repository\org\apache\flink\flink-streaming-java_2.11\1.2.0\flink-streaming-java_2.11-1.2.0.jar;D:\MyTools\repository\org\apache\flink\flink-runtime_2.11\1.2.0\flink-runtime_2.11-1.2.0.jar;D:\MyTools\repository\org\apache\flink\flink-shaded-hadoop2\1.2.0\flink-shaded-hadoop2-1.2.0.jar;D:\MyTools\repository\org\tukaani\xz\1.0\xz-1.0.jar;D:\MyTools\repository\xmlenc\xmlenc\0.52\xmlenc-0.52.jar;D:\MyTools\repository\commons-codec\commons-codec\1.4\commons-codec-1.4.jar;D:\MyTools\repository\commons-net\commons-net\3.1\commons-net-3.1.jar;D:\MyTools\repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;D:\MyTools\repository\org\mortbay\jetty\jetty-util\6.1.26\jetty-util-6.1.26.jar;D:\MyTools\repository\com\sun\jersey\jersey-core\1.9\jersey-core-1.9.jar;D:\MyTools\repository\commons-el\commons-el\1.0\commons-el-1.0.jar;D:\MyTools\repository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;D:\MyTools\repository\com\jamesmurty\utils\java-xmlbuilder\0.4\java-xmlbuilder-0.4.jar;D:\MyTools\repository\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;D:\MyTools\repository\commons-configuration\commons-configuration\1.7\commons-configuration-1.7.jar;D:\MyTools\repository\commons-digester\commons-digester\1.8.1\commons-digester-1.8.1.jar;D:\MyTools\repository\org\codehaus\jackson\jackson-core-asl\1.8.8\jackson-core-asl-1.8.8.jar;D:\MyTools\repository\org\codehaus\jackson\jackson-mapper-asl\1.8.8\jackson-mapper-asl-1.8.8.jar;D:\MyTools\repository\org\apache\avro\avro\1.7.7\avro-1.7.7.jar;D:\MyTools\repository\com\thoughtworks\paranamer\paranamer\2.3\paranamer-2.3.jar;D:\MyTools\repository\com\jcraft\jsch\0.1.42\jsc
h-0.1.42.jar;D:\MyTools\repository\commons-beanutils\commons-beanutils-bean-collections\1.8.3\commons-beanutils-bean-collections-1.8.3.jar;D:\MyTools\repository\commons-daemon\commons-daemon\1.0.13\commons-daemon-1.0.13.jar;D:\MyTools\repository\javax\xml\bind\jaxb-api\2.2.2\jaxb-api-2.2.2.jar;D:\MyTools\repository\javax\xml\stream\stax-api\1.0-2\stax-api-1.0-2.jar;D:\MyTools\repository\javax\activation\activation\1.1\activation-1.1.jar;D:\MyTools\repository\io\netty\netty-all\4.0.27.Final\netty-all-4.0.27.Final.jar;D:\MyTools\repository\com\data-artisans\flakka-actor_2.11\2.3-custom\flakka-actor_2.11-2.3-custom.jar;D:\MyTools\repository\com\data-artisans\flakka-remote_2.11\2.3-custom\flakka-remote_2.11-2.3-custom.jar;D:\MyTools\repository\io\netty\netty\3.8.0.Final\netty-3.8.0.Final.jar;D:\MyTools\repository\org\uncommons\maths\uncommons-maths\1.2.2a\uncommons-maths-1.2.2a.jar;D:\MyTools\repository\com\data-artisans\flakka-slf4j_2.11\2.3-custom\flakka-slf4j_2.11-2.3-custom.jar;D:\MyTools\repository\org\clapper\grizzled-slf4j_2.11\1.0.2\grizzled-slf4j_2.11-1.0.2.jar;D:\MyTools\repository\com\github\scopt\scopt_2.11\3.2.0\scopt_2.11-3.2.0.jar;D:\MyTools\repository\com\fasterxml\jackson\core\jackson-core\2.7.4\jackson-core-2.7.4.jar;D:\MyTools\repository\com\fasterxml\jackson\core\jackson-databind\2.7.4\jackson-databind-2.7.4.jar;D:\MyTools\repository\com\fasterxml\jackson\core\jackson-annotations\2.7.0\jackson-annotations-2.7.0.jar;D:\MyTools\repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;D:\MyTools\repository\org\slf4j\slf4j-log4j12\1.6.1\slf4j-log4j12-1.6.1.jar;D:\MyTools\repository\log4j\log4j\1.2.16\log4j-1.2.16.jar;D:\MyTools\repository\jline\jline\0.9.94\jline-0.9.94.jar;D:\MyTools\repository\junit\junit\3.8.1\junit-3.8.1.jar;D:\MyTools\repository\com\twitter\chill_2.11\0.7.4\chill_2.11-0.7.4.jar;D:\MyTools\repository\org\apache\flink\flink-clients_2.11\1.2.0\flink-clients_2.11-1.2.0.jar;D:\MyTools\repository\org\apache\flink\flink-optimizer_2.11\1.2.0\flink-optimizer_2.11-1.2.0.jar;D:\MyTools\repository\org\apache\sling\org.apache.sling.commons.json\2.0.6\org.apache.sling.commons.json-2.0.6.jar;D:\MyTools\repository\redis\clients\jedis\2.8.0\jedis-2.8.0.jar;D:\MyTools\repository\org\apache\commons\commons-pool2\2.3\commons-pool2-2.3.jar;D:\MyTools\repository\org\apache\flink\flink-statebackend-rocksdb_2.12\1.10.1\flink-statebackend-rocksdb_2.12-1.10.1.jar;D:\MyTools\repository\com\data-artisans\frocksdbjni\5.17.2-artisans-2.0\frocksdbjni-5.17.2-artisans-2.0.jar;D:\MyTools\repository\mysql\mysql-connector-java\8.0.25\mysql-connector-java-8.0.25.jar;D:\MyTools\repository\com\google\protobuf\protobuf-java\3.11.4\protobuf-java-3.11.4.jar;D:\MyTools\repository\org\apache\flink\flink-table-planner_2.12\1.10.1\flink-table-planner_2.12-1.10.1.jar;D:\MyTools\repository\org\apache\flink\flink-table-common\1.10.1\flink-table-common-1.10.1.jar;D:\MyTools\repository\org\apache\flink\flink-shaded-asm-7\7.1-9.0\flink-shaded-asm-7-7.1-9.0.jar;D:\MyTools\repository\org\apache\flink\flink-table-api-java-bridge_2.12\1.10.1\flink-table-api-java-bridge_2.12-1.10.1.jar;D:\MyTools\repository\org\apache\flink\flink-table-api-scala-bridge_2.12\1.10.1\flink-table-api-scala-bridge_2.12-1.10.1.jar;D:\MyTools\repository\org\apache\flink\flink-scala_2.12\1.10.1\flink-scala_2.12-1.10.1.jar;D:\MyTools\repository\org\apache\flink\flink-streaming-scala_2.12\1.10.1\flink-streaming-scala_2.12-1.10.1.jar;D:\MyTools\repository\org\apache\flink\flink-table-planner-blink_2.12\1.10.1\flink-table-planner-
blink_2.12-1.10.1.jar;D:\MyTools\repository\org\apache\flink\flink-table-api-java\1.10.1\flink-table-api-java-1.10.1.jar;D:\MyTools\repository\org\apache\flink\flink-table-api-scala_2.12\1.10.1\flink-table-api-scala_2.12-1.10.1.jar;D:\MyTools\repository\org\scala-lang\scala-reflect\2.12.7\scala-reflect-2.12.7.jar;D:\MyTools\repository\org\scala-lang\scala-compiler\2.12.7\scala-compiler-2.12.7.jar;D:\MyTools\repository\org\scala-lang\modules\scala-xml_2.12\1.0.6\scala-xml_2.12-1.0.6.jar;D:\MyTools\repository\org\apache\flink\flink-table-runtime-blink_2.12\1.10.1\flink-table-runtime-blink_2.12-1.10.1.jar;D:\MyTools\repository\org\codehaus\janino\janino\3.0.9\janino-3.0.9.jar;D:\MyTools\repository\org\codehaus\janino\commons-compiler\3.0.9\commons-compiler-3.0.9.jar;D:\MyTools\repository\org\apache\calcite\avatica\avatica-core\1.15.0\avatica-core-1.15.0.jar;D:\MyTools\repository\org\reflections\reflections\0.9.10\reflections-0.9.10.jar" Com.ZQQQ.TableAPI.Example
21:39:36,292 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
21:39:36,293 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/configuration/ReadableConfig
	at org.apache.flink.table.planner.StreamPlannerFactory.create(StreamPlannerFactory.java:49)
	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:120)
	at org.apache.flink.table.api.java.StreamTableEnvironment.create(StreamTableEnvironment.java:112)
	at org.apache.flink.table.api.java.StreamTableEnvironment.create(StreamTableEnvironment.java:83)
	at Com.ZQQQ.TableAPI.Example.main(Example.java:35)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.ReadableConfig
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 5 more

Process finished with exit code 1

Reason for the error:
The Flink version and the Table API version are inconsistent (note that the classpath above mixes 1.9.3, 1.2.0, and 1.10.1 artifacts).
Solution:
Change every Flink artifact in pom.xml to the same version.

I configured the pom.xml file as follows:

  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
	
        <!--TableAPI-->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>	
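
To keep the versions from drifting apart again, a common pattern (a sketch, not taken from the original pom) is to define the version once as a property and reference it in every Flink dependency:

    <properties>
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>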

[Solved] Flink jdbc Error: Access Denied for user 'root'@'10.0.0.x' (using password: YES)

Error message:

2021-12-31 11:02:51.955 [Source: TableSourceScan(table=[[default_catalog, default_database, jdbc_source_table, project=[id]]], fields=[id]) -> Calc(select=[id, id AS id0, id AS id1]) -> Sink: Sink(table=[default_catalog.default_database.jdbc_upsert_sink_table], fields=[id, id0, id1]) (1/1)#1] WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, jdbc_source_table, project=[id]]], fields=[id]) -> Calc(select=[id, id AS id0, id AS id1]) -> Sink: Sink(table=[default_catalog.default_database.jdbc_upsert_sink_table], fields=[id, id0, id1]) (1/1)#1 (85391b7c05a5282974f4da4cbb62f38e) switched from INITIALIZING to FAILED with failure cause: java.io.IOException: unable to open JDBC writer
	at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:56)
	at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:129)
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:60)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Access denied for user 'root'@'10.0.0.23' (using password: YES)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:836)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:456)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:197)
	at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:121)
	at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54)
	... 14 more

How to Solve:

The connection information in the connector's WITH options is incorrect (host, user, or password), so MySQL refuses the login; correct it there. Also make sure the table field types in the database match the definitions in the Flink SQL DDL, since a mismatch likewise causes the table connection to fail.
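For reference, a minimal sketch of the JDBC sink DDL whose WITH options should be double-checked (the URL, credentials, and columns are placeholders, not the original job's values):

CREATE TABLE jdbc_upsert_sink_table (
    id INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.0.0.x:3306/mydb',
    'table-name' = 'jdbc_upsert_sink_table',
    'username' = 'root',
    'password' = '******'
);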

Flink SQL with aggregation operators Error: cannot print directly

The table cannot be printed directly when the Flink SQL contains aggregation operators:

Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[category], orderBy=[sales DESC], select=[category, sales])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:355)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTrai 

reason:

By default, toAppendStream is used for table-to-stream conversion, but an aggregation produces update and delete changes that an append-only stream cannot represent. A retract stream or an upsert stream is needed instead.

Solution:

Use tableEnv.toRetractStream() for the output.
For example:

// The plain join query is append-only and could be printed with toAppendStream
Table joinTable = tEnv.sqlQuery(
                        "SELECT\n" +
                        "    userName,\n" +
                        "    product,\n" +
                        "    amount\n" +
                        "FROM\n" +
                        "    orders,\n" +
                        "    user_table\n" +
                        "WHERE\n" +
                        "    orders.userId = user_table.userId");
// The aggregation query produces updates, so it must go through a retract stream
Table aggTable = tEnv.sqlQuery("SELECT userId,sum(amount) as boughtSum " +
                            "FROM orders group by userId");
tEnv.toRetractStream(aggTable, TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
})).print();
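Each record of the retract stream is a Tuple2 whose Boolean field marks the change kind: true for an insert or update-after, false for the retraction of a previously emitted result.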

[Solved] The main method caused an error: Could not deploy Yarn job cluster.

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not deploy Yarn job cluster.

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.

Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.

Diagnostics from YARN: Application application_1640140324841_0003 failed 1 times (global limit =4; local limit is =1) due to AM Container for appattempt_1640140324841_0003_000001 exited with  exitCode: 1
Failing this attempt.Diagnostics: [2021-12-22 11:23:34.422]Exception from container-launch.
Container id: container_e44_1640140324841_0003_01_000001
Exit code: 1
Shell output: main : command provided 1
main : run as user is etl_admin
main : requested yarn user is etl_admin
Getting exit code file…
Creating script paths…
Writing pid file…
Writing to tmp file /data1/yarn/nm/nmPrivate/application_1640140324841_0003/container_e44_1640140324841_0003_01_000001/container_e44_1640140324841_0003_01_000001.pid.tmp
Writing to cgroup task files…
Creating local dirs…
Launching container…

 

Solution:
On inspection, the Flink version in the IDEA project is 1.11.0, while the Flink version on the cluster is 1.13.1.
Directly upgrade the Flink version in IDEA to 1.13.1, done!
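If the project manages the version through a flink.version property (an assumption; the other pom snippets on this page use that pattern), the upgrade is a one-line change:

    <properties>
        <flink.version>1.13.1</flink.version>
    </properties>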

[Solved] PhoenixParserException:ERROR 602 (42P00): Syntax error. Missing ‘EOF’

Error:
org.apache.phoenix.exception.PhoenixParserException: ERROR 602 (42P00): Syntax error. Missing “EOF” at line 1, column 36.

Codes:

public class DimUtil {
    public static JSONObject readDimFromPhoenix(Connection conn, String tableName, Long id) {
        // BUG: no space before "where", so the SQL becomes "select * from <tableName>where id=?"
        String sql = "select * from " + tableName + "where id=?";
        Object[] args = {id.toString()};
        // Get the query result and return it
        List<JSONObject> list = JdbcUtil.queryList(conn, sql, args, JSONObject.class);
        return list.size() == 1 ? list.get(0) : new JSONObject();
    }
}

Error analysis:

The SQL is concatenated without a space between the table name and the where keyword, so Phoenix receives select * from <tableName>where id=? and fails to parse it at that position (line 1, column 36).

Solution:

Just add a space in front of where.
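The corrected line:

String sql = "select * from " + tableName + " where id=?";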

org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph

An error is reported when the Flink SQL client submits the job:

2021-10-21 15:23:54,232 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-10-21 15:23:54,233 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-10-21 15:23:54,291 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing over to rm2
2021-10-21 15:23:54,337 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface n101:36989 of application 'application_1634635118307_0001'.
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.

Failing over to rm2

The job can only be submitted successfully through the active ResourceManager; here the active ResourceManager is rm2.
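Note that the WARN line above also points at the root cause: neither HADOOP_CONF_DIR nor YARN_CONF_DIR was set, so the client cannot load the Hadoop configuration that tells it which ResourceManager is active. Setting one of them before starting the SQL client, e.g. export HADOOP_CONF_DIR=/etc/hadoop/conf (the path is an assumption about your installation), lets the client fail over correctly.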

[Solved] java.lang.NoClassDefFoundError when IDEA runs Flink: org/apache/flink/api/common/ExecutionConfig

Solution:
Change provided to compile, for example:

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>compile</scope>
		</dependency>

Then click the Maven refresh button in IDEA to reload pom.xml.
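Alternatively, you can keep the provided scope (so the cluster classpath stays clean) and instead enable the "Include dependencies with 'Provided' scope" option in the IDEA run configuration; whether that option is available depends on your IDEA version.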

[Solved] Flink Error: Cannot resolve method addSource

The error looks inexplicable, but in fact IDEA offered multiple candidate packages for the import, and the wrong one was selected.

The required dependencies are as follows:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
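
With the correct imports in place, addSource resolves normally. A minimal sketch (the topic, servers, and group id are placeholders):

public class KafkaSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        // addSource accepts the consumer because the Flink streaming imports above match
        env.addSource(new FlinkKafkaConsumer<>("my_topic", new SimpleStringSchema(), props))
           .print();
        env.execute("kafka-source-example");
    }
}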

An error is reported when Flink writes Hive files in parquet format

Version: CDH 6.3.2
Flink version: 1.13.2
CDH Hive version: 2.1.1

Error message:

java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
	at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder.<init>(ParquetRowDataBuilder.java:55) ~[flink-parquet_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$FlinkParquetBuilder.createWriter(ParquetRowDataBuilder.java:124) ~[flink-parquet_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56) ~[flink-parquet_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory.create(FileSystemTableSink.java:624) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:75) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at StreamExecCalc$35.processElement(Unknown Source) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) ~[flink-connector-kafka_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) ~[flink-connector-kafka_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[flink-connector-kafka_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[flink-connector-kafka_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
2021-08-15 10:45:37,863 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job e8f0af4bb984507ec9f69f07fa2df3d5
2021-08-15 10:45:37,865 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-08-15 10:45:37,866 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0. 
2021-08-15 10:45:37,867 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph 

Following the guidance on the official Flink website, I added the flink-parquet dependency package together with parquet-hadoop-1.11.1.jar and parquet-common-1.11.1.jar. The error above still occurred: the expected constructor could not be found.

Reason:

The parquet version bundled in CDH Hive's parquet-hadoop-bundle.jar is inconsistent with the version used by flink-parquet.

Solution:

1. Flink itself already provides the flink-parquet package with the matching dependencies, so it is enough to make sure that the dependencies provided by Flink are loaded first when the task executes; package flink-parquet with the job code and distribute them together (see the sketch after this list).
2. Because the package versions are inconsistent, you can also consider upgrading the corresponding component. Note that you cannot simply swap in a different parquet-hadoop-bundle.jar: a check of the Maven repository shows no usable build. That leaves upgrading the Hive version or downgrading the Flink version.
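
A minimal sketch of option 1, assuming the job jar is built as a fat jar so flink-parquet travels with the code (the artifact name and versions mirror the log above):

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_2.11</artifactId>
            <version>1.13.2</version>
            <!-- compile scope so the job jar bundles Flink's parquet classes -->
            <scope>compile</scope>
        </dependency>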