[Solved] Flink Error: Flink Hadoop is not in the classpath/dependencies

Error background:

When deploying Flink on a YARN cluster, the Flink cluster fails to start.

Version:

flink-1.14.6

hadoop-3.2.3

org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:216) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:617) [flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59) [flink-dist_2.12-1.14.6.jar:1.14.6]
Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (hdfs:/flink/ha/default)
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:92) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	... 2 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:532) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:89) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	... 2 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:55) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:528) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:89) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) ~[flink-dist_2.12-1.14.6.jar:1.14.6]
	... 2 more

Cause of the error:

Flink needs two jar dependencies to access HDFS. The Flink distribution does not include them, so you have to add them yourself:

  1. flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar
  2. commons-cli-1.5.0.jar

Solution:

Search the Maven repository for these two jar files and download them: https://mvnrepository.com/

Put both jar files in the /flink/lib directory (the lib directory of the Flink installation), then restart the cluster so that Flink loads them.
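
As a quick sanity check before restarting, something like the following sketch could confirm that both jars are in place. It assumes the jar file names start with the prefixes of the two jars listed above; the function name missing_jars is made up for this example, and the versions in your setup may differ.

```python
from pathlib import Path

# Jar name prefixes taken from the two jars listed above (an assumption:
# your file names may carry different versions, but should keep the prefix).
REQUIRED_JAR_PREFIXES = ("flink-shaded-hadoop-3", "commons-cli")


def missing_jars(lib_dir, prefixes=REQUIRED_JAR_PREFIXES):
    """Return the prefixes for which no matching .jar file exists in lib_dir."""
    jar_names = [p.name for p in Path(lib_dir).glob("*.jar")]
    return [
        prefix
        for prefix in prefixes
        if not any(name.startswith(prefix) for name in jar_names)
    ]
```

Calling missing_jars("/flink/lib") returns an empty list when both jars are present; otherwise it returns the prefixes that still need to be added.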

[Solved] Flink jdbc Error: Access Denied for user 'root'@'10.0.0.x' (using password: YES)

Error message:

2021-12-31 11:02:51.955 [Source: TableSourceScan(table=[[default_catalog, default_database, jdbc_source_table, project=[id]]], fields=[id]) -> Calc(select=[id, id AS id0, id AS id1]) -> Sink: Sink(table=[default_catalog.default_database.jdbc_upsert_sink_table], fields=[id, id0, id1]) (1/1)#1] WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, jdbc_source_table, project=[id]]], fields=[id]) -> Calc(select=[id, id AS id0, id AS id1]) -> Sink: Sink(table=[default_catalog.default_database.jdbc_upsert_sink_table], fields=[id, id0, id1]) (1/1)#1 (85391b7c05a5282974f4da4cbb62f38e) switched from INITIALIZING to FAILED with failure cause: java.io.IOException: unable to open JDBC writer
	at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:56)
	at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:129)
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:60)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Access denied for user 'root'@'10.0.0.23' (using password: YES)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:836)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:456)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:197)
	at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:121)
	at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54)
	... 14 more

How to Solve:

The connection information in the connector's WITH parameters was incorrect. Check the url, username, and password options. In addition, the field types of the table in the database did not match the definition in the Flink SQL, which caused the table connection to fail.
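
To make the WITH parameters concrete, here is a minimal sketch that renders a JDBC sink table definition so the options can be reviewed in one place. The table and field names come from the log above; the INT column types, the URL, the MySQL table name, and the credentials are placeholders and not the values from the original job. The helper jdbc_table_ddl is made up for this example and does no quote escaping.

```python
def jdbc_table_ddl(table, columns, options):
    """Render a Flink SQL CREATE TABLE statement for a JDBC-backed table."""
    column_sql = ",\n".join(f"  {name} {sql_type}" for name, sql_type in columns)
    option_sql = ",\n".join(
        f"  '{key}' = '{value}'" for key, value in options.items()
    )
    return f"CREATE TABLE {table} (\n{column_sql}\n) WITH (\n{option_sql}\n)"


# Placeholders (assumptions): replace the types, URL, table name, and
# credentials with the values that match your MySQL database and account.
ddl = jdbc_table_ddl(
    "jdbc_upsert_sink_table",
    [("id", "INT"), ("id0", "INT"), ("id1", "INT")],
    {
        "connector": "jdbc",
        "url": "jdbc:mysql://<mysql-host>:3306/<database>",
        "table-name": "<mysql_table>",
        "username": "<user>",
        "password": "<password>",
    },
)
print(ddl)
```

The url, username, and password options are the ones that the "Access denied" message points at, and the column types are the part to compare against the field types of the table in the database.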