Tag Archives: flume

[Solved] Failed loading positionFile: /opt/module/flume/taildir_position.json

Error Message: 2022-03-22 15:55:57,000 (lifecycleSupervisor-1-0) [ERROR – org.apache.flume.source.taildir.ReliableTaildirEventReader.loadPositionFile(ReliableTaildirEventReader.java:147)] Failed loading positionFile: /opt/module/flume/taildir_position.json

I used Flume 1.9 with a TAILDIR source, a memory channel, and an HDFS sink. After writing the configuration file and starting the agent, no files appeared in HDFS. I then checked the log file and found the error above.


My configuration file:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/taildir_position.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*log.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*file.*


# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/test2/%Y%m%d/%H
# Prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to round the timestamp down when building the directory path
a3.sinks.k3.hdfs.round = true
# Number of time units to round down to (a new directory per unit)
a3.sinks.k3.hdfs.roundValue = 1
# Unit of time used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; DataStream writes uncompressed data (CompressedStream enables compression)
a3.sinks.k3.hdfs.fileType = DataStream
# Roll to a new file every 600 seconds
a3.sinks.k3.hdfs.rollInterval = 600
# Roll to a new file at roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# 0 means file rolling does not depend on the number of events
a3.sinks.k3.hdfs.rollCount = 0
# Minimum number of block replicas
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

Solution:

Change
a3.sources.r3.positionFile = /opt/module/flume/taildir_position.json
to
a3.sources.r3.positionFile = /opt/module/flume/taildir.json
so that the source starts with a fresh position file.

a3.sources = r3
a3.sinks = k3
a3.channels = c3

a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/taildir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*log.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*file.*


# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/test2/%Y%m%d/%H
# Prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to round the timestamp down when building the directory path
a3.sinks.k3.hdfs.round = true
# Number of time units to round down to (a new directory per unit)
a3.sinks.k3.hdfs.roundValue = 1
# Unit of time used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; DataStream writes uncompressed data (CompressedStream enables compression)
a3.sinks.k3.hdfs.fileType = DataStream
# Roll to a new file every 600 seconds
a3.sinks.k3.hdfs.rollInterval = 600
# Roll to a new file at roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# 0 means file rolling does not depend on the number of events
a3.sinks.k3.hdfs.rollCount = 0
# Minimum number of block replicas
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
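
Before switching to a new position file, it can help to look at the existing one. The following is a minimal sketch, assuming the failure comes from an empty or malformed position file (the post does not confirm the cause). The function name check_position_file is hypothetical, and the only check it makes is that the file is non-empty and begins with "[", since the TAILDIR source stores positions as a JSON array.

```shell
# Hypothetical helper: classify a TAILDIR position file as missing, empty,
# malformed, or plausible. It only inspects the first non-blank character
# and does not fully validate the JSON.
check_position_file() {
    path="$1"
    if [ ! -e "$path" ]; then
        echo "missing"
    elif [ ! -s "$path" ]; then
        echo "empty"
    elif [ "$(tr -d '[:space:]' < "$path" | head -c 1)" != "[" ]; then
        echo "malformed"
    else
        echo "plausible"
    fi
}

# Example (path taken from the configuration above):
# check_position_file /opt/module/flume/taildir_position.json
```

If it reports empty or malformed, moving the file aside should have much the same effect as pointing positionFile at a new name.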

[Solved] Error: java.io.EOFException: Premature EOF from inputStream

Fixing the error java.io.EOFException: Premature EOF from inputStream

1. Problem

1. What happened

The log parsing job had always been stable, then it suddenly started failing, which was alarming.

2. Error details:

Checking the log showed the following error:

21/11/18 14:36:29 INFO mapreduce.Job: Task Id : attempt_1628497295151_1290365_m_000002_2, Status : FAILED
Error: java.io.EOFException: Premature EOF from inputStream
	at com.hadoop.compression.lzo.LzopInputStream.readFully(LzopInputStream.java:75)
	at com.hadoop.compression.lzo.LzopInputStream.readHeader(LzopInputStream.java:114)
	at com.hadoop.compression.lzo.LzopInputStream.<init>(LzopInputStream.java:54)
	at com.hadoop.compression.lzo.LzopCodec.createInputStream(LzopCodec.java:83)
	at com.hadoop.mapreduce.LzoSplitRecordReader.initialize(LzoSplitRecordReader.java:58)
	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1907)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

Searching for the error online suggested that the limit set by the dfs.datanode.max.transfer.threads parameter had been reached, for example:
https://blog.csdn.net/zhoujj303030/article/details/44422415

However, the cluster configuration showed that this parameter had already been raised to 8192, so I looked for other causes.

It turned out that there was an empty LZO file among the log files. After deleting it, the job was rerun and succeeded.

2. Solution

To prevent this from happening again, a script deletes empty LZO files before the parsing job runs.

1. Traverse the files under the specified path

for file in $(hdfs dfs -ls /xxx/xxx/2037-11-05/pageview | sed '1d;s/  */ /g' | cut -d' ' -f8)
do
	echo "$file"
done

Result output:

/xxx/xxx/2037-11-05/pageview/log.1631668209557.lzo
/xxx/xxx/2037-11-05/pageview/log.1631668211445.lzo

2. Check whether each file is empty

for file in $(hdfs dfs -ls /xxx/xxx/2037-11-05/pageview | sed '1d;s/  */ /g' | cut -d' ' -f8)
do
	echo "$file"
	# hdfs dfs -count prints DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME; column 3 is the size in bytes
	lzoIsEmpty=$(hdfs dfs -count "$file" | awk '{print $3}')
	echo "$lzoIsEmpty"
	if [[ $lzoIsEmpty -eq 0 ]]; then
		# The file is empty, so delete it
		hdfs dfs -rm "$file"
	else
		echo "Loading data"
	fi
done

3. Final script

# do_date is assumed to be set earlier in the script
for type in webclick error pageview exposure login
do
    # Column 2 of hdfs dfs -count is the number of files under the path
    isEmpty=$(hdfs dfs -count "/xxx/xxx/$do_date/$type" | awk '{print $2}')
    if [[ $isEmpty -eq 0 ]]; then
        echo "------ Given Path:/xxx/xxx/$do_date/$type is empty"
    else
        for file in $(hdfs dfs -ls "/xxx/xxx/$do_date/$type" | sed '1d;s/  */ /g' | cut -d' ' -f8)
        do
            echo "$file"
            # Column 3 is the content size in bytes
            lzoIsEmpty=$(hdfs dfs -count "$file" | awk '{print $3}')
            echo "$lzoIsEmpty"
            if [[ $lzoIsEmpty -eq 0 ]]; then
                echo "Delete Files: $file"
                hdfs dfs -rm "$file"
            fi
        done

        echo "================== Import log data of type $do_date $type into ods layer =================="
        # ... Handling log parsing logic
    fi
done
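
The per-file hdfs dfs -count call can be avoided, because hdfs dfs -ls already prints each file's size in its fifth column. A sketch of that variant follows. The function name list_empty_lzo is hypothetical, it reads hdfs dfs -ls output on standard input, and it assumes paths contain no spaces.

```shell
# Hypothetical helper: read `hdfs dfs -ls` output on stdin and print the
# paths of zero-byte .lzo files.
# Columns: permissions, replication, owner, group, size, date, time, path.
# The "Found N items" header has fewer than 8 fields and is skipped, and
# directories (permissions starting with "d") are ignored.
list_empty_lzo() {
    awk 'NF >= 8 && $1 !~ /^d/ && $5 == 0 && $8 ~ /\.lzo$/ { print $8 }'
}

# Example (not run here; requires an HDFS client):
# hdfs dfs -ls "/xxx/xxx/$do_date/$type" | list_empty_lzo |
#     while read -r f; do hdfs dfs -rm "$f"; done
```

This issues one HDFS call per directory instead of one per file, which may matter when a directory holds many log files.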

Flume receives an error when a single message is too large

org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

Java client side: increase the producer's maximum request size, ProducerConfig.MAX_REQUEST_SIZE_CONFIG (max.request.size).

Kafka server side: set message.max.bytes = 2147483640. This is close to the largest possible value, because the parameter is an int.

Special note: changing the server parameter has no effect on topics that already exist.
For an existing topic, adjust the topic-level parameter max.message.bytes, here set to the maximum integer value of 2147483647:

bin/kafka-configs.sh --zookeeper localhost:2181  --alter --topic topic_name   --add-config  max.message.bytes=2147483647

This solved the problem.
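
Because these size settings are int values, the largest value that can be used is 2^31 - 1 = 2147483647. A small sketch of a guard that checks a candidate value before it is passed to kafka-configs.sh is shown here; the function name fits_int32 is hypothetical.

```shell
# Hypothetical guard: succeed only if the argument is a positive integer
# that fits in a signed 32-bit int (at most 2147483647).
fits_int32() {
    case "$1" in
        ''|*[!0-9]*) return 1 ;;   # reject empty or non-numeric input
    esac
    [ "$1" -gt 0 ] && [ "$1" -le 2147483647 ]
}

fits_int32 2147483647 && echo "2147483647 fits"
fits_int32 2147483648 || echo "2147483648 does not fit"
```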
 

[Solved] Flume Error: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
	at org.apache.flume.sink.hdfs.HDFSEventSink.getCodec(HDFSEventSink.java:324)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

This error occurs because Flume cannot find the Hadoop classes on its classpath. The Hadoop environment variables need to be configured on the server.
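
A quick way to check the environment before starting the agent is sketched below. It assumes that the Hadoop installation is located either through the HADOOP_HOME variable or through a hadoop executable on the PATH; the function name check_hadoop_env is hypothetical.

```shell
# Hypothetical check: report whether Hadoop can be located from the
# environment. Prints one line per check and returns non-zero if neither
# check succeeds.
check_hadoop_env() {
    found=1
    if [ -n "${HADOOP_HOME:-}" ] && [ -x "$HADOOP_HOME/bin/hadoop" ]; then
        echo "HADOOP_HOME ok: $HADOOP_HOME"
        found=0
    else
        echo "HADOOP_HOME is not set or has no bin/hadoop"
    fi
    if command -v hadoop >/dev/null 2>&1; then
        echo "hadoop found on PATH: $(command -v hadoop)"
        found=0
    else
        echo "hadoop is not on PATH"
    fi
    return "$found"
}

# Example:
# check_hadoop_env || echo "configure the Hadoop environment variables first"
```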

NettyAvroRpcClient RPC connection error

2014-12-19 01:05:42,141 (lifecycleSupervisor-1-1) [WARN – org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:294)] Unable to create Rpc client using hostname: xxx.xxx.xxx.xxx, port: 41100
org.apache.flume.FlumeException: NettyAvroRpcClient { host: 121.41.49.51, port: 41100 }: RPC connection error

This problem occurs when Flume uses Avro to transfer data.

First, check whether anything is listening on the port of the server being connected to.

For example, to send data to port 4383 of 192.168.1.1, a server must be listening on that port; otherwise the RPC connection fails.
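
One way to check the port from the sending machine is sketched below. It assumes that bash and the coreutils timeout command are available; the function name port_open is hypothetical, and the host and port in the example are the values from the text.

```shell
# Hypothetical helper: succeed if a TCP connection to host:port can be
# opened within 3 seconds. It relies on bash's /dev/tcp pseudo-device, so
# bash is invoked explicitly.
port_open() {
    timeout 3 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$1" "$2" 2>/dev/null
}

# Example with the values from the text (not run here):
# port_open 192.168.1.1 4383 && echo "listening" || echo "nothing is listening"
```

On the target server itself, netstat -tnlp or ss -ltn lists the ports that are currently being listened on.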

Flume monitors a single append file in real time

1) Case requirements: monitor the Hive logs in real time and upload them to HDFS.
2) Requirement analysis:

3) Implementation steps:
1. For Flume to write data to HDFS, the Hadoop-related JAR packages must be copied to the /opt/module/flume/lib folder.

2. Create the flume-file-hdfs.conf file

Create the file.
Note: to read a file on a Linux system, the command must follow the rules of Linux commands. Since the Hive logs are on Linux, the source type to use is exec, short for execute, which runs a Linux command to read the file.

Add the following content:


3. Run Flume

4. Start Hadoop and Hive, and run Hive operations so that it produces logs

5. View files on HDFS.
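
The configuration for step 2 is not shown above. As a rough sketch only, an exec-source agent of this kind could look like the following. The agent and component names (a2, r2, k2, c2), the Hive log path, the HDFS path, and the default output location are all assumptions for illustration, not the author's actual file.

```shell
# Write a hypothetical flume-file-hdfs.conf. All names and paths below are
# placeholders chosen for illustration.
conf_file="${1:-./flume-file-hdfs.conf}"

cat > "$conf_file" <<'EOF'
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# exec source: run a Linux command and read its output as events
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /path/to/hive.log

a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.fileType = DataStream

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
EOF

echo "wrote $conf_file"
# Step 3 would then be something like:
# bin/flume-ng agent --conf conf/ --name a2 --conf-file "$conf_file"
```

The tail -F form keeps following the log file across rotation, which is why it is commonly chosen for an exec source that reads an appended file.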