Solve the unexpected end of stream exception thrown by jedis

tonight the Internet is off. I haven’t fixed it for a long time. Write a super short post, and then take a rest as soon as possible.

has a Flink program that reads data from Kafka, summarizes the calculation indicator by the 1-minute scrolling window, and writes the result to Redis, that is, writes once every 1 minute. Although the Kafka Source inputs a huge amount of data, the result is only a few hundred kilobytes per minute. After a few hours of running, the following exception is thrown:

redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream.
        at redis.clients.util.RedisInputStream.ensureFill(
        at redis.clients.util.RedisInputStream.readByte(
        at redis.clients.jedis.Protocol.process(
// 以下略...

is then restarted frequently, and after up to five minutes the same exception is thrown repeatedly, rendering the data completely unwritable. Redis is standalone version 3.2.10, and Jedis is a version 2.9.0 built into the Flink-Redis connector provided by Bahir.

tracing the Jedis source code according to the exception stack in the figure does not yield any useful information. After various googles, find three search directions:

  1. client output buffer is full;
  2. Jedis instance is operated concurrently by multiple threads;
  3. connection idle for a long time disconnected by the server.

take a look at the buffer Settings:

> config get client-output-buffer-limit
1) "client-output-buffer-limit"
2) "normal 0 0 0 slave 268435456 67108864 60 pubsub 33554432 8388608 60"

has no limit at all. And even if you do, output of a few hundred K per minute is not a bottleneck at all. The multithreading problem also doesn’t exist, because RedisSink’s parallelism was set to 1 in the code in the first place. So there’s only a third possibility left, take a look at the timeout setting in Redis.conf.

> config get timeout
1) "timeout"
2) "120"

120 seconds is pretty short. Look at the structure when we use connector FlinkJedisPoolConfig instances where I used to, the answer is RedisCommandsContainerBuilder:

    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
        Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");

        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();

        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
            jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
        return new RedisContainer(jedisPool);

shows that the JedisPool does not monitor idle connection at all. Because concurrency is so low, as the program runs, all connections in JedisPool gradually become idle and do not get cleaned up, and an exception occurs. So be sure to poke fun at the Redis connector first.

has to be changed. There are three ways to modify the idea:

  1. set maxIdle to 0. It is too simple and crude. It treats the symptoms rather than the root causes.

  2. directly adds the configuration items of testOnBorrow and testOnReturn to the FlinkJedisPoolConfig class code, that is, when getting and returning connections from JedisPool, the validity of the connections is tested, and the failed connections are cleaned up. There are two more pings, but there is no bottleneck if you only write one batch of data per minute.

  3. replaces GenericObjectPoolConfig with JedisPoolConfig in the above code. The code for JedisPoolConfig is simple:

public class JedisPoolConfig extends GenericObjectPoolConfig {
  public JedisPoolConfig() {
    // defaults to make your life with connection pool easier :)

is simple, but the four parameters it defaults to are really ‘make our life easier’ :

  • testWhileIdle: turn on idle connection detection;
  • minEvictableIdleTimeMillis: JedisPool connection in the free time threshold, when this threshold is reached, idle connections will be removed. Redis defaults to 30 minutes, which is too long, so JedisPoolConfig defaults to 1 minute;
  • timeBetweenEvictionRunsMillis: testing free connection cycle, it is 30 seconds;
  • numTestsPerEvictionRun: how many connections are taken for each test. If set to -1, all links are detected.

Of course we could also add an idle connection detection parameter to the GenericObjectPoolConfig used by flinkjejedispoolconfig, but that wouldn’t be as convenient as using JedisPoolConfig directly. This approach is more universal because the overhead associated with testOnBorrow and testOnReturn can be significant at times of high concurrency.

people then good night good night.

Read More: