Tag Archives: Multithreading

Python asynchronous coroutine crawler error: [aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected]

Background description:

I have only just started working with crawlers, learning bit by bit from online tutorials, so my understanding is still fairly shallow. If you know a better approach, please share it in the comments.

The first crawler was very simple: it fetched the data list from a web page whose response was already in dictionary form, so it could be read directly as a dictionary with .json().

After finishing an exercise on asynchronous coroutines, I tried to convert the original crawler to use them, and it started raising errors.

Initial code:

import asyncio
import aiohttp

async def download_page(url):
    # A new session is created for every task (this is what causes the errors below)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()

async def main(urls):
    tasks = []
    for url in urls:
        tasks.append(asyncio.create_task(download_page(url)))  # My Python version is 3.9.6
    await asyncio.wait(tasks)

if __name__ == '__main__':
    urls = [ url1, url2, …… ]
    asyncio.run(main(urls))

This is the most basic asynchronous coroutine skeleton. With a small amount of data it works well enough, but with a somewhat larger amount it starts raising errors. The error messages I collected are as follows:

aiohttp.client_exceptions.ClientOSError: [WinError 64] The specified network name is no longer available.
Task exception was never retrieved

aiohttp.client_exceptions.ClientOSError: [WinError 121] The signal timeout has expired
Task exception was never retrieved

aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected
Task exception was never retrieved

Broadly, the errors say that something went wrong with the network requests and connections. As a beginner I had no deeper understanding of network connections or of asynchronous coroutines, so the errors were hard to track down.

Cause:

The main problem behind the errors above is that every task creates its own session. When too many sessions are created, the errors appear.

Solution:

Create only one session and share it across all tasks.

import asyncio
import aiohttp

async def download_page(url, session):
    async with session.get(url) as resp:
        result = await resp.text()

async def main(urls):
    tasks = []
    # The session is created once in main() and passed to download_page() as an argument.
    async with aiohttp.ClientSession() as session:
        for url in urls:
            # My Python version is 3.9.6. Since Python 3.8, passing bare coroutines to
            # asyncio.wait() is deprecated: it still runs but emits a warning, so wrap
            # each coroutine with asyncio.create_task().
            tasks.append(asyncio.create_task(download_page(url, session)))
        await asyncio.wait(tasks)

if __name__ == '__main__':
    urls = [ url1, url2, …… ]
    asyncio.run(main(urls))

In this way, the problem of connection error can be solved to a certain extent when crawling a large amount of data.
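A complementary measure, which is not part of the fix described here, is to cap how many requests are in flight and to collect failures explicitly, so that no "Task exception was never retrieved" message is left behind. The sketch below uses only the standard library. fake_fetch, MAX_CONCURRENCY, crawl and run_demo are hypothetical names, and fake_fetch merely simulates a request with asyncio.sleep; in a real crawler its body would be the session.get(...) call from the code above.

```python
import asyncio

MAX_CONCURRENCY = 5  # assumed limit; tune it for the target server


async def fake_fetch(url):
    """Hypothetical stand-in for the real request; fails for one URL."""
    await asyncio.sleep(0.01)
    if url.endswith("/bad"):
        raise ConnectionError(f"server disconnected: {url}")
    return f"body of {url}"


async def download_page(url, semaphore):
    # At most MAX_CONCURRENCY coroutines are inside this block at a time.
    async with semaphore:
        return await fake_fetch(url)


async def crawl(urls):
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    tasks = [asyncio.create_task(download_page(u, semaphore)) for u in urls]
    # return_exceptions=True hands failures back as values instead of
    # leaving them unretrieved inside the tasks.
    return await asyncio.gather(*tasks, return_exceptions=True)


def run_demo():
    urls = [f"http://example.invalid/{i}" for i in range(20)]
    urls.append("http://example.invalid/bad")
    results = asyncio.run(crawl(urls))
    ok = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    return ok, failed


if __name__ == "__main__":
    ok, failed = run_demo()
    print(len(ok), "succeeded,", len(failed), "failed")
```

The failed entries can then be retried or logged, rather than silently lost.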

Takeaway: stay flexible when programming. A small change can improve efficiency a lot.

Error (17): request headers are lost in multithreaded asynchronous Feign calls

Problem

In a microservice, the request headers are lost when a Feign call is made from an asynchronous worker thread.

Solution
    1. In the main thread, read the request header parameters and pass them to the child thread. The child thread stores them in a thread-local context. Finally, the Feign interceptor reads the headers from that context and forwards them downstream.

Utility class for reading the request headers from the request context

@Slf4j
public class RequestContextUtil {

    /**
     * Get request header data
     *
     * @return key-> request header name value-> request header value
     * @author zhengqingya
     * @date 2021/6/30 9:39 PM
     */
    public static Map<String, String> getHeaderMap() {
        Map<String, String> headerMap = Maps.newLinkedHashMap();
        try {
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            if (requestAttributes == null) {
                return headerMap;
            }
            HttpServletRequest request = requestAttributes.getRequest();
            Enumeration<String> enumeration = request.getHeaderNames();
            while (enumeration.hasMoreElements()) {
                String key = enumeration.nextElement();
                String value = request.getHeader(key);
                headerMap.put(key, value);
            }
        } catch (Exception e) {
            log.error("《RequestContextUtil》 Failed to get request header parameters:", e);
        }
        return headerMap;
    }

}

Request header context

@Slf4j
public class RequestHeaderHandler {

    public static final ThreadLocal<Map<String, String>> THREAD_LOCAL = new ThreadLocal<>();

    public static void setHeaderMap(Map<String, String> headerMap) {
        THREAD_LOCAL.set(headerMap);
    }

    public static Map<String, String> getHeaderMap() {
        return THREAD_LOCAL.get();
    }

    public static void remove() {
        THREAD_LOCAL.remove();
    }

}

Feign interceptor that forwards the headers on RPC calls

@Slf4j
@Configuration
public class FeignRequestInterceptor implements RequestInterceptor {

    @Override
    @SneakyThrows
    public void apply(RequestTemplate requestTemplate) {
        log.debug("========================== ↓↓↓↓↓↓ 《FeignRequestInterceptor》 Start... ↓↓↓↓↓↓ ==========================");
        Map<String, String> threadHeaderNameMap = RequestHeaderHandler.getHeaderMap();
        if (!CollectionUtils.isEmpty(threadHeaderNameMap)) {
            threadHeaderNameMap.forEach((headerName, headerValue) -> {
                log.debug("《FeignRequestInterceptor》 Multi-threaded headerName:[{}] headerValue:[{}]", headerName, headerValue);
                requestTemplate.header(headerName, headerValue);
            });
        }
        Map<String, String> headerMap = RequestContextUtil.getHeaderMap();
        headerMap.forEach((headerName, headerValue) -> {
            log.debug("《FeignRequestInterceptor》 headerName:[{}] headerValue:[{}]", headerName, headerValue);
            requestTemplate.header(headerName, headerValue);
        });
        log.debug("========================== ↑↑↑↑↑↑ 《FeignRequestInterceptor》 End... ↑↑↑↑↑↑ ==========================");
    }

}

Use cases

@Slf4j
@RestController
@RequestMapping("/web/api/demo/test")
@Api(tags = "test api")
@AllArgsConstructor
public class RpcController extends BaseController {

    private SystemTaskThread systemTaskThread;

    @GetMapping("getContextUserId")
    @ApiOperation("rpc Call test - Async")
    public void getContextUserId() {
        Map<String, String> headerMap = RequestContextUtil.getHeaderMap();
        log.info("Main thread request header value: {}", headerMap.get("userId"));
        this.systemTaskThread.getRequestHeaderUserId(RequestContextUtil.getHeaderMap());
    }

}

@Slf4j
@Component
@AllArgsConstructor
public class SystemTaskThread {

    private ISystemClient systemClient;

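    @SneakyThrows
    @Async(ThreadPoolConstant.SMALL_TOOLS_THREAD_POOL)
    public void getRequestHeaderUserId(Map<String, String> headerMap) {
        RequestHeaderHandler.setHeaderMap(headerMap);
        try {
            log.info("Sub-thread request header value: {}", this.systemClient.getRequestHeaderUserId());
        } finally {
            // Pool threads are reused, so clear the ThreadLocal to avoid leaking headers into later tasks
            RequestHeaderHandler.remove();
        }
    }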

}

Note: another approach mentioned online is for the main thread to obtain the request attributes with RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(), pass them to the child thread, and have the child thread call RequestContextHolder.setRequestAttributes(requestAttributes). In my tests this approach did not work; I am recording it here for reference.

Failed to establish a new connection: [WinError 10048] when calling an interface in a loop from a requests thread pool

This is about the error raised when an interface is called in a loop from a thread pool using requests: Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted.

My code first:

import csv
import json
import time

import requests
import threadpool
with open('all_balance.csv','r') as f:
    list1 = csv.reader(f)
    list1 = list(list1)
def load_data(name):
    while True:
        Payload = {
          "id": 0,
          "jsonrpc": "2.0",
          "method": "Filecoin.StateGetActor",
          "params":
          [name[0],[{'/': 'bafy2bzaceazzbdegiso5c4tsjipxjmdabpk5uamj6khzhuwycb4bjeafbhaeo'}, {'/': 'bafy2bzacea4pnuzojwownzajhjdlqitt4wodbvgmhhfc4dujqdkjz3lgl6sac'}, {'/': 'bafy2bzacecjsgkjrcg6bejnqbvm2lktrhxpiazrqwfppueijfzte2bf2kwx42'}, {'/': 'bafy2bzaceaeglquy7h5i6tobxqikaqh2onzvhdjzdhpdneo47grs3qdvvbzc6'}, {'/': 'bafy2bzacea2knbqirjgw7rsfrydo6gfams6wnbpfdrvxhasuo6obqipn4zoco'}, {'/': 'bafy2bzacecmfiu5w7gpuhvdudqpd4qvwng2wokoj6mqydismrujcobgtcunxe'}, {'/': 'bafy2bzacec5mzv2jqqd7k2ripfddlsjv5k2eq52tihjpbtjok37p5hkxep2za'}, {'/': 'bafy2bzacedqyr6oufui2plsbykvevrioa6ukuviyb4i5iz5mq34xxq3gzlz32'}, {'/': 'bafy2bzacecled7zvadjt2jjn354pfhyga22apgqeh5c3ig3vv62tqb6rujsxk'}, {'/': 'bafy2bzacecfyxsfr445b6cvlxnl2p53twzfw4fjqy67bg6nioopb5apa6zb62'}, {'/': 'bafy2bzacech6xyahzbhyyjjd747cyzpllgx4abksncyidxpuxg7hsm2gydxw6'}, {'/': 'bafy2bzaceaisnevf7cpht6cmiwg2l63cqxi5jqyrinjsmdqyvax3delxnj4gg'}]]}
        headers = {"Content-Type": "application/json"}

        respon = requests.post('http://********:1248/rpc/v0',headers=headers, data=json.dumps(Payload))

        if respon.status_code == 200:
            if respon.json().get('result'):
                print(name[0],int(respon.json()['result']['Balance'])/10**18)
                with open('all_balance_6_24.csv', 'a', encoding='utf-8', newline='') as f:
                    writer = csv.writer(f)
                    writer.writerow([name[0], int(respon.json()['result']['Balance'])/10**18])
                return
        else:
            print(respon.status_code)
            print(respon.text)
            time.sleep(20)
tasks = threadpool.makeRequests(load_data, [list1[i] for i in range(1, 488013)])
pool = threadpool.ThreadPool(100)
for task in tasks:
    pool.putRequest(task)
pool.wait()

Error output

Traceback (most recent call last):
  File "E:\project\chain\lib\site-packages\threadpool.py", line 158, in run
    result = request.callable(*request.args, **request.kwds)
  File "E:\project\chain\chain_main_get_state.py", line 22, in load_data
    respon = requests.post('http://10.0.6.22:1248/rpc/v0',headers=headers, data=json.dumps(Payload))
  File "E:\project\chain\lib\site-packages\requests\api.py", line 119, in post
    return request('post', url, data=data, json=json, **kwargs)
  File "E:\project\chain\lib\site-packages\requests\api.py", line 61, in request
    return session.request(method=method, url=url, **kwargs)
  File "E:\project\chain\lib\site-packages\requests\sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "E:\project\chain\lib\site-packages\requests\sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "E:\project\chain\lib\site-packages\requests\adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='10.0.6.22', port=1248): Max retries exceeded with url: /rpc/v0 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x0000028BAE472C40>: Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted.'))

Because the code is multithreaded, the commonly suggested fix of shutting down the Python process was not what I wanted. In the end I found a fix that proved effective in testing:

Run regedit, open the registry subkey HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters, and create a new REG_DWORD value named TcpTimedWaitDelay. Set it to decimal 30 (hexadecimal 0x0000001E). This sets the wait time to 30 seconds.
In the same subkey, create a new REG_DWORD value named MaxUserPort, then stop and restart the system. Default: none. Recommended: at least 32768 (decimal).

While the interface was being called, the command-line tool netstat -n showed nearly 4000 connections to the IP address of the machine running the interface in the TIME_WAIT state. Increasing MaxUserPort and lowering TcpTimedWaitDelay at the same time keeps the client from running out of ephemeral ports. For example, you can set MaxUserPort to 20000 and TcpTimedWaitDelay to 30. A lower TcpTimedWaitDelay means sockets spend less time in TIME_WAIT; a higher MaxUserPort means more sockets can be in TIME_WAIT at once.
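A rough back-of-the-envelope check shows why these two settings matter. The sketch assumes that every new outbound connection consumes one ephemeral port and that the port stays unusable for TcpTimedWaitDelay seconds after the socket is closed, so the sustainable rate is roughly the number of ports divided by the delay. The "default" figures (ports 1025 to 5000, 240 seconds) are assumed legacy Windows defaults, consistent with the roughly 4000 connections observed above; the function name is hypothetical.

```python
def max_new_connections_per_second(max_user_port, timed_wait_delay_s, first_port=1025):
    """Rough upper bound on sustained new outbound connections per second.

    Assumes every connection consumes one ephemeral port and that the port
    is unusable for timed_wait_delay_s seconds after the socket is closed.
    """
    usable_ports = max_user_port - first_port + 1
    return usable_ports / timed_wait_delay_s


# Assumed legacy defaults: ports 1025-5000, 240 s in TIME_WAIT.
default_rate = max_new_connections_per_second(5000, 240)
# Values suggested in the text: MaxUserPort 20000, TcpTimedWaitDelay 30.
tuned_rate = max_new_connections_per_second(20000, 30)

print(f"default: about {default_rate:.1f} connections/s")
print(f"tuned:   about {tuned_rate:.1f} connections/s")
```

Under these assumptions the default allows only about 16 new connections per second, which 100 threads calling requests.post in a loop can easily exceed, while the tuned values allow several hundred. Reusing connections, for example with one requests.Session per thread, is another option that the code above does not use.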

JMeter: fixing java.net.BindException: Address already in use: connect

Cause:

Windows reserves a range of temporary ports for TCP/IP. When JMeter runs a concurrency test, every thread that opens a new socket occupies one temporary port. If the ports are used up faster than they are released, java.net.BindException: Address already in use: connect appears. socket.close() does not free the bound port immediately: the port enters the TIME_WAIT state and is only released after a delay, 240 s by default.

Solution:

Increase the number of ports reserved for TCP/IP on the machine where JMeter is installed. The steps below are for Windows.

1. Press Win + R and enter regedit to open the registry editor, or run regedit from CMD;

2. Go to HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters;

3. Right-click Parameters and create a new DWORD named MaxUserPort;

4. Double-click MaxUserPort, select Decimal as the base, and enter 65534 as the value data. This is the maximum; for an out-of-range value the system uses the closest valid value (minimum 5000, maximum 65534);

5. Right-click Parameters and create a new DWORD named TcpTimedWaitDelay;

6. Double-click TcpTimedWaitDelay, select Decimal as the base, and enter 30 as the value data. This sets the wait to 30 seconds; the default is 240 seconds;

7. Restart the computer for the changes to take effect.

 

[Go] Fixing "fatal error: concurrent map writes": maps are not concurrency-safe

A Go map is not concurrency-safe. When multiple goroutines read and write the same map at the same time, the runtime aborts with a fatal error:

fatal error: concurrent map writes

For example, the following code triggers this error:

var mMap map[int]int

func TestMyMap(t *testing.T) {
    mMap = make(map[int]int)

    for i := 0; i < 5000; i++ {
        go func(i int) {
            mMap[i] = i
        }(i) // pass i as an argument so each goroutine gets its own copy
        go readMap(i)
    }
}
func readMap(i int) int {
    return mMap[i]
}

There are several ways to solve this problem. Here we use a read-write lock.

Concurrent access to a map is not safe: the result is unpredictable and the runtime may abort the program. So if multiple goroutines need to access the same map, you must provide some synchronization mechanism. A common choice is the read-write lock sync.RWMutex: wrapping the map together with a sync.RWMutex gives safe concurrent access.

Code after the change

type SMap struct {
    sync.RWMutex
    Map map[int]int
}

func (l *SMap) readMap(key int) (int, bool) {
    l.RLock()
    value, ok := l.Map[key]
    l.RUnlock()
    return value, ok
}

func (l *SMap) writeMap(key int, value int) {
    l.Lock()
    l.Map[key] = value
    l.Unlock()
}

var mMap *SMap

func TestMyMap(t *testing.T) {
    mMap = &SMap{
        Map: make(map[int]int),
    }

    for i := 0; i < 5000; i++ {
        go func(i int) {
            mMap.writeMap(i, i)
        }(i) // pass i as an argument so each goroutine gets its own copy
        go readMap(i)
    }
}
func readMap(i int) (int, bool) {
    return mMap.readMap(i)
}

In summary, there are three ways to make map access concurrency-safe:

1. Use a channel
2. Use sync.Map
3. Use a map protected by a lock

[Multitasking example] Python's threading module

Python has a built-in threading module, which wraps the lower-level _thread module.
For the available methods, see the official documentation: threading – Thread-based parallelism

Multithreaded execution

The main thread waits for all (non-daemon) child threads to finish before the program exits.

#coding=utf-8
import threading
import time

def thread_test():
    print("test.")
    time.sleep(1)

if __name__ == "__main__":
    for i in range(5):
        t = threading.Thread(target=thread_test)
        t.start() 

View the number of threads

#coding=utf-8
import threading
from time import sleep, ctime

def a():
    for i in range(3):
        print("a...%d"%i)
        sleep(1)

def bbbbb():
    for i in range(3):
        print("bbbbb...%d"%i)
        sleep(1)

if __name__ == '__main__':
    print('---start---:%s'%ctime())
    t1 = threading.Thread(target=a)
    t2 = threading.Thread(target=bbbbb)

    t1.start()
    t2.start()

    while True:
        length = len(threading.enumerate())
        # length = threading.active_count()
        print('The number of threads currently running is: %d'%length)
        if length<=1:
            break

        sleep(0.5)

Subclassing threading.Thread

Inherit from threading.Thread and override the run method

#coding=utf-8
import threading
import time

class MyThread(threading.Thread):

    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "I'm "+self.name+' @ '+str(i)  # the name attribute stores the name of the current thread
            print(msg)


if __name__ == '__main__':
    t = MyThread()
    t.start()

Thread execution order

Each thread runs its run function to completion, but the order in which threads start and the way the loop iterations of different threads interleave cannot be predicted.

Sharing global variables and passing parameters

# Shared global variables
from threading import Thread
import time

g_num = 100

def work1():
    global g_num
    for i in range(3):
        g_num += 1
    print("----in work1, g_num is %d---"%g_num)


def work2():
    global g_num
    print("----in work2, g_num is %d---"%g_num)


print("---Before the thread is created g_num is %d---"%g_num)

t1 = Thread(target=work1)
t1.start()

#Delay for a while to make sure things are done in thread t1
time.sleep(1)

t2 = Thread(target=work2)
t2.start()

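All threads in a process share global variables, which makes it easy to share data between threads. The drawback is that any thread can modify a global at any time, so unsynchronized updates from several threads can corrupt the data (in other words, the code is not thread-safe).

A mutable object such as a list can also be shared by passing it to the threads as an argument: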

from threading import Thread
import time

def work1(nums):
    nums.append(44)
    print("----in work1---",nums)


def work2(nums):
    #Delay for a while to make sure things are done in thread t1
    time.sleep(1)
    print("----in work2---",nums)

g_nums = [11,22,33]

t1 = Thread(target=work1, args=(g_nums,))
t1.start()

t2 = Thread(target=work2, args=(g_nums,))
t2.start()

Mutex

When multiple threads modify the same shared data at almost the same time, access must be synchronized.
When a thread wants to change the shared data, it locks it first. The resource is then in the "locked" state and other threads cannot change it. Only after the thread releases the resource, returning it to the "unlocked" state, can another thread lock it. A mutex ensures that only one thread writes at a time, which keeps the data correct under multithreading.

Advantage: a critical section of code is guaranteed to be executed from start to finish by one thread at a time.
Disadvantage 1: it prevents concurrent execution. Code guarded by a lock effectively runs in single-threaded mode, which reduces efficiency considerably.
Disadvantage 2: because there can be several locks, different threads may each hold one lock while trying to acquire a lock held by the other, which can cause a deadlock.

import threading
import time

g_num = 0

def test1(num):
    global g_num
    for i in range(num):
        mutex.acquire()  
        g_num += 1
        mutex.release()  

    print("---test1---g_num=%d"%g_num)

def test2(num):
    global g_num
    for i in range(num):
        mutex.acquire()  
        g_num += 1
        mutex.release()  

    print("---test2---g_num=%d"%g_num)

# Create a mutually exclusive lock
# Default is the unlocked state
mutex = threading.Lock()

# Create 2 threads and have them each add 1000000 times to g_num
p1 = threading.Thread(target=test1, args=(1000000,))
p1.start()

p2 = threading.Thread(target=test2, args=(1000000,))
p2.start()

# Wait for the calculation to complete
while len(threading.enumerate()) != 1:
    time.sleep(1)

print("The final result after 2 threads operate on the same global variable is:%s" % g_num)
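The deadlock risk listed as the second disadvantage can often be avoided by having every thread acquire the locks in the same global order. The sketch below is an illustration under assumptions, not code from the original post: the per-account locks, the balances dictionary and the transfer function are hypothetical, and the ordering rule here is simply the alphabetical order of the account names.

```python
import threading

# One lock per account (hypothetical example data).
locks = {"a": threading.Lock(), "b": threading.Lock()}
balances = {"a": 1000, "b": 1000}


def transfer(src, dst, amount, repeats):
    # Sort the two account names so that every thread takes the locks in
    # the same order, whatever the direction of the transfer. If each
    # thread locked its own source account first, two opposite transfers
    # could each hold one lock and wait forever for the other.
    first, second = sorted((src, dst))
    for _ in range(repeats):
        with locks[first]:
            with locks[second]:
                balances[src] -= amount
                balances[dst] += amount


t1 = threading.Thread(target=transfer, args=("a", "b", 1, 10000))
t2 = threading.Thread(target=transfer, args=("b", "a", 1, 10000))
t1.start()
t2.start()
t1.join()
t2.join()

print(balances)
```

Because the two threads move the same total amount in opposite directions, both balances end up unchanged once both threads have finished.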

Using a mutex as a context manager

Lock, Condition, and Semaphore objects can be used in a with statement. Every object in this module that provides acquire() and release() methods can serve as the context manager of a with statement: acquire() is called on entering the block and release() is called on leaving it. So the following snippet:

with some_lock:
    # do something...

# Equivalent to :

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

Lock, RLock, Condition, Semaphore, and BoundedSemaphore objects can all be used as context managers in a with statement.
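As a concrete case, the mutex counter example from earlier can be written with a with statement. This is a minimal sketch rather than the author's code: the names counter, counter_lock and add_many are illustrative, and the iteration count is reduced to keep the run short.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(times):
    global counter
    for _ in range(times):
        # acquire() on entry, release() on exit, even if the body raises
        with counter_lock:
            counter += 1


threads = [threading.Thread(target=add_many, args=(100000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # join() replaces polling threading.enumerate() in a loop

print("final counter:", counter)
```

With the lock in place, the final value is always the sum of the increments made by both threads.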

Multithreading example [chat]

Write a program with two threads:
thread 1 receives data and displays it;
thread 2 reads keyboard input and sends it over UDP.

import socket
import threading


def send_msg(udp_socket):
    """Get the keyboard data and send it to the other side """
    while True:
        # 1. input data from keyboard
        msg = input("\nPlease enter the data to be sent:")
        # 2. enter the ip address of the other party
        dest_ip = input("\nPlease enter the other party's ip address:")
        # 3. enter the other party's port
        dest_port = int(input("\nPlease enter the other party's port:"))
        # 4. send data
        udp_socket.sendto(msg.encode("utf-8"), (dest_ip, dest_port))


def recv_msg(udp_socket):
    """Receive data and display """
    while True:
        # 1. receive data
        recv_msg = udp_socket.recvfrom(1024)
        # 2. decode
        recv_ip = recv_msg[1]
        recv_msg = recv_msg[0].decode("utf-8")
        # 3. display the received data
        print(">>>%s:%s" % (str(recv_ip), recv_msg))


def main():
    # 1. Create a socket
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # 2. bind local information
    udp_socket.bind(("", 7890))

    # 3. create a child thread to receive data
    t = threading.Thread(target=recv_msg, args=(udp_socket,))
    t.start()
    # 4. let the main thread detect the keyboard data and send
    send_msg(udp_socket)


if __name__ == "__main__":
    main()
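The chat program needs a second machine or terminal to try out. To check the receive-in-a-thread pattern on one machine, a loopback variant can help. The sketch below is an assumption-laden simplification: recv_one and loopback_demo are hypothetical names, the socket is bound to 127.0.0.1 on an OS-chosen port instead of port 7890, and only one datagram is exchanged.

```python
import socket
import threading

received = []


def recv_one(udp_socket):
    """Receive a single datagram and record (sender address, text)."""
    data, addr = udp_socket.recvfrom(1024)
    received.append((addr, data.decode("utf-8")))


def loopback_demo():
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    receiver.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    receiver.settimeout(5)           # do not hang forever if nothing arrives
    port = receiver.getsockname()[1]

    # The child thread waits for data, as in the chat program.
    t = threading.Thread(target=recv_one, args=(receiver,))
    t.start()

    # The main thread sends, also as in the chat program.
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sender.sendto("hello".encode("utf-8"), ("127.0.0.1", port))

    t.join()
    sender.close()
    receiver.close()
    return received


if __name__ == "__main__":
    for addr, text in loopback_demo():
        print(">>>%s:%s" % (str(addr), text))
```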

Multithreading: code submitted to a thread pool is skipped when run from a unit test

Today I was debugging an interface through a unit test. The interface call itself succeeded, but the method submitted to the thread pool never ran: execution skipped straight past that code.

 

 ExecutorService pool = Executors.newFixedThreadPool( 2 );

 

 public void callInterfaceCreditease(final String idcard,final String name,final String mobile){
        try{
            
            ExecutorService pool = Executors.newFixedThreadPool( 2 );
            //Interface
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    creditCardCrediteaseService.doFetchCreditData(name, idcard, mobile);
                }
            });
            //Release thread pool resources
            pool.shutdown();
        }catch(Exception e){
            log.error("Calling interface exceptions:",e);
        }
                
    }

 

Spring Boot: asynchronous processing with a thread pool, with and without a return value

1. Scenario

HTTP requests are processed asynchronously:

1) No return value needed: auxiliary work such as sending SMS, push messages, or e-mail is handled asynchronously, which shortens the total time of the HTTP request and improves the user experience.

2) Return value needed: the front end waits for the result of the business processing, so the asynchronous task hands its result back to the HTTP request thread, which returns it to the front end. For optimizing a single business operation, see the previous and next articles in this blog. With several business operations, you can run several asynchronous tasks with return values and aggregate their results at the end before returning.

2. Key points

1) Thread pool: ThreadPoolTaskExecutor

2) Annotations: @EnableAsync, @Async("asyncServiceExecutor")

3) Return value wrapped as Future<T>: new AsyncResult<>(T)

3. Code examples

1)controller

package com.liuxd.controller;

import com.liuxd.entity.Responses;
import com.liuxd.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.Future;

@Slf4j
@RestController
public class AsyncCtr {

    @Autowired
    private TaskService taskService;

    @RequestMapping(value = "/get", method = RequestMethod.GET)
    public Responses<String> getResult() throws Exception {

        log.info("HTTP request received...") ;
        long startTime = System.currentTimeMillis();

        // 1. Run asynchronously without waiting for the result
        taskService.handleData();

        // 2. Run asynchronously and wait for the result
        Future<String> future = taskService.getData();
        String result = future.get();

        log.info("Receiving HTTP request thread task completed, exit!") ;
        long endTime = System.currentTimeMillis();
        log.info("Total time spent on http request. " + (endTime - startTime) + "ms");

        return new Responses<>(0, result, "SUCCESS");

    }

}

2)service

package com.liuxd.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;

@Slf4j
@Service
public class TaskService {

    @Async("asyncServiceExecutor")
    public void handleData() {

        log.info("Call service no return asynchronous method, start execution...");
        long startTime = System.currentTimeMillis();
        try {
            Thread.sleep(2500L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("Call service no return asynchronous method, the end of execution!!!") ;
        long endTime = System.currentTimeMillis();
        log.info("Total time taken to call service without return asynchronous method. " + (endTime - startTime) + "ms");

    }

    @Async("asyncServiceExecutor")
    public Future<String> getData(){

        log.info("Call service has return asynchronous method, start execution...");
        long startTime = System.currentTimeMillis();

        try {
            Thread.sleep(2500L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("Call service has returned asynchronous methods, the end of execution!!!") ;

        long endTime = System.currentTimeMillis();
        log.info("Total time to call service with return asynchronous method: " + (endTime - startTime) + "ms");

        return new AsyncResult<>("Asynchronous processing completed!");

    }


}

3) Thread pool

package com.liuxd.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @program: unif-insure-service
 * @description: Thread Pool
 **/
@Configuration
@EnableAsync
public class ExecutorConfig {
    @Bean
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //Configure the number of core threads
        executor.setCorePoolSize(5);
        // Configure the maximum number of threads
        executor.setMaxPoolSize(5);
        //configure queue size
        executor.setQueueCapacity(1000);
        //configure the name prefix of the threads in the thread pool
        executor.setThreadNamePrefix("async-system-");

        // rejection-policy: How to handle new tasks when the pool has reached its max size
        // CALLER_RUNS: instead of executing the task in the new thread, the caller's thread will execute it
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //Execution initialization
        executor.initialize();
        return executor;
    }
}

4)Responses

package com.liuxd.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Responses<T> {

    private Integer code;

    private String msg;

    private T data;

}

5) Print results

6) Result analysis

1) Three threads are involved: the main thread handling the HTTP request, the asynchronous thread without a return value, and the asynchronous thread with a return value.

2) The main thread does not wait for the thread without a return value; it waits for the result of the thread with a return value and then returns the response.

Asynchronous processing of HTTP requests in Java, method 1: Callable

1. Callable

1) A Runnable performs an independent task but does not return a value. If you want a value back when the task completes, use the Callable interface instead;

2) Callable is a generic interface with a type parameter. The type parameter is the type of the value returned by its call() method, which takes the place of run(), and the task is submitted through ExecutorService.submit().

Differences:

1. Callable takes a generic type and returns a value of that type from call(), whereas Runnable's run() has no return value;
2. Callable's call() can throw a checked exception, whereas Runnable's run() cannot.

2. Business scenario:

When the business processing behind an HTTP request takes a long time, for example a large query or a remote call, the thread that received the request stays occupied the whole time. The Tomcat thread pool has a limited number of threads, so the number of HTTP requests that can be accepted per unit of time drops.

3. Code examples

1)controller

package com.liuxd.controller;

import com.liuxd.entity.Responses;
import com.liuxd.service.BusinessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.Callable;

@Slf4j
@RestController
public class AsyncCallableController {

    @Autowired
    private BusinessService businessService;

    @GetMapping(value = "/getData")
    public Callable<Responses<String>> getData() {

        log.info("HTTP request received...");

        Callable<Responses<String>> data = (() -> {
            return businessService.getData();
        });

        log.info("The task of the receiving HTTP request thread has been completed, exit!");

        return data;
    }

}

2) service

package com.liuxd.service;

import com.liuxd.entity.Responses;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class BusinessService {

    public Responses<String> getData(){

        log.info("Call the service method and start the execution...");

        try {
            Thread.sleep(2500L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("Call service method, end of execution!");

        return new Responses<>(0,"Done","SUCCESS");

    }

}

3) Print results

Why HashMap can fall into an infinite loop under multithreading

This article is a study note on "Vaccine: Java HashMap's endless loop"; discussion is welcome.
Suppose the bucket about to be resized looks like this:
1 -> 2 -> null
Thread 1 starts to transfer the bucket and has read the current element e: 1 and the next element next: 2 when it is suspended.
Thread 2 then completes its resize, and the bucket now reads 2 -> 1 -> null.
Thread 1 continues:
first pass (e:1, next:2)
new bucket: 1 -> null

e=next=2;
next=e.next=1;

second pass (e:2, next:1)
new bucket: 2 -> 1 -> null

e=next=1;
next=e.next=null;

third pass (e:1, next:null)
new bucket: 1 -> 2 -> 1, which is the cycle 1 <-> 2

e=next=null;

The transfer loop ends, but any later lookup that walks this bucket never terminates, so CPU usage goes to 100%.
JDK 1.7 uses head insertion when resizing; JDK 1.8 uses tail insertion.
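
The interleaving above can be replayed deterministically in a single thread. The sketch below is not the JDK source: `Node` is a hypothetical stand-in for a JDK 1.7 HashMap entry, and the two "threads" are simulated by performing their steps in the order described above.

```java
final class HeadInsertCycleDemo {

    // Minimal stand-in for a JDK 1.7 HashMap entry (hypothetical, for illustration).
    static final class Node {
        final int key;
        Node next;
        Node(int key) { this.key = key; }
    }

    // Replays the interleaving and reports whether the bucket built by
    // "thread 1" ends up containing a cycle.
    static boolean replayProducesCycle() {
        Node n1 = new Node(1);
        Node n2 = new Node(2);
        n1.next = n2;                  // old bucket: 1 -> 2 -> null

        // Thread 1 reads e and next, then is suspended.
        Node e = n1;
        Node next = e.next;            // next = 2

        // Thread 2 finishes its whole resize with head insertion,
        // which reverses the bucket: 2 -> 1 -> null.
        n2.next = n1;
        n1.next = null;

        // Thread 1 resumes its transfer loop with the stale e/next values.
        Node newHead = null;
        boolean firstPass = true;
        while (e != null) {
            if (!firstPass) {
                next = e.next;         // later passes re-read next from the shared nodes
            }
            firstPass = false;
            e.next = newHead;          // head insertion into thread 1's new bucket
            newHead = e;
            e = next;
        }
        return hasCycle(newHead);
    }

    // Floyd's tortoise-and-hare cycle check.
    static boolean hasCycle(Node head) {
        Node slow = head;
        Node fast = head;
        while (fast != null && fast.next != null) {
            slow = slow.next;
            fast = fast.next.next;
            if (slow == fast) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("cycle after replay: " + replayProducesCycle());
    }
}
```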

A Java "starting gun": a concurrent executor that controls the number of threads and the concurrency level

Introduction

I first used a multithreaded concurrency test while studying the singleton pattern, to check whether a singleton implementation was thread-safe. I was pressed for time then and did not write it down, so I am recording it here.

1, The code

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


/**
* @author :Jarvisy
* @date :Created in 2020/9/16 1:20
* @description :
*/
public class ConcurrentExecutor {
    /**
     * @param runHandler      callback to run in each task
     * @param executeCount    total number of requests to issue
     * @param concurrentCount number of threads allowed to run concurrently
     * @throws Exception
     */
    public static void execute(final RunHandler runHandler, int executeCount, int concurrentCount) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Semaphore that limits the number of concurrently running threads
        final Semaphore semaphore = new Semaphore(concurrentCount);
        // Latch whose count is decremented as each task finishes
        final CountDownLatch countDownLatch = new CountDownLatch(executeCount);
        for (int i = 0; i < executeCount; i++) {
            executorService.execute(new Runnable() {
                public void run() {
                    try {
                        // Acquire a permit. While fewer than concurrentCount permits
                        // are held, the thread proceeds; otherwise it blocks until
                        // a permit becomes available.
                        semaphore.acquire();
                        try {
                            runHandler.handler(); // callback
                        } finally {
                            // Release the permit even if the callback throws
                            semaphore.release();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // Count down even on failure so await() cannot hang
                        countDownLatch.countDown();
                    }
                }
            });
        }
        // Block until the latch reaches 0, then continue
        countDownLatch.await();
        executorService.shutdown();
    }


    public interface RunHandler {
        void handler();
    }
}

2, Executors

Executors can create four kinds of thread pool:

  1. newCachedThreadPool creates a cached thread pool. Idle threads beyond what the workload needs are reclaimed, and a new thread is created when no idle thread is available for reuse.
  2. newFixedThreadPool creates a fixed-size thread pool that caps the number of concurrent threads; tasks beyond that limit wait in the queue.
  3. newScheduledThreadPool creates a fixed-size thread pool that supports delayed and periodic task execution.
  4. newSingleThreadExecutor creates a single-threaded pool that runs tasks on one worker thread only, so tasks execute one at a time in submission order (FIFO).

First: newCachedThreadPool
The pool has no upper bound on its size. If the first task has already finished when the second task is submitted, the thread that ran the first task is reused instead of a new thread being created each time.

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            try {
                Thread.sleep(index * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    System.out.println(index);
                }
            });
        }

Second: newFixedThreadPool
Because the pool size is 3 and each task prints its index and then sleeps for 2 seconds, three numbers are printed every two seconds.
The size of a fixed-size thread pool is best chosen according to system resources, for example Runtime.getRuntime().availableProcessors().

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);  
  for (int i = 0; i < 10; i++) {  
   final int index = i;  
   fixedThreadPool.execute(new Runnable() {  
    public void run() {  
     try {  
      System.out.println(index);  
      Thread.sleep(2000);  
     } catch (InterruptedException e) {  
      e.printStackTrace();  
     }  
    }  
   });
  }

Third: newScheduledThreadPool
The task first runs after a 1-second delay and then repeats every 3 seconds.

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);  
  scheduledThreadPool.scheduleAtFixedRate(new Runnable() {  
   public void run() {  
    System.out.println("delay 1 second, and execute every 3 seconds");
   }  
  }, 1, 3, TimeUnit.SECONDS);

Fourth: newSingleThreadExecutor

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();  
  for (int i = 0; i < 10; i++) {  
   final int index = i;  
   singleThreadExecutor.execute(new Runnable() {  
    public void run() {  
     try {  
      System.out.println(index);  
      Thread.sleep(2000);  
     } catch (InterruptedException e) {  
      e.printStackTrace();  
     }  
    }  
   });
  }

3, ExecutorService

ExecutorService methods:

1, execute(Runnable): accepts a Runnable instance and executes it asynchronously. The drawback is that there is no way to obtain the task's result. To get a result, pass in a Callable instead.

2, submit(Runnable): unlike execute(Runnable), it returns a Future object, which can be used to check whether the submitted task has completed. Once the task completes, future.get() returns null. Note that future.get() blocks.

3, submit(Callable): like submit(Runnable) it returns a Future, but it accepts a Callable implementation. The call() method of Callable has a return value carrying the task's result, whereas the run() method of Runnable is void. Once the task completes, future.get() returns the Callable's result. Note that future.get() blocks.

4, invokeAny(…): accepts a collection of Callable objects. It does not return a Future; it returns the result of one of the Callable tasks that completed successfully. There is no guarantee as to which task's result is returned.

5, invokeAll(…): like invokeAny(…), it accepts a collection of Callable objects, but it returns a List of Future objects, one for each Callable task, once all of them have completed.
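
As a rough illustration of what each submission method returns, here is a minimal sketch. The class name, the `Results` holder and the pool size of 2 are assumptions made for the demo; only the ExecutorService calls themselves come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExecutorServiceMethodsDemo {

    // Holds what each submission style returned (hypothetical helper type).
    static final class Results {
        Object fromRunnable;    // Future.get() after submit(Runnable)
        String fromCallable;    // Future.get() after submit(Callable)
        int fromInvokeAny;      // result of one successfully completed task
        int sumFromInvokeAll;   // sum of the results of all tasks
    }

    static Results run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Results r = new Results();
        try {
            // submit(Runnable): the Future completes with null.
            Runnable runnable = () -> { };
            r.fromRunnable = pool.submit(runnable).get();

            // submit(Callable): the Future completes with call()'s result.
            Callable<String> callable = () -> "done";
            r.fromCallable = pool.submit(callable).get();

            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(() -> 1);
            tasks.add(() -> 2);
            tasks.add(() -> 3);

            // invokeAny: blocks and returns the result of one task, not a Future.
            r.fromInvokeAny = pool.invokeAny(tasks);

            // invokeAll: blocks until all tasks are done, returns one Future per task.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                r.sumFromInvokeAll += future.get();
            }
            return r;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Results r = run();
        System.out.println(r.fromRunnable + " " + r.fromCallable + " "
                + r.fromInvokeAny + " " + r.sumFromInvokeAll);
    }
}
```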

Shutting down an ExecutorService:
1, shutdown(): stops accepting new tasks; already submitted tasks continue

  1. stops accepting newly submitted tasks;
  2. tasks already submitted (both running and waiting in the queue) run to completion;
  3. once step 2 is finished, the pool actually stops;

2, shutdownNow(): stops accepting new tasks and tries to stop the existing ones

  1. stops accepting newly submitted tasks, as shutdown() does;
  2. does not start tasks that are waiting in the queue;
  3. attempts to interrupt the tasks that are running;
  4. returns the list of tasks that never started;

3, awaitTermination(long timeout, TimeUnit unit): timeout and unit give the maximum wait time and its unit
the current thread blocks until one of the following happens:

  1. all submitted tasks (both running and waiting in the queue) have finished after a shutdown request;
  2. the timeout elapses;
  3. the current thread is interrupted, in which case InterruptedException is thrown;

It then reports whether the ExecutorService has terminated: true if all tasks completed after the shutdown request, false if the timeout elapsed first.

4, The difference between shutdown() and shutdownNow()
shutdown() only closes the submission channel, so later submit() calls are rejected; tasks already inside the pool run as usual, and the pool stops once they finish.
shutdownNow() tries to stop the pool immediately: it interrupts running tasks and discards waiting ones. Interruption is best effort, so a task that ignores interrupts keeps running.

5, The difference between shutdown() and awaitTermination()
After shutdown(), no new task can be submitted, whereas calling awaitTermination() on its own does not close the submission channel.
awaitTermination() blocks and returns whether the pool has terminated (true/false); shutdown() does not block.

In summary:

  1. For a graceful shutdown, use shutdown();
  2. To shut down immediately and get the list of tasks that never ran, use shutdownNow();
  3. To shut down gracefully and wait for completion, call shutdown() and then awaitTermination().
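
The shutdown behaviour described in this section can be checked with a short sketch. The class and method names are hypothetical, and the 5-second timeout and 60-second sleep are arbitrary values chosen for the demo.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

final class ShutdownDemo {

    // Graceful path: shutdown() rejects new tasks, awaitTermination() waits.
    static boolean gracefulShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> { });
        pool.shutdown();                      // returns immediately
        boolean rejected = false;
        try {
            pool.execute(() -> { });          // submission channel is closed
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        return rejected && awaitQuietly(pool);
    }

    // Forced path: shutdownNow() interrupts the running task and hands
    // back the tasks that were still waiting in the queue.
    static int forcedShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(); just return
            }
        });
        pool.execute(() -> { });              // queued behind the sleeping task
        pool.execute(() -> { });              // queued as well
        try {
            started.await();                  // make sure the first task is running
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Runnable> neverStarted = pool.shutdownNow();
        awaitQuietly(pool);
        return neverStarted.size();
    }

    // Waits up to 5 seconds for termination; false on timeout or interrupt.
    static boolean awaitQuietly(ExecutorService pool) {
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(gracefulShutdown());
        System.out.println(forcedShutdown());
    }
}
```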

4, Semaphore

Semaphore is a helper class for thread synchronization that maintains the number of threads currently accessing itself and provides a synchronization mechanism. Semaphore allows you to control the number of threads accessing a resource at the same time, for example, to implement the number of concurrent accesses allowed for a file.

The number of permits must be supplied when the Semaphore is constructed; there is no setter for it afterwards, although release() can add permits as described below.

1, The constructor parameter permits is the maximum number of threads allowed to execute the code between acquire() and release() at the same time.
2, acquire() consumes 1 permit each time it is called.
3, acquire(n) consumes n permits each time it is called.
4, release() dynamically adds 1 permit each time it is called.
5, release(n) dynamically adds n permits each time it is called.
6, acquireUninterruptibly() means that a thread waiting for a permit cannot be interrupted out of the wait.
7, availablePermits() returns the number of permits currently available in the Semaphore.
8, drainPermits() acquires and returns all permits that are immediately available, leaving 0 available.
9, getQueueLength() returns an estimate of the number of threads waiting for a permit.
10, hasQueuedThreads() reports whether any threads are waiting for a permit.
11, Fair and non-fair semaphores:
Sometimes the order in which permits are granted should follow the order in which threads asked for them, so semaphores come in fair and non-fair variants. A fair semaphore grants permits in the order in which threads called acquire(); a non-fair semaphore gives no ordering guarantee.
e.g.

Semaphore semaphore = new Semaphore(1, false);

False: non-fair semaphore. The order in which permits are granted is independent of the order in which threads called semaphore.acquire(), so the thread that asked first does not necessarily get a permit first.
True: fair semaphore. Permits are granted in the order in which threads called semaphore.acquire(), so the thread that asked first gets a permit first.

12, tryAcquire() tries to obtain 1 permit and returns false if none is available. It is non-blocking and is usually used in an if statement; being non-blocking, it keeps the caller from waiting indefinitely.
13, tryAcquire(n) tries to obtain n permits and returns false if they are not available.
14, tryAcquire(long timeout, TimeUnit unit) tries to obtain 1 permit within the given time, and tryAcquire(int permits, long timeout, TimeUnit unit) tries to obtain n permits within the given time. Both return false if the permits cannot be obtained in time.
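
The following sketch exercises several of the methods listed above. It is illustrative only: the class and method names are made up, and the 20 ms sleep simply keeps tasks inside the guarded section long enough to overlap.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreDemo {

    // Runs `tasks` jobs on a cached pool while a fair semaphore caps how many
    // are inside the guarded section at once; returns the observed peak.
    static int peakConcurrency(int tasks, int permits) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(permits, true);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(20);
                    } finally {
                        running.decrementAndGet();
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return peak.get();
    }

    // Single-threaded permit bookkeeping with the non-blocking methods.
    // Returns {available after tryAcquire, drained, available after drain,
    // available after release(5)}.
    static int[] permitBookkeeping() {
        Semaphore s = new Semaphore(3);
        boolean gotTwo = s.tryAcquire(2);         // succeeds, 1 permit left
        int afterTry = s.availablePermits();
        boolean gotTwoMore = s.tryAcquire(2);     // fails without blocking
        int drained = s.drainPermits();           // takes whatever is left
        int afterDrain = s.availablePermits();
        s.release(5);                             // release can add permits
        int afterRelease = s.availablePermits();
        if (!gotTwo || gotTwoMore) {
            throw new IllegalStateException("unexpected tryAcquire result");
        }
        return new int[] {afterTry, drained, afterDrain, afterRelease};
    }

    public static void main(String[] args) {
        System.out.println("peak: " + peakConcurrency(10, 2));
        System.out.println(java.util.Arrays.toString(permitBookkeeping()));
    }
}
```

The peak never exceeds the number of permits, which is the property the ConcurrentExecutor above relies on.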

5, CountDownLatch

CountDownLatch is a class that makes one thread wait until the other threads have each finished their work.
It is implemented with a counter whose initial value is the number of threads. Each thread decrements the counter by 1 when it finishes. When the counter reaches 0, all threads have finished, and the thread waiting on the latch resumes.

CountDownLatch has a single constructor, which takes an int, typically the number of threads.

The three most important methods in the CountDownLatch class are:

// The thread that calls await() is suspended until the count reaches 0
public void await() throws InterruptedException { };
// Like await(), but stops waiting once the timeout elapses even if the count has not reached 0
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
// Decrements the count by 1
public void countDown() { };
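
A minimal sketch of these methods in use, assuming plain threads rather than a pool. It uses two latches: a hypothetical `startGun` latch with a count of 1 that releases all workers at once, and a `done` latch that the caller waits on. The names and the 5-second timeout are illustrative choices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CountDownLatchDemo {

    // Starts `workers` threads, releases them together, waits for all of
    // them, and returns how many completed their work.
    static int runWorkers(int workers) {
        CountDownLatch startGun = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    startGun.await();             // wait for the starting gun
                    finished.incrementAndGet();   // the "work"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();             // count - 1
                }
            }).start();
        }

        startGun.countDown();                     // fire: count goes from 1 to 0
        try {
            // Timed await: returns false if the count is still above 0 after 5 s.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(5));
    }
}
```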