Tag Archives: distributed

Doris Error: there is no scanNode Backend [How to Solve]

Background

On March 8, the business development side reported that a Spark Streaming job failed while scanning (querying) a Doris table, with the following error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 20, hd012.corp.yodao.com, executor 7): com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: errCode = 2, 
detailMessage = there is no scanNode Backend. [126101: in black list(Ocurrs time out with specfied time 10000 MICROSECONDS), 14587381: in black list(Ocurrs time out with specfied time 10000 MICROSECONDS), 213814: in black list(Ocurrs time out with specfied time 10000 MICROSECONDS)]

Error:
detailMessage = there is no scanNode Backend. [126101: in black list(Ocurrs time out with specfied time 10000 MICROSECONDS), 14587381: in black list(Ocurrs time out with specfied time 10000 MICROSECONDS), 213814: in black list(Ocurrs time out with specfied time 10000 MICROSECONDS)]
Source Code Analysis

//Blacklisted objects
private static Map<Long, Pair<Integer, String>> blacklistBackends = Maps.newConcurrentMap();

//The task execution process requires getHost, and the return value is the TNetworkAddress object
public static TNetworkAddress getHost(long backendId,
                                      List<TScanRangeLocation> locations,
                                      ImmutableMap<Long, Backend> backends,
                                      Reference<Long> backendIdRef)

//Get the backend object by backendId in the getHost() method
Backend backend = backends.get(backendId);



// Determine whether the backend object is available:
// - if it is available, return its TNetworkAddress
// - if not, iterate through the locations to find a candidate backend
//   - if a candidate's id equals the just-unavailable backend's id, skip it
//   - if the candidate is available, return that candidate BE's TNetworkAddress
//   - otherwise move on to the next candidate BE


if (isAvailable(backend)) {
    backendIdRef.setRef(backendId);
    return new TNetworkAddress(backend.getHost(), backend.getBePort());
}  else {
    for (TScanRangeLocation location : locations) {
        if (location.backend_id == backendId) {
            continue;
        }
        // choose the first alive backend(in analysis stage, the locations are random)
        Backend candidateBackend = backends.get(location.backend_id);
        if (isAvailable(candidateBackend)) {
            backendIdRef.setRef(location.backend_id);
            return new TNetworkAddress(candidateBackend.getHost(), candidateBackend.getBePort());
        }
    }
}

public static boolean isAvailable(Backend backend) {
    return (backend != null && backend.isAlive() && !blacklistBackends.containsKey(backend.getId()));
}


//If no available BE has been found by the end, an exception carrying the cause is thrown
// no backend returned
throw new UserException("there is no scanNode Backend. " +
        getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()),
                backends, locations.size()));


// get the reason why backends can not be chosen.
private static String getBackendErrorMsg(List<Long> backendIds, ImmutableMap<Long, Backend> backends, int limit) {
    List<String> res = Lists.newArrayList();
    for (int i = 0; i < backendIds.size() && i < limit; i++) {
        long beId = backendIds.get(i);
        Backend be = backends.get(beId);
        if (be == null) {
            res.add(beId + ": not exist");
        } else if (!be.isAlive()) {
            res.add(beId + ": not alive");
        } else if (blacklistBackends.containsKey(beId)) {
            Pair<Integer, String> pair = blacklistBackends.get(beId);
            res.add(beId + ": in black list(" + (pair == null ? "unknown" : pair.second) + ")");
        } else {
            res.add(beId + ": unknown");
        }
    }
    return res.toString();
}


// How entries are put into the blacklistBackends map
public static void addToBlacklist(Long backendID, String reason) {
    if (backendID == null) {
        return;
    }

    blacklistBackends.put(backendID, Pair.create(FeConstants.heartbeat_interval_second + 1, reason));
    LOG.warn("add backend {} to black list. reason: {}", backendID, reason);
}



Cause analysis

According to the task error
detailMessage = there is no scanNode Backend. [126101: in black list(Ocurrs time out with specfied time 10000 MICROSECONDS), 14587381: in black list(Ocurrs time out with specfied time 10000 MICROSECONDS), 213814: in black list(Ocurrs time out with specfied time 10000 MICROSECONDS)]
the BE nodes 126101, 14587381 and 213814 are all in the blacklist, apparently because of "Ocurrs time out with specfied time 10000 MICROSECONDS".
It is therefore likely that these three BEs went down around March 8.
Following point 7 of earlier experience from community members,
it can be inferred that the BEs crashed because of improper tasks or configuration.
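When this kind of suspicion comes up, BE liveness can be checked from any MySQL client; SHOW BACKENDS is a standard Doris statement, and its Alive column shows each node's state:

```sql
-- list all BE nodes and check whether each one is alive
SHOW BACKENDS;
```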

For example, Broker loads or other tasks overwhelm the BE service; the related parameters (as recovered from the original text) are broker_concurrency and max_bytes_per_broker_scanner.

The specific root cause could not be determined: the problem occurred on March 8 and more than 20 days have passed. During that period the Doris cluster went through expansion, node rearrangement and other operation and maintenance work, so the logs and many backups can no longer be recovered. It can only be inferred from "Ocurrs time out with specfied time 10000 MICROSECONDS" that the BEs may have gone down at that time. Our services are mounted under supervisor, so they restarted automatically (at that time there were no Prometheus rules & Alertmanager alerts for BE node unavailability).
If the same problem occurs again in the future, this article will be updated.

Solutions

Add Prometheus rules & Alertmanager alerts for BE node service unavailability
adjust the configuration in fe.conf
tune the Spark task and Broker task configuration during execution
there is no definitive solution for the time being; if the problem reappears, tracking and solutions will be supplemented
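As an illustration of the fe.conf tuning mentioned above, the Broker-related limits could look like the sketch below. The parameter names are as recovered from the garbled original and the values are assumptions for illustration, not the ones used in this cluster; verify both against the FE configuration reference of your Doris version:

```
# fe.conf -- illustrative values, tune for your own cluster
# cap the bytes a single broker scanner may read
max_bytes_per_broker_scanner = 3221225472
# cap broker load concurrency so BEs are not overwhelmed
broker_concurrency = 10
```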

Communication link failure when connecting Doris

A Spring Boot service queries Doris and reports an error

ERROR [http-nio-10020-exec-12] [http-nio-10020-exec-12raceId] [] [5] @@GlobalExceptionAdvice@@ | server error 
org.springframework.dao.RecoverableDataAccessException: 
### Error querying database.  Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 426 milliseconds ago.  The last packet sent successfully to the server was 0 milliseconds ago.
; Communications link failure

The last packet successfully received from the server was 426 milliseconds ago.  The last packet sent successfully to the server was 0 milliseconds ago.; nested exception is com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 426 milliseconds ago.  The last packet sent successfully to the server was 0 milliseconds ago.

The scheduled insert into select task on Doris also reports an error

ERROR 2013 (HY000) at line 7: Lost connection to MySQL server during query

Analysis

It may be that slow queries put huge pressure on the cluster.
Several slow queries ran for 120s-400s, which the Doris cluster could not bear, because the global query_timeout parameter is 60. Presumably someone had set the session variable for their task to 600s or higher.

We asked the developers to take the slow-query tasks offline and tune the SQL.
After the slow-query tasks running over 100 seconds were taken offline, everything worked normally.

But after a while, the Spring Boot service alerted: the errors were back.

Doris parameter

interactive_timeout=3880000

wait_timeout=3880000

Doris Fe service node alarm log

2021-06-03 16:00:08,398 WARN (Connect-Scheduler-Check-Timer-0|79) [ConnectContext.checkTimeout():365] kill wait timeout connection, remote: 1.1.1.1:57399, wait timeout: 3880000
2021-06-03 16:00:08,398 WARN (Connect-Scheduler-Check-Timer-0|79) [ConnectContext.kill():339] kill timeout query, 1.1.1.1.1:57399, kill connection: true

Doris monitoring

It can be seen that the number of connections at 15:44 drops sharply

ELK logs
The alarm and error messages from the Spring Boot service querying Doris also start at 15:44.
So what operation on variables affected the cluster at 15:44?

According to the error, wait_timeout is 3880000s, which is about 44 days, while the default in the source code is 28800s.

interactive_timeout=3880000

wait_timeout=3880000

No one had deployed anything, no switch-over had happened, and cluster administration was in my hands; I had not changed the parameters, yet they had changed and I was not sure why. Checking the operation records in the fe.audit audit log showed that, sure enough,
someone (an insider) using DataGrip 2020.2.3 had run set global at 15:44 and modified the parameters

interactive_timeout=3880000

wait_timeout=3880000

Rolling the two parameters back to 28800s restored the cluster's connections immediately.
Note that, per discussion with the community, only wait_timeout takes effect in Doris; interactive_timeout exists only for MySQL compatibility and does not work.
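For reference, the inspection and rollback can be done with standard MySQL-protocol statements, which Doris accepts (28800 being the default mentioned above); a minimal sketch:

```sql
-- inspect the current timeout variables
SHOW VARIABLES LIKE '%timeout%';
-- roll both parameters back to the 8-hour default
SET GLOBAL wait_timeout = 28800;
SET GLOBAL interactive_timeout = 28800;
```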

Question: why does an overly large wait_timeout in Doris cause the "Communications link failure" connection error,
while reducing it restores normal behavior? The code needs to be read through to understand the logic.


Twitter’s distributed auto-increment ID algorithm Snowflake (Java version)

Summary

In distributed systems, some scenarios need a globally unique ID. To prevent ID conflicts, a 36-character UUID can be used, but UUIDs have drawbacks: they are relatively long, and they are generally unordered.

Sometimes we want to use a simpler ID, and we want the ID to be generated in time order.

Twitter’s Snowflake solves this problem. Twitter originally migrated its storage from MySQL to Cassandra; because Cassandra had no sequential ID generation mechanism, Twitter developed this globally unique ID generation service.

Structure

The structure of each part is as follows:

0 – 0000000000 0000000000 0000000000 0000000000 0 – 00000 – 00000 – 000000000000

The first bit is unused; the next 41 bits are the millisecond timestamp (41 bits can cover 69 years); then come a 5-bit datacenterId and a 5-bit workerId (10 bits support deployment of at most 1024 nodes); the last 12 bits are the count within a millisecond (the 12-bit sequence supports 4096 IDs per node per millisecond).

A total of 64 bits, a long type. (length of converted string is 18)

The IDs generated by Snowflake are ordered by time overall, collision-free across the whole distributed system (distinguished by datacenterId and workerId), and efficient; it is said that Snowflake can generate about 260,000 IDs per second.

Source code

(Java version of the source)

/**
 * Twitter_Snowflake<br>
 * The structure of SnowFlake is as follows (each part is separated by -):<br>
 * 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>
 * 1-bit identifier, as the long basic type is signed in Java, the highest bit is the sign bit, positive numbers are 0, negative numbers are 1, so the id is generally positive, the highest bit is 0 <br>
 * 41-bit time intercept (milliseconds), note that the 41-bit time intercept is not a time intercept to store the current time, but the difference between the time intercept (current time intercept - start time intercept)
 * the value obtained), where the start time intercept, generally our id generator to start using the time specified by our program (the following program IdWorker class startTime property). 41-bit time intercept, you can use 69 years, year T = (1L << 41)/(1000L * 60 * 60 * 24 * 365) = 69<br>
 * 10-bit data machine bits that can be deployed in 1024 nodes, including 5-bit datacenterId and 5-bit workerId<br>
 * 12-bit sequential, millisecond counting, 12-bit counting sequence number supports 4096 ID sequential numbers per node per millisecond (same machine, same time cutoff)<br>
 * Add up to exactly 64 bits for a Long type. <br>
 * The advantage of SnowFlake is that the overall self-increasing sorting by time and no ID collision within the whole distributed system (distinguished by data center ID and machine ID), and high efficiency, tested, SnowFlake can generate about 260,000 IDs per second.
 */
public class SnowflakeIdWorker {

    // ==============================Fields===========================================
    /** Start time cutoff (2015-01-01) */
    private final long twepoch = 1420041600000L;

    /* The number of bits occupied by the machine id */
    private final long workerIdBits = 5L;

    /* The number of bits occupied by the data identifier id */
    private final long datacenterIdBits = 5L;

    /* The maximum machine id supported, resulting in 31 (this shift algorithm can quickly calculate the maximum number of decimal digits that can be represented by a few binary digits) */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /* The maximum supported data identifier id, resulting in 31 */
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    /** The number of bits in the id that the sequence occupies */
    private final long sequenceBits = 12L;

    /** The machine id is shifted 12 bits to the left */
    private final long workerIdShift = sequenceBits;

    /** The data identifier id is shifted 17 bits to the left (12+5) */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /** The timestamp is shifted 22 bits to the left (5+5+12) */
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    /** Generate the mask for the sequence, here 4095. (0b111111111111=0xfff=4095) */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /** Work machine ID(0~31) */
    private long workerId;

    /** Data center ID(0~31) */
    private long datacenterId;

    /** Intra-millisecond sequence (0~4095) */
    private long sequence = 0L;

    /** Time cutoff of the last generated ID */
    private long lastTimestamp = -1L;

    //==============================Constructors=====================================
    /**
     * Constructor
     * @param workerId worker ID (0~31)
     * @param datacenterId datacenterId (0~31)
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    // ==============================Methods==========================================
    /**
     * Get the next ID (this method is thread-safe)
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        //If the current time is less than the timestamp of the last ID generation, it means that the system clock is backed off and an exception should be thrown at this time
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        //If it was generated at the same time, then perform a sequence within milliseconds
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            // sequence overflow in milliseconds
            if (sequence == 0) {
                //Block until the next millisecond, get the new timestamp
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //Timestamp change, sequence reset in milliseconds
        else {
            sequence = 0L;
        }

        //Time cutoff of the last generated ID
        lastTimestamp = timestamp;

        //Shift the pieces and combine them with bitwise OR into a 64-bit ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * Block to the next millisecond until the new timestamp is obtained
     * @param lastTimestamp Time cutoff of the last generated ID
     * @return currentTimestamp
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * Returns the current time in milliseconds
     * @return current time in milliseconds
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    //==============================Test=============================================
    /** TEST */
    public static void main(String[] args) {
        SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0);
        for (int i = 0; i < 1000; i++) {
            long id = idWorker.nextId();
            System.out.println(Long.toBinaryString(id));
            System.out.println(id);
        }
    }
}
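Going the other way makes the bit layout concrete: the sketch below decodes an ID produced by the class above back into its fields. The constants mirror the generator (12 sequence bits, 5 worker bits, 5 datacenter bits, epoch 1420041600000L); the class and method names are our own, not part of the original code.

```java
// Decode a Snowflake ID back into its component fields.
public class SnowflakeDecoder {
    static final long TWEPOCH = 1420041600000L;
    static final long SEQUENCE_BITS = 12L;
    static final long WORKER_ID_BITS = 5L;
    static final long DATACENTER_ID_BITS = 5L;

    /** Millisecond timestamp the ID was generated at (bits 22..62) */
    static long timestampOf(long id) {
        return (id >>> (SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS)) + TWEPOCH;
    }

    /** Datacenter ID (bits 17..21) */
    static long datacenterIdOf(long id) {
        return (id >>> (SEQUENCE_BITS + WORKER_ID_BITS)) & ((1L << DATACENTER_ID_BITS) - 1);
    }

    /** Worker ID (bits 12..16) */
    static long workerIdOf(long id) {
        return (id >>> SEQUENCE_BITS) & ((1L << WORKER_ID_BITS) - 1);
    }

    /** In-millisecond sequence (bits 0..11) */
    static long sequenceOf(long id) {
        return id & ((1L << SEQUENCE_BITS) - 1);
    }

    public static void main(String[] args) {
        // Build an ID by hand: timestamp delta 1 ms, datacenter 3, worker 7, sequence 9
        long id = (1L << 22) | (3L << 17) | (7L << 12) | 9L;
        System.out.println(datacenterIdOf(id)); // 3
        System.out.println(workerIdOf(id));     // 7
        System.out.println(sequenceOf(id));     // 9
    }
}
```

Decoding like this is handy when debugging duplicate-ID complaints: it immediately shows which machine and which millisecond produced a given ID.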


Why must microservices have gateways?

1、 What is a service gateway

Service gateway = route forwarding + filter

1. Route forwarding: receive all external requests and forward them to the back-end micro service.

2. Filter: a series of cross-cutting functions can be implemented in the service gateway, such as permission verification, rate limiting and monitoring, all through filters (in fact, route forwarding is also implemented through a filter).

2、 Why a service gateway is needed
the above crosscutting function (taking permission verification as an example) can be written in three places:

1) each service is implemented by itself [not recommended]

2) write to a public service, and then all other services depend on this service [not recommended]

3) write it to the prefilter of the service gateway, and all requests come to check the permission [recommended]

For option 1, the disadvantages are too obvious to use it;
option 2 avoids redundant code compared with option 1, but still has two disadvantages:

(1) Because every service introduces this public service, the same permission-verification code is effectively pulled into each service, which needlessly increases every service's jar size; especially for Docker image deployment, the smaller the jar the better;

(2) Since this public service is introduced into every service, upgrading it later may be difficult, and the more functions it carries, the harder the upgrade. Suppose we change how permission verification works in the public service: for all services to adopt the new approach, every previous service has to be repackaged, recompiled and redeployed.

The service gateway can solve this problem

Write the permission-verification logic in a filter of the gateway. Back-end services then need not care about permission-verification code at all, so that logic is never pulled into a service's jar and jar size does not grow; and if the permission-verification logic needs to change, only the gateway's permission filter is modified, without upgrading any existing microservice.

So, a service gateway is needed!
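The pre-filter idea can be sketched without any framework. The toy Request and PreFilter types below are hypothetical stand-ins for this illustration (not the Netflix Zuul API); the point is that a failed permission check short-circuits at the gateway, so back-end code never sees the request:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

// Framework-free sketch of "permission verification as a gateway pre-filter"
public class GatewaySketch {

    record Request(String path, Map<String, String> headers) {}

    // A pre-filter returns an error response to short-circuit, or empty to pass
    interface PreFilter {
        Optional<String> check(Request req);
    }

    // The auth pre-filter: every external request must carry a valid token
    static PreFilter authFilter(Set<String> validTokens) {
        return req -> {
            String token = req.headers().get("Authorization");
            return (token != null && validTokens.contains(token))
                    ? Optional.empty()
                    : Optional.of("401 Unauthorized");
        };
    }

    // The gateway runs every pre-filter, then routes to the back-end service
    static String handle(Request req, List<PreFilter> filters, Function<Request, String> backend) {
        for (PreFilter f : filters) {
            Optional<String> err = f.check(req);
            if (err.isPresent()) {
                return err.get(); // rejected at the gateway; back end never sees it
            }
        }
        return backend.apply(req);
    }

    public static void main(String[] args) {
        List<PreFilter> filters = List.of(authFilter(Set.of("token-123")));
        Function<Request, String> orderService = r -> "200 OK from " + r.path();

        // authorized request reaches the back end; the unauthorized one does not
        System.out.println(handle(new Request("/orders", Map.of("Authorization", "token-123")), filters, orderService));
        System.out.println(handle(new Request("/orders", Map.of()), filters, orderService));
    }
}
```

In a real deployment the same shape appears as a Zuul "pre" filter; the design point is identical: cross-cutting checks live in one filter list, not in every service.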

 

3、 Service gateway technology selection

 

After introducing the service gateway, the microservice architecture consists of three parts, as in the (omitted) diagram above: the service gateway, open services, and internal services.

1. Overall process:

The service gateway, the open services and the internal services all register with the registry at startup. The user requests the gateway directly; the gateway performs intelligent routing and forwarding (including service discovery and load balancing) to an open service; the open service performs permission verification, monitoring, rate limiting and other operations, aggregates the internal services' responses, and returns the result to the gateway, which returns it to the user.

2. Points for attention in introducing gateway

With the gateway added there is one more layer of forwarding (originally the user's request went directly to the open service), so performance drops, but not by much: gateway machines are usually well provisioned, and traffic between the gateway and the open services is intranet traffic, which is very fast.
There is also the gateway single-point problem. Any network call chain has some single point, which may be the gateway, nginx, the DNS server, and so on. To avoid a gateway single point, another nginx can be placed in front of the gateway layer; nginx's performance is very high and it rarely goes down, and gateway machines can then be added freely behind it. However, each request is then forwarded twice, so the best way is to deploy the single gateway service on a powerful machine (estimate the required configuration through stress testing). According to an experiment done by someone abroad, the performance of nginx and Zuul is similar. Zuul is Netflix's open-source gateway framework, and the gateway should be kept lightweight.

3. Basic functions of service gateway

Intelligent routing: receive all external requests and forward them to the back-end open services;

Note: only external requests are forwarded; requests between services do not go through the gateway. This means that full-link tracing, internal-service API monitoring, fault tolerance for internal calls, and intelligent routing between internal services cannot be done in the gateway. Of course, all service calls could go through the gateway, and then almost every function could be integrated into it, but the gateway's load would become very heavy.
Permission verification: only user requests to the open services are verified; internal requests between services are not. (Is verification of internal requests necessary?)
API monitoring: only requests passing through the gateway, plus some performance indicators of the gateway itself (for example, GC), are monitored.
Rate limiting: performed together with monitoring.
Unified API log collection: similar to an aspect, record the relevant logs when a request enters and leaves an interface. ...to be supplemented

The above functions are the basic functions of the gateway, and the gateway can also realize the following functions:

A/B testing: A/B testing is a relatively large topic, including backend experiment configuration, data instrumentation (to measure conversion rates) and a traffic-splitting engine. The traffic-splitting engine could be implemented in the service gateway, but in practice it calls internal services, so under the architecture above it belongs in the open service rather than in the service gateway. ...to be supplemented

 

4. Technology selection

The author is going to build a lightweight service gateway

Development language: Java + Groovy; Groovy's advantage is that filters can be added to the gateway dynamically to implement new functions without a restart. Microservice base framework: Spring Boot; gateway base component: Netflix Zuul; service registry: Consul; permission verification: JWT; API monitoring: Prometheus + Grafana; unified API log collection: Logback + ELK; stress testing: JMeter; ...to be supplemented

Follow-up posts will introduce each of these points step by step and complete a lightweight service gateway!