[Solved] Flink1.12 integrate Hadoop 3.X error: java.lang.RuntimeException…

Flink1.12 integration Hadoop 3. X error reporting

Due to the needs of the project, it is necessary to carry out rule early warning for a period of time, so first install the Flink cluster and Hadoop on the alicloud server of the project, but when running the official website example, an error is directly reported. The specific information is as follows:

Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:309)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:76)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:316)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:231)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:495)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:347)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:291)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:256)
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108)
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41)
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141)
	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450)
	... 4 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:491)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:64)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:501)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:313)
	... 19 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
	... 24 more

Only later did Baidu find that it needed a jar package to bridge Flink and HDFS, so it put the two jar packages into the Lib of Flink.

Then, when starting, an error will still be reported, and the information is as follows:

[root@k8s-master flink-1.12.0]# bin/flink run  examples/batch/WordCount.jar --input hdfs://k8s-master:9000/test/word.txt --output hdfs://k8s-master:9000/test/result.txt  --parallelism 2
Job has been submitted with JobID 058a2b79197c7cdb1dd04269fcc2a0b6

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 058a2b79197c7cdb1dd04269fcc2a0b6)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 058a2b79197c7cdb1dd04269fcc2a0b6)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.client.program.ContextEnvironment.getJobExecutionResult(ContextEnvironment.java:112)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:76)
        at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
        ... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 058a2b79197c7cdb1dd04269fcc2a0b6)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:119)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:379)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:117)
        ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:491)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
        at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:218)
        at org.apache.flink.api.java.io.CsvOutputFormat.open(CsvOutputFormat.java:161)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:209)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
        at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
        ... 8 more

Since I install the Flink cluster here, I can package the two jars in SCP to the other two nodes and they can run normally.

Read More: