Spring Cloud Netflix Hystrix: /actuator/hystrix.stream error UT005023

Symptom

UT005023: Exception handling request to /actuator/hystrix.stream
This error is also reported in issues on GitHub.

Solution

The code on GitHub shows that this problem has been fixed on the master branch, but the fix is not included in 1.5.18, the latest jar published to Maven Central.
Solution 1: build a jar that overrides the original one.
I built a 1.5.19 jar from the source, uploaded it to my company’s private Maven repository, and pinned that version in the project:

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
      <groupId>com.netflix.hystrix</groupId>
      <artifactId>hystrix-metrics-event-stream</artifactId>
      <version>1.5.19</version>
</dependency>      

Solution 2: use the Jetty container instead.
This should work because every write method of org.eclipse.jetty.server.ResponseWriter takes a synchronization lock. Note that I have not tried this myself, so treat it as unverified.

Root cause analysis

Source: hystrix-metrics-event-stream 1.5.18, HystrixSampleSseServlet.java
The cause of the problem: observeOn(Schedulers.io()) moves the subscriber onto a worker thread that writes the monitoring data, while the main (request) thread writes pings in a while loop. Both threads call writer.print() and writer.checkError() on the same writer, and these unsynchronized calls from different threads corrupt the output buffer. The fix on the master branch wraps the writes in a synchronized (responseWriteLock) block.

 sampleSubscription = sampleStream
                        .observeOn(Schedulers.io())
                        .subscribe(new Subscriber<String>() {
                            @Override
                            public void onCompleted() {
                                logger.error("HystrixSampleSseServlet: ({}) received unexpected OnCompleted from sample stream", getClass().getSimpleName());
                                moreDataWillBeSent.set(false);
                            }

                            @Override
                            public void onError(Throwable e) {
                                moreDataWillBeSent.set(false);
                            }

                            @Override
                            public void onNext(String sampleDataAsString) {
                                // An RxJava worker thread continuously writes data here
                                if (sampleDataAsString != null) {
                                    writer.print("data: " + sampleDataAsString + "\n\n");
                                    // explicitly check for client disconnect - PrintWriter does not throw exceptions
                                    if (writer.checkError()) {
                                        moreDataWillBeSent.set(false);
                                    }
                                    writer.flush();
                                }
                            }
                        });
                // Meanwhile the main thread loops here, writing pings to the same writer
                while (moreDataWillBeSent.get() && !isDestroyed) {
                    try {
                        Thread.sleep(pausePollerThreadDelayInMs);
                        //in case stream has not started emitting yet, catch any clients which connect/disconnect before emits start
                        writer.print("ping: \n\n");
                        // explicitly check for client disconnect - PrintWriter does not throw exceptions
                        if (writer.checkError()) {
                            moreDataWillBeSent.set(false);
                        }
                        writer.flush();
                    } catch (InterruptedException e) {
                        moreDataWillBeSent.set(false);
                    }
                }
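
The fix described above, a single lock shared by both writing threads, can be sketched roughly as follows. This is only an illustration of the locking pattern, not the actual patch on master: the class SseWriteLockSketch and the methods writeEvent and run are hypothetical names, while responseWriteLock and moreDataWillBeSent are borrowed from the servlet. The sketch writes to a java.io.PrintWriter, which is already synchronized internally, so it will not reproduce the container's buffer corruption. It only shows how print, checkError and flush can be grouped into one atomic step.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not the Hystrix source
class SseWriteLockSketch {
    // One lock guards every access to the shared writer
    private final Object responseWriteLock = new Object();
    private final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true);
    private final PrintWriter writer;

    SseWriteLockSketch(PrintWriter writer) {
        this.writer = writer;
    }

    // print, checkError and flush run as one step, so events from
    // different threads cannot interleave
    void writeEvent(String event) {
        synchronized (responseWriteLock) {
            writer.print(event);
            // PrintWriter does not throw, so check for errors explicitly
            if (writer.checkError()) {
                moreDataWillBeSent.set(false);
            }
            writer.flush();
        }
    }

    // Runs a data thread and a ping loop against one writer and
    // returns everything that was written
    static String run(int eventsPerThread) {
        StringWriter sink = new StringWriter();
        SseWriteLockSketch sketch = new SseWriteLockSketch(new PrintWriter(sink));

        // Plays the role of the RxJava subscriber thread
        Thread dataThread = new Thread(() -> {
            for (int i = 0; i < eventsPerThread; i++) {
                sketch.writeEvent("data: {\"n\":" + i + "}\n\n");
            }
        });
        dataThread.start();

        // Plays the role of the ping loop on the request thread
        for (int i = 0; i < eventsPerThread; i++) {
            sketch.writeEvent("ping: \n\n");
        }

        try {
            dataThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink.toString();
    }

    public static void main(String[] args) {
        String[] events = run(1000).split("\n\n");
        System.out.println("events written: " + events.length);
    }
}
```

Both the subscriber callback and the ping loop would go through the same writeEvent method, so whichever thread holds the lock finishes its whole event before the other one starts.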

Why the exception does not occur with the Jetty container

Because every write method of org.eclipse.jetty.server.ResponseWriter takes a synchronization lock.
The key code is as follows:

package org.eclipse.jetty.server;

public class ResponseWriter extends PrintWriter {
    
    public boolean checkError() {
        synchronized(this.lock) {
            return this._ioException != null || super.checkError();
        }
    }

    public void flush() {
        try {
            synchronized(this.lock) {
                this.isOpen();
                this.out.flush();
            }
        } catch (Throwable var4) {
            this.setError(var4);
        }

    }

    public void write(int c) {
        try {
            synchronized(this.lock) {
                this.isOpen();
                this.out.write(c);
            }
        } catch (InterruptedIOException var5) {
            LOG.debug(var5);
            Thread.currentThread().interrupt();
        } catch (IOException var6) {
            this.setError(var6);
        }

    }

    public void write(char[] buf, int off, int len) {
        try {
            synchronized(this.lock) {
                this.isOpen();
                this.out.write(buf, off, len);
            }
        } catch (InterruptedIOException var7) {
            LOG.debug(var7);
            Thread.currentThread().interrupt();
        } catch (IOException var8) {
            this.setError(var8);
        }

    }

    public void write(String s, int off, int len) {
        try {
            synchronized(this.lock) {
                this.isOpen();
                this.out.write(s, off, len);
            }
        } catch (InterruptedIOException var7) {
            LOG.debug(var7);
            Thread.currentThread().interrupt();
        } catch (IOException var8) {
            this.setError(var8);
        }

    }
}

The writer.flush() call can theoretically be removed on Tomcat and Undertow

This is because writer.checkError() already calls this.flush(). For Tomcat, see org.apache.catalina.connector.CoyoteWriter#checkError

package org.apache.catalina.connector;

public class CoyoteWriter extends PrintWriter {
  
    public boolean checkError() {
        this.flush();
        return this.error;
    }
}

io.undertow.servlet.spec.ServletPrintWriterDelegate#checkError

package io.undertow.servlet.spec;

public final class ServletPrintWriterDelegate extends PrintWriter {
   
    public boolean checkError() {
        return this.servletPrintWriter.checkError();
    }
}

public class ServletPrintWriter {
    
    public boolean checkError() {
        this.flush();
        return this.error;
    }
}
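
The same behavior can be observed with the JDK's own java.io.PrintWriter, whose checkError() is documented to flush the stream if it is not closed. The following is a minimal sketch using only JDK classes, not the Tomcat or Undertow writers shown above. The names CheckErrorFlushSketch, FlushCountingWriter and flushesFromCheckError are made up for this illustration.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical sketch using only JDK classes
class CheckErrorFlushSketch {
    // A sink that counts how often flush() reaches it
    static class FlushCountingWriter extends StringWriter {
        int flushCount = 0;

        @Override
        public void flush() {
            flushCount++;
            super.flush();
        }
    }

    // Returns how many flushes a single checkError() call triggers
    static int flushesFromCheckError() {
        FlushCountingWriter sink = new FlushCountingWriter();
        PrintWriter writer = new PrintWriter(sink); // auto-flush is off
        writer.print("ping: \n\n");
        int before = sink.flushCount;
        writer.checkError();
        return sink.flushCount - before;
    }

    public static void main(String[] args) {
        System.out.println("flushes caused by checkError(): " + flushesFromCheckError());
    }
}
```

If checkError() already flushes, the explicit writer.flush() that follows it in the servlet is redundant on such writers, although keeping it is harmless.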

Pitfalls when compiling the Hystrix source code

1. The project must be a git repository. A source archive downloaded directly from GitHub cannot be compiled unless you first run git init in its directory. It is recommended to clone directly: git clone [email protected]:Netflix/Hystrix.git
2. I used Gradle 4.0.
3. Change the version of the 'nebula.netflixoss' plugin to '4.1.0'.
4. Use Alibaba’s Maven repository.
The following is my build.gradle:

buildscript {
    repositories {
         maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
    }
    dependencies {
         classpath 'com.netflix.nebula:gradle-extra-configurations-plugin:3.0.3'
    }
}

plugins {
    id 'nebula.netflixoss' version '4.1.0'
    id 'me.champeau.gradle.jmh' version '0.3.1'
    id 'net.saliman.cobertura' version '2.2.8'
}

ext {
    githubProjectName = rootProject.name
    project.version='1.5.19'
}

allprojects {
    repositories {
         maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
    }

    apply plugin: 'net.saliman.cobertura'
}

subprojects {
    apply plugin: 'nebula.netflixoss'
    apply plugin: 'java'
    apply plugin: 'nebula.provided-base'
    apply plugin: 'nebula.compile-api'

    sourceCompatibility = 1.8
    targetCompatibility = 1.8

    group = "com.netflix.${githubProjectName}"

    eclipse {
        classpath {
            // include 'provided' dependencies on the classpath
            plusConfigurations += [configurations.provided]
            downloadSources = true
            downloadJavadoc = true
        }
    }

    idea {
        module {
            // include 'provided' dependencies on the classpath
            scopes.COMPILE.plus += [configurations.provided]
        }
    }
}

mvn install and mvn deploy
mvn install:install-file
-Dfile=\data\hystrix\hystrix-contrib\hystrix-metrics-event-stream\build\libs\hystrix-metrics-event-stream-1.5.19.jar
-DgroupId=com.netflix.hystrix
-DartifactId=hystrix-metrics-event-stream
-Dversion=1.5.19
-Dpackaging=jar
mvn deploy:deploy-file
-Dfile=\data\hystrix\hystrix-contrib\hystrix-metrics-event-stream\build\libs\hystrix-metrics-event-stream-1.5.19.jar
-DgroupId=com.netflix.hystrix
-DartifactId=hystrix-metrics-event-stream
-Dversion=1.5.19
-Dpackaging=jar
-DrepositoryId=custom-releases
-Durl=http://host:port/nexus/content/repositories/custom_releases
