Tag Archives: Kafka

Kafka Create Topic Error: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.

Error message:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.

The error message says the topic is being created with a replication factor of 3 while only 2 brokers are available. This indicates that Kafka did not find all the expected broker registrations in ZooKeeper.

Check:

Whether the Kafka cluster is fully up. Note also that the replication factor of a new topic cannot exceed the number of live brokers in the cluster.

Use lsof -i to check whether each Kafka node is listening normally (see the command sketch below).
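A minimal check sequence, assuming ZooKeeper on localhost:2181, brokers on port 9092, and a hypothetical topic name my-topic (the --zookeeper flag matches older ZooKeeper-based releases; newer clusters use --bootstrap-server):

# List the broker IDs currently registered in ZooKeeper
zookeeper-shell.sh localhost:2181 ls /brokers/ids

# Confirm a broker is actually listening on its port
lsof -i :9092

# Create the topic with a replication factor no larger than the live broker count
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-topic

If only two IDs show up under /brokers/ids, either bring the missing broker back or use a replication factor of 2 or less.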

[Solved] "error_code":500,"message":"IO Error trying to forward REST request: java.net.ConnectException: Connection refused"

"error_code":500,"message":"IO Error trying to forward REST request: java.net.ConnectException: Connection refused"
The error is reported when executing the following command:
curl -u debezium:4a3s4d02234 http://debezium-001.optics.net:8083/connectors/mysql-optics-prod/restart -X POST
The error message is shown below.
{"error_code":500,"message":"IO Error trying to forward REST request: java.net.ConnectException: Connection refused"}
Solution: make sure the following parameters are set correctly.
rest.host.name=debezium-001.optics.net
rest.port=8083
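After correcting the properties, restart the Connect worker so the new rest.* settings take effect, then retry the request. A sketch, assuming the worker runs as a systemd service named kafka-connect:

# Restart the worker (adjust to however your worker is actually managed)
sudo systemctl restart kafka-connect

# Retry the connector restart request
curl -u debezium:4a3s4d02234 -X POST http://debezium-001.optics.net:8083/connectors/mysql-optics-prod/restart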

[Solved] Kafka on Windows error: java.nio.file.FileSystemException: The process cannot access the file because it is being used by another process

Problem description

Kafka deployed in a Windows environment hangs after running for a period of time.
Viewing logs/server.log, the following errors are found:

 ERROR Failed to clean up log for __consumer_offsets-44 in dir D:\kafka\kafka_2.13-2.8.0\kafkakafka_2.13-2.8.0kafka-logs due to IOException (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: D:\kafka\kafka_2.13-2.8.0\kafkakafka_2.13-2.8.0kafka-logs\__consumer_offsets-44\00000000000000000000.timeindex.cleaned -> D:\kafka\kafka_2.13-2.8.0\kafkakafka_2.13-2.8.0kafka-logs\__consumer_offsets-44\00000000000000000000.timeindex.swap: The process cannot access the file because it is being used by another process.

	at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
	at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
	at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
	at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
	at java.nio.file.Files.move(Files.java:1395)
	at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:904)
	at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:210)
	at kafka.log.LazyIndex$IndexValue.renameTo(LazyIndex.scala:155)
	at kafka.log.LazyIndex.$anonfun$renameTo$1(LazyIndex.scala:79)
	at kafka.log.LazyIndex.renameTo(LazyIndex.scala:79)
	at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:496)
	at kafka.log.Log.$anonfun$replaceSegments$4(Log.scala:2402)
	at kafka.log.Log.$anonfun$replaceSegments$4$adapted(Log.scala:2402)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at kafka.log.Log.replaceSegments(Log.scala:2402)
	at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:613)
	at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:538)
	at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:537)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at kafka.log.Cleaner.doClean(LogCleaner.scala:537)
	at kafka.log.Cleaner.clean(LogCleaner.scala:511)
	at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:380)
	at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:352)
	at kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332)
	at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:321)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
	Suppressed: java.nio.file.FileSystemException: D:\kafka\kafka_2.13-2.8.0\kafkakafka_2.13-2.8.0kafka-logs\__consumer_offsets-44\00000000000000000000.timeindex.cleaned -> D:\kafka\kafka_2.13-2.8.0\kafkakafka_2.13-2.8.0kafka-logs\__consumer_offsets-44\00000000000000000000.timeindex.swap: The process cannot access the file because it is being used by another process.

		at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
		at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
		at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
		at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
		at java.nio.file.Files.move(Files.java:1395)
		at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:901)
		... 20 more

Reason

When the __consumer_offsets log is cleaned or a log segment is rolled, the Kafka broker shuts down because renaming the file fails: Windows does not allow a file that is in use to be renamed or modified.

Solution

In the config/server.properties configuration file:
1. Set log.retention.hours=-1
2. Add log.cleaner.enable=false at the end of the file
3. Restart Kafka (the resulting lines are sketched below)
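The resulting lines in config/server.properties would look like this (a workaround that disables retention and log cleaning so files are never renamed, not a recommended production setting):

# Never expire log segments, so they are not renamed or deleted
log.retention.hours=-1
# Disable the log cleaner so segment files are never renamed during compaction
log.cleaner.enable=false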

Suggestion

There are similar issues and PRs on GitHub, but they have not been merged. The author hit this problem on kafka_2.13-2.8.0. Running Kafka in Windows environments is not recommended.

[Solved] Python Operating Kafka Error: SyntaxError: invalid syntax

Operating Kafka from Python reports an error at: return '<SimpleProducer batch=%s>' % self.async

return '<SimpleProducer batch=%s>' % self.async
                                          ^^^^^
SyntaxError: invalid syntax

Reason:

async became a reserved keyword in Python 3.7, so the old kafka package, which uses self.async as an attribute name, is no longer valid syntax.

Solution:

Method 1:

Use the latest client. The legacy kafka package on PyPI has not been updated; upgrade to the maintained kafka-python package instead:
pip install kafka-python
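Before upgrading, remove the legacy package if it is present, and verify afterwards (assuming pip and python point at the same environment):

# Remove the outdated 'kafka' package that still uses the async attribute name
pip uninstall -y kafka
# After installing kafka-python, confirm the import works and print the version
python -c "import kafka; print(kafka.__version__)"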

Method 2:

Switch to Python 3.6, where async is not yet a reserved keyword, and wait for subsequent library upgrades.

[Solved] error adding symbols: File in wrong format collect2: error: ld returned 1 exit status

mqtt.cpp was required in a project, so I compiled paho.mqtt.cpp on a Cambricon (ARM) box. paho.mqtt.cpp depends on the paho.mqtt.c library and header files, so I compiled paho.mqtt.c first. Then, when compiling paho.mqtt.cpp, I used the following cmake command to specify the paho.mqtt.c library and header file locations:

cmake -DPAHO_WITH_SSL=OFF -DPAHO_MQTT_C_LIBRARIES=/home/chw/mqtt/paho.mqtt.c-master/install/lib64/libpaho-mqtt3a.so -DPAHO_MQTT_C_INCLUDE_DIRS=/home/chw/mqtt/paho.mqtt.c-master/install/include/ -DCMAKE_INSTALL_PREFIX=../install ..

Then the following errors were reported during compilation:

/home/chw/mqtt/paho.mqtt.c-master/install/lib64/libpaho-mqtt3a.so: error adding symbols: File in wrong format
collect2: error: ld returned 1 exit status
src/CMakeFiles/paho-mqttpp3.dir/build.make:99: recipe for target 'src/libpaho-mqttpp3.so.1.2.0' failed
make[2]: *** [src/libpaho-mqttpp3.so.1.2.0] Error 1
CMakeFiles/Makefile2:89: recipe for target 'src/CMakeFiles/paho-mqttpp3.dir/all' failed
make[1]: *** [src/CMakeFiles/paho-mqttpp3.dir/all] Error 2
Makefile:151: recipe for target 'all' failed
make: *** [all] Error 2

My first reaction to this error was that the library being linked by the ARM ld command is in x86 format, hence the "file in wrong format" message.

Then I went under the install folder of paho.mqtt.c and used the file command to check the library format:

root@localhost:/home/chw/mqtt/paho.mqtt.c-master/install/lib64# file libpaho-mqtt3a.so.1.3.9
libpaho-mqtt3a.so.1.3.9: ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, not stripped

You can see that this library is x86-64 format, while our Cambricon box is ARM architecture, so the format is not recognized. I then noticed that the paho.mqtt.c install folder contains both lib and lib64 folders, so I went into the lib folder and used the file command to check the format there:

root@localhost:/home/chw/mqtt/paho.mqtt.c-master/install/lib# file libpaho-mqtt3a.so.1.3.9
libpaho-mqtt3a.so.1.3.9: ELF 64-bit LSB shared object, ARM aarch64, version 1 (SYSV), dynamically linked, BuildID[sha1]=1875ae7550029980d276187e883c154aa43c7356, not stripped

I was pleasantly surprised to find that the library under the lib folder is in ARM aarch64 format, so the cause of the problem was found: I had used the library under lib64 when configuring cmake. Changing the paho.mqtt.c library path in the cmake configuration to the lib folder fixes it:

cmake -DPAHO_WITH_SSL=OFF -DPAHO_MQTT_C_LIBRARIES=/home/chw/mqtt/paho.mqtt.c-master/install/lib/libpaho-mqtt3a.so -DPAHO_MQTT_C_INCLUDE_DIRS=/home/chw/mqtt/paho.mqtt.c-master/install/include/ -DCMAKE_INSTALL_PREFIX=../install ..

One more note: the compiled paho.mqtt.cpp library itself also came out as x86 at first. The reason was that I had copied over the paho.mqtt.cpp project that had already been cmake-configured on the x86 machine, instead of downloading the source from GitHub again. After re-running cmake from a clean tree (see the sketch below), the compiled library was in aarch64 format.
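A sketch of rebuilding from a clean tree so no x86 CMake cache is reused (the paho.mqtt.cpp-master path is illustrative):

# Remove stale build artifacts and the CMake cache produced on the x86 machine
cd /home/chw/mqtt/paho.mqtt.cpp-master
rm -rf build && mkdir build && cd build
# Re-run cmake natively on the ARM box, pointing at the aarch64 libraries, then build
cmake -DPAHO_WITH_SSL=OFF -DPAHO_MQTT_C_LIBRARIES=/home/chw/mqtt/paho.mqtt.c-master/install/lib/libpaho-mqtt3a.so -DPAHO_MQTT_C_INCLUDE_DIRS=/home/chw/mqtt/paho.mqtt.c-master/install/include/ -DCMAKE_INSTALL_PREFIX=../install ..
make && make install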

Problem solved!

[Solved] Kafka Start Log Error: WARN Found a corrupted index file due to requirement failed: Corrupt index found

Straight to the point; this fix is personally tested and effective!

Error content

WARN Found a corrupted index file due to requirement failed: Corrupt index found (a corrupt index file was found because a requirement check failed)

WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/tmp/kafka-logs/__consumer_offsets-13/00000000000000000000.index) has non-zero size but the last offset is 0 which is no larger than the base offset 0.}. deleting /tmp/kafka-logs/__consumer_offsets-13/00000000000000000000.timeindex, /tmp/kafka-logs/__consumer_offsets-13/00000000000000000000.index, and /tmp/kafka-logs/__consumer_offsets-13/00000000000000000000.txnindex and rebuilding index... (kafka.log.Log)
[2021-11-27 10:02:24,168] INFO Recovering unflushed segment 0 in log __consumer_offsets-13. (kafka.log.Log)
[2021-11-27 10:02:24,168] INFO Loading producer state from offset 0 for partition __consumer_offsets-13 with message format version 2 (kafka.log.Log)
[2021-11-27 10:02:24,169] INFO Completed load of log __consumer_offsets-13 with 1 log segments, log start offset 0 and log end offset 0 in 2 ms (kafka.log.Log)
[2021-11-27 10:02:24,172] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/tmp/kafka-logs/test-0/00000000000000000000.index) has non-zero size but the last offset is 0 which is no larger than the base offset 0.}. deleting /tmp/kafka-logs/test-0/00000000000000000000.timeindex, /tmp/kafka-logs/test-0/00000000000000000000.index, and /tmp/kafka-logs/test-0/00000000000000000000.txnindex and rebuilding index... (kafka.log.Log)

Error reporting reason

The Kafka broker was closed or stopped abnormally, leaving the index files corrupted.

Solutions

Step 1: enter the home directory of your Kafka installation and check the configuration for the Kafka data storage directory (the log.dirs setting):

vim config/server.properties
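A quicker way to read just that setting (assuming the default config file location):

# Print the configured data directory
grep '^log.dirs' config/server.properties
# e.g. log.dirs=/tmp/kafka-logs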

Step 2: delete the /tmp/kafka-logs folder directly (note: this wipes all topic data, so it is only suitable for test environments)

rm -rf /tmp/kafka-logs/

Result verification

Restart Kafka and check the logs to confirm that the error has been resolved.


A problem derived from the fix

Deleting the Kafka data files is equivalent to starting Kafka for the first time. If you start up with the ZooKeeper bundled with Kafka, you may then see errors in logs/zookeeper.out.

[Solved] ES Error: request contains unrecognized parameter [ignore_throttled]

Problem description

When integrating Elasticsearch with Spring Boot, the tooling provided by Spring Data is used. The specific dependency is as follows:

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
 </dependency>

Note:

No other ES-related dependencies were added; the Elasticsearch server version is 6.5.4.

Test code

In the newly created Spring Boot project, after adding the dependency above, the application.yml configuration is as follows:

spring:
  elasticsearch:
    rest:
      uris: "Server Address"
      read-timeout: "10s"

Add the test content in the Spring Boot test class as follows:

package cn.smileyan.demo;

import java.io.IOException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;


/**
 * @author smileyan
 */
@SpringBootTest
class ElasticsearchDemoApplicationTests {
    @Autowired
    private RestHighLevelClient client;

    @Test
    public void testCreate() throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("myindex2");
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        assert createIndexResponse.isAcknowledged();
    }

    @Test
    public void testDelete() throws IOException {
        DeleteIndexRequest request = new DeleteIndexRequest("myindex2");
        client.indices().delete(request, RequestOptions.DEFAULT);
    }
}

Note that running testCreate does not report an error, but testDelete does:

ElasticsearchStatusException[Elasticsearch exception [type=illegal_argument_exception, reason=request [/myindex2] contains unrecognized parameter: [ignore_throttled]]
]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:176)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1933)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1910)
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1667)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1624)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1594)
	at org.elasticsearch.client.IndicesClient.delete(IndicesClient.java:103)
	at cn.smileyan.demo.ElasticsearchDemoApplicationTests.testDelete(ElasticsearchDemoApplicationTests.java:39)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
	Suppressed: org.elasticsearch.client.ResponseException: method [DELETE], host [http://es.smileyan.cn:9200], URI [/myindex2?master_timeout=30s&ignore_unavailable=false&expand_wildcards=open%2Cclosed&allow_no_indices=true&ignore_throttled=false&timeout=30s], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"request [/myindex2] contains unrecognized parameter: [ignore_throttled]"}],"type":"illegal_argument_exception","reason":"request [/myindex2] contains unrecognized parameter: [ignore_throttled]"},"status":400}
		at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:326)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:296)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:270)
		at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1654)
		... 69 more

Solution:

When Spring Boot starts, pay attention to the client version that is logged:

As mentioned earlier, the server version is 6.5.4. The high-level client pulled in by a newer Spring Boot targets ES 7.x and sends the ignore_throttled parameter, which a 6.x server does not recognize; hence the version conflict.

The solution is very simple: lower the Spring Boot version. Mine is 2.2.13.RELEASE, which pulls in a 6.x-compatible client; continuing the tests, all problems are solved.
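To confirm which client version the build actually resolves, a quick check (assuming Maven; the include pattern is illustrative):

# List the Elasticsearch client artifacts on the classpath
mvn dependency:tree -Dincludes=org.elasticsearch*
# The client's major version should match the server's (6.x here)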

Creating the index myindex2 and then deleting it both work normally.

Kafka Creates and Writes Logs Error [How to Solve]

Kafka creates and writes logs with an error

Running bin\windows\kafka-server-start.bat config\server.properties reports an error when creating logs, but checking the directory shows the files were created successfully.

Error reporting reason: no permission on the log folder.

Solution: modify the folder permissions so the Kafka process can write to them (see the sketch below).
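One way to grant full access on the log folder from an elevated command prompt (a sketch; the D:\kafka\logs path and the current-user account are assumptions to adjust):

icacls "D:\kafka\logs" /grant %USERNAME%:(OI)(CI)F /T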

How to Solve canal & MySQL or "Kafka Cannot Consume Data" Errors

Error 1: interaction between canal and MySQL

Explanation: the root cause is that the same IP produced too many interrupted database connections in a short time, exceeding the value of max_connect_errors.

If the MySQL server continuously receives connection attempts from the same host and they keep failing, the failures accumulate; once the accumulated count exceeds the configured max_connect_errors, the MySQL server blocks all subsequent connections from that host.

Solution: mysqladmin -h 127.0.0.1 -u root -p flush-hosts
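You can also raise the limit itself so transient failures do not trip it as quickly; a sketch from the shell (the value 1000 is illustrative, and it should also be set in my.cnf to persist across restarts):

# Check the current limit, then raise it at runtime
mysql -u root -p -e "SHOW VARIABLES LIKE 'max_connect_errors';"
mysql -u root -p -e "SET GLOBAL max_connect_errors = 1000;"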

Error 2: Kafka cannot consume data

Reason: the topic I created has too few partitions. Manually increase the partition count to match the number set in canal's conf/example/instance.properties.

Execute the following commands:

# 1. View the topic details and the partition count
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic ODS_BASE_DB_C

# 2. Manually increase the partition count
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --partitions 4 --topic ODS_BASE_DB_C

KAFKA – ERROR Failed to write meta.properties due to (kafka.server.BrokerMetadataCheckpoint)

 
I set up a Kafka source-code reading environment on Windows 10 and encountered the errors below. What is wrong? I am new to Kafka.

[2021-04-29 19:57:42,957] ERROR Failed to write meta.properties due to (kafka.server.BrokerMetadataCheckpoint)
java.nio.file.AccessDeniedException: C:\Users\a\workspace\kafka\logs
    at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
    at sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115)
[2021-04-29 19:57:42,973] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.nio.file.AccessDeniedException: C:\Users\a\workspace\kafka\logs
    at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
    at sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115)

And another ERROR below:

[2021-04-29 19:57:43,821] ERROR Error while writing to checkpoint file C:\Users\a\workspace\kafka\logs\recovery-point-offset-checkpoint (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\Users\a\workspace\kafka\logs
    at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
    at sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115)
[2021-04-29 19:57:43,823] ERROR Disk error while writing recovery offsets checkpoint in directory C:\Users\a\workspace\kafka\logs: Error while writing to checkpoint file C:\Users\a\workspace\kafka\logs\recovery-point-offset-checkpoint (kafka.log.LogManager)
[2021-04-29 19:57:43,833] ERROR Error while writing to checkpoint file C:\Users\a\workspace\kafka\logs\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\Users\a\workspace\kafka\logs
    at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
    at sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115)
[2021-04-29 19:57:43,834] ERROR Disk error while writing log start offsets checkpoint in directory C:\Users\a\workspace\kafka\logs: Error while writing to checkpoint file C:\Users\a\workspace\kafka\logs\log-start-offset-checkpoint (kafka.log.LogManager)

I downgraded to Kafka 2.8.1 and after that everything worked perfectly.

This is a common error when log retention kicks in.
Kafka does not have good support for the Windows filesystem.
You can use WSL2 or Docker to work around these limitations, as sketched below.
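For example, a minimal way to run a broker in Docker instead of directly on Windows (a sketch using the Bitnami images; container names, ports, and the 2.8.1 tag are assumptions to adjust):

# Start ZooKeeper and a single Kafka 2.8.1 broker in containers
docker run -d --name zookeeper -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest
docker run -d --name kafka -p 9092:9092 -e KAFKA_CFG_ZOOKEEPER_CONNECT=host.docker.internal:2181 -e ALLOW_PLAINTEXT_LISTENER=yes bitnami/kafka:2.8.1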

This seems to be a bug affecting some Kafka versions. Try installing the kafka_2.12-2.8.1.tgz version; it should solve your problem.

I tried kafka_2.12-3.0.0.tgz and the same issue occurred; 2.12-3.0.0 seems to have issues on Windows.
Downgrading to kafka_2.12-2.8.1.tgz resolved all issues and it works fine.

TIDB-kafka server: Message was too large, server rejected it to avoid allocation error

1. Background

Using drainer to synchronize to Kafka, an error is reported:

 ["fail to produce message to kafka, please check the state of kafka server"] [error="kafka: Failed to produce message to topic test-tidb: kafka server: Message was too large, server rejected it to avoid allocation error."]

2. Solution

Error reason: when a large transaction is executed in TiDB, the generated binlog data is large and may exceed Kafka's message size limit.

Solution: adjust the configuration parameters of the Kafka cluster as shown below.

message.max.bytes=1073741824
replica.fetch.max.bytes=1073741824
fetch.message.max.bytes=1073741824
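For context on where each parameter lives (size the value to your largest expected transaction rather than copying 1 GiB blindly):

# server.properties on every broker: largest message the broker will accept
message.max.bytes=1073741824
# server.properties: followers must be able to replicate messages this large
replica.fetch.max.bytes=1073741824
# consumer-side setting (old consumer naming): consumers must be able to fetch messages this large
fetch.message.max.bytes=1073741824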