1、 Background
Recently I was working on a project that connects to two Elasticsearch (ES) clusters, so I initialized two RestHighLevelClient instances, esClient and esClient1 (only the configuration for esClient is shown below):
package com.xxx.common.config; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class EsConfig { @Value("${es.url}") private String url; @Value("${es.port}") private Integer port; @Value("${es.username}") private String username; @Value("${es.password}") private String password; @Value("${es.connection.timeout:30000}") private int connctionTimeout; @Value("${es.socket.timeout:60000}") private int socketTimeout; @Bean public RestHighLevelClient esClient() { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); RestClientBuilder builder = RestClient.builder(new HttpHost(url, port)) .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } }).setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(connctionTimeout) .setSocketTimeout(socketTimeout)); RestHighLevelClient client = new RestHighLevelClient(builder); return client; } }
As a result, an error was reported during startup, as follows:
java.net.ConnectException: Connection refused at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:823) at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248) at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) at org.springframework.boot.actuate.elasticsearch.ElasticsearchRestHealthIndicator.doHealthCheck(ElasticsearchRestHealthIndicator.java:60) at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82) at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37) at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:71) at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:39) at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:99) at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateHealth(HealthEndpointSupport.java:110) at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:96) at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:74) at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:61) at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:65) at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:55) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282) at 
org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:77) at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60) at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:121) at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:96) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819) at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801) at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468) at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829) at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) at sun.rmi.transport.Transport$1.run(Transport.java:200) at sun.rmi.transport.Transport$1.run(Transport.java:197) at java.security.AccessController.doPrivileged(Native Method) at sun.rmi.transport.Transport.serviceCall(Transport.java:196) at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688) at java.security.AccessController.doPrivileged(Native Method) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715) at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174) at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351) at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221) at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64) ... 1 common frames omitted
2、 Analyze the cause
Why is no error reported when there is only one client, while with two clients the health check fails and the URL used by the client is http://localhost:9200? With these questions in mind, I read through the source code of the Elasticsearch health check.
ElasticsearchRestHealthIndicator
The main method is this
/**
 * Creates the indicator around the low-level client that will be used for the check.
 *
 * @param client the REST client the health request is sent through
 */
public ElasticsearchRestHealthIndicator(RestClient client) {
    super("Elasticsearch health check failed");
    this.client = client;
    this.jsonParser = JsonParserFactory.getJsonParser();
}

/**
 * Sends {@code GET /_cluster/health/} through the client and records the outcome.
 *
 * <p>A non-200 status marks the health as down and records the status code and reason
 * phrase. On a 200 status the response body is read as UTF-8 and passed to the
 * string-based {@code doHealthCheck} overload.
 *
 * @param builder the health builder to report into
 * @throws Exception if the request or reading the response fails
 */
protected void doHealthCheck(Builder builder) throws Exception {
    Response response = this.client.performRequest(new Request("GET", "/_cluster/health/"));
    StatusLine statusLine = response.getStatusLine();
    if (statusLine.getStatusCode() == 200) {
        // try-with-resources closes the stream and attaches any close failure as suppressed.
        try (InputStream body = response.getEntity().getContent()) {
            this.doHealthCheck(builder, StreamUtils.copyToString(body, StandardCharsets.UTF_8));
        }
        return;
    }
    builder.down();
    builder.withDetail("statusCode", statusLine.getStatusCode());
    builder.withDetail("reasonPhrase", statusLine.getReasonPhrase());
}
The error occurs because the client used by the health check does not point at the configured cluster but at http://localhost:9200. Curiosity then led me to the Elasticsearch auto-configuration source code:
ElasticsearchRestClientAutoConfiguration
ElasticsearchRestClientConfigurations
ElasticsearchRestClientProperties
RestClientBuilderCustomizer
I found something suspicious in ElasticsearchRestClientConfigurations, as shown in the following code:
/**
 * Provides the low-level {@code RestClient} bean when none is defined.
 *
 * <p>If the provider yields a unique {@code RestHighLevelClient}, that client's low-level
 * client is returned; otherwise a new client is built from {@code builder}.
 */
@Bean
@ConditionalOnMissingBean
RestClient elasticsearchRestClient(RestClientBuilder builder,
        ObjectProvider<RestHighLevelClient> restHighLevelClient) {
    RestHighLevelClient uniqueClient = restHighLevelClient.getIfUnique();
    if (uniqueClient != null) {
        return uniqueClient.getLowLevelClient();
    }
    // No unique high-level client is available: fall back to the supplied builder.
    return builder.build();
}
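This code shows that the RestClient used by the health check depends on how many RestHighLevelClient instances exist in the application. If there is exactly one, getIfUnique() returns it and its low-level client is reused, so the health check talks to the cluster you configured. If there are several and none can be singled out, getIfUnique() returns null and a new RestClient is built from the auto-configured RestClientBuilder instead. That builder is populated from ElasticsearchRestClientProperties, whose defaults are shown below, which explains why a single client works without error while multiple clients make the health check call http://localhost:9200: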
/** Target URIs; defaults to a single entry, {@code http://localhost:9200}. */
private List<String> uris = new ArrayList<>(Collections.singletonList("http://localhost:9200"));

/** User name; no default value. */
private String username;

/** Password; no default value. */
private String password;

/** Connection timeout; defaults to 1 second. */
private Duration connectionTimeout = Duration.ofSeconds(1L);

/** Read timeout; defaults to 30 seconds. */
private Duration readTimeout = Duration.ofSeconds(30L);
3、 Solution
1. If there are multiple ES clusters, disable the Elasticsearch health check (not recommended)
management.health.elasticsearch.enabled=false
2. Designate one client as the default so that the health check runs against that ES cluster
The first method: give one of the beans the same name as the default RestHighLevelClient (restHighLevelClient)
/**
 * Builds the high-level client and registers it under the bean name
 * {@code restHighLevelClient}.
 *
 * <p>The username/password pair is registered for {@link AuthScope#ANY}, and the
 * configured connect and socket timeouts are applied to every request.
 *
 * @return a new {@link RestHighLevelClient} wrapping the configured builder
 */
@Bean(name = "restHighLevelClient")
public RestHighLevelClient restHighLevelClient() {
    final CredentialsProvider credentials = new BasicCredentialsProvider();
    credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

    final RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(url, port));
    // Attach the credentials to the underlying async HTTP client.
    restClientBuilder.setHttpClientConfigCallback(
            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentials));
    // Apply the configured connect/socket timeouts to each request.
    restClientBuilder.setRequestConfigCallback(
            requestConfigBuilder -> requestConfigBuilder
                    .setConnectTimeout(connctionTimeout)
                    .setSocketTimeout(socketTimeout));

    return new RestHighLevelClient(restClientBuilder);
}
The second method: mark one client with @Primary so that it becomes the default
/**
 * Builds the high-level client and marks it as the primary bean of its type.
 *
 * <p>The username/password pair is registered for {@link AuthScope#ANY}, and the
 * configured connect and socket timeouts are applied to every request.
 *
 * @return a new {@link RestHighLevelClient} wrapping the configured builder
 */
@Bean
@Primary
public RestHighLevelClient esClient() {
    final CredentialsProvider credentials = new BasicCredentialsProvider();
    credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

    final RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(url, port));
    // Attach the credentials to the underlying async HTTP client.
    restClientBuilder.setHttpClientConfigCallback(
            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentials));
    // Apply the configured connect/socket timeouts to each request.
    restClientBuilder.setRequestConfigCallback(
            requestConfigBuilder -> requestConfigBuilder
                    .setConnectTimeout(connctionTimeout)
                    .setSocketTimeout(socketTimeout));

    return new RestHighLevelClient(restClientBuilder);
}
3. Re-implement the ES health check (so that multiple ES clusters can each be health-checked)
This is the option I am most interested in: extending the health check so that both clusters are covered.