Tag Archives: asynchronous

Error (17): fixing request headers lost in multithreaded asynchronous Feign calls

Problem

In a microservice, request headers are lost when a Feign call is made asynchronously from another thread, because RequestContextHolder keeps the current request in a ThreadLocal that the child thread does not share.

Solution
    1. In the main thread, first read the request headers and pass them to the child thread. The child thread then stores them in its own thread-local context. Finally, the Feign interceptor reads the headers from that context and forwards them to the downstream service.

Utility class for reading the request headers from the context

@Slf4j
public class RequestContextUtil {

    /**
     * Get request header data
     *
     * @return key-> request header name value-> request header value
     * @author zhengqingya
     * @date 2021/6/30 9:39 PM
     */
    public static Map<String, String> getHeaderMap() {
        Map<String, String> headerMap = Maps.newLinkedHashMap();
        try {
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            if (requestAttributes == null) {
                return headerMap;
            }
            HttpServletRequest request = requestAttributes.getRequest();
            Enumeration<String> enumeration = request.getHeaderNames();
            while (enumeration.hasMoreElements()) {
                String key = enumeration.nextElement();
                String value = request.getHeader(key);
                headerMap.put(key, value);
            }
        } catch (Exception e) {
            log.error("《RequestContextUtil》 Failed to get request header parameters:", e);
        }
        return headerMap;
    }

}

Request header context

@Slf4j
public class RequestHeaderHandler {

    public static final ThreadLocal<Map<String, String>> THREAD_LOCAL = new ThreadLocal<>();

    public static void setHeaderMap(Map<String, String> headerMap) {
        THREAD_LOCAL.set(headerMap);
    }

    public static Map<String, String> getHeaderMap() {
        return THREAD_LOCAL.get();
    }

    public static void remove() {
        THREAD_LOCAL.remove();
    }

}

Feign interceptor that forwards the headers on RPC calls

/**
 * Feign request interceptor: copies request headers onto outgoing Feign requests.
 */
@Slf4j
@Configuration
public class FeignRequestInterceptor implements RequestInterceptor {

    @Override
    @SneakyThrows
    public void apply(RequestTemplate requestTemplate) {
        log.debug("========================== ↓↓↓↓↓↓ 《FeignRequestInterceptor》 Start... ↓↓↓↓↓↓ ==========================");
        Map<String, String> threadHeaderNameMap = RequestHeaderHandler.getHeaderMap();
        if (!CollectionUtils.isEmpty(threadHeaderNameMap)) {
            threadHeaderNameMap.forEach((headerName, headerValue) -> {
                log.debug("《FeignRequestInterceptor》 Multi-threaded headerName:[{}] headerValue:[{}]", headerName, headerValue);
                requestTemplate.header(headerName, headerValue);
            });
        }
        Map<String, String> headerMap = RequestContextUtil.getHeaderMap();
        headerMap.forEach((headerName, headerValue) -> {
            log.debug("《FeignRequestInterceptor》 headerName:[{}] headerValue:[{}]", headerName, headerValue);
            requestTemplate.header(headerName, headerValue);
        });
        log.debug("========================== ↑↑↑↑↑↑ 《FeignRequestInterceptor》 End... ↑↑↑↑↑↑ ==========================");
    }

}

Use cases

@Slf4j
@RestController
@RequestMapping("/web/api/demo/test")
@Api(tags = "test api")
@AllArgsConstructor
public class RpcController extends BaseController {

    private SystemTaskThread systemTaskThread;

    @GetMapping("getContextUserId")
    @ApiOperation("rpc Call test - Async")
    public void getContextUserId() {
        Map<String, String> headerMap = RequestContextUtil.getHeaderMap();
        log.info("Main thread request header value: {}", headerMap.get("userId"));
        this.systemTaskThread.getRequestHeaderUserId(RequestContextUtil.getHeaderMap());
    }

}

@Slf4j
@Component
@AllArgsConstructor
public class SystemTaskThread {

    private ISystemClient systemClient;

    @SneakyThrows
    @Async(ThreadPoolConstant.SMALL_TOOLS_THREAD_POOL)
    public void getRequestHeaderUserId(Map<String, String> headerMap) {
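        RequestHeaderHandler.setHeaderMap(headerMap);
        try {
            log.info("Sub-thread request header value: {}", this.systemClient.getRequestHeaderUserId());
        } finally {
            // Pool threads are reused: clear the ThreadLocal so headers do not leak into later tasks.
            RequestHeaderHandler.remove();
        }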
    }

}

Note: another approach mentioned online is to have the main thread obtain the request attributes with RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(), pass them to the child thread, and re-assign them there with RequestContextHolder.setRequestAttributes(requestAttributes). That approach did not work for me; it is recorded here for reference.
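The capture, set and clean-up pattern used above is not specific to Java. As a rough analogy only, here is a minimal Python sketch of the same idea. All names in it (set_headers, downstream_call, task_with_context and so on) are hypothetical stand-ins and not part of Feign or Spring: threading.local plays the role of RequestHeaderHandler.THREAD_LOCAL, and downstream_call stands in for the Feign interceptor.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for RequestHeaderHandler.THREAD_LOCAL
_context = threading.local()


def set_headers(headers):
    _context.headers = headers


def get_headers():
    return getattr(_context, "headers", None)


def clear_headers():
    if hasattr(_context, "headers"):
        del _context.headers


def downstream_call():
    """Stand-in for the interceptor: reads headers from the current thread only."""
    headers = get_headers() or {}
    return headers.get("userId")


def task_without_context():
    # Runs on a pool thread that never received the headers
    return downstream_call()


def task_with_context(headers):
    # The headers captured in the main thread are set on the worker thread
    set_headers(headers)
    try:
        return downstream_call()
    finally:
        # Pool threads are reused, so always clean up
        clear_headers()


def demo():
    set_headers({"userId": "42"})  # main thread holds the request headers
    with ThreadPoolExecutor(max_workers=1) as pool:
        lost = pool.submit(task_without_context).result()
        kept = pool.submit(task_with_context, dict(get_headers())).result()
        after = pool.submit(task_without_context).result()
    clear_headers()
    return lost, kept, after


if __name__ == "__main__":
    print(demo())
```

The first task sees no headers because thread-local data does not cross threads, the second sees the copy passed in explicitly, and the third confirms that the clean-up in finally left nothing behind on the reused worker thread.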

TypeError: An asyncio.Future, a coroutine or an awaitable is

TypeError: An asyncio.Future, a coroutine or an awaitable is required
This error appeared when running code asynchronously. The fixes I found online did not work.

The actual cause: the async keyword was missing in front of the function definition.

Full asynchronous version:

import asyncio
import time

async def ExportData(v, f_row, a):
    ........


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
t1 = time.time()
# for v in a[1:]:
# Each call returns a coroutine object because ExportData is declared with async
tasks = [ExportData(v, f_row, a) for v in a[1:]]
loop.run_until_complete(asyncio.gather(*tasks))
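To see why the missing async keyword triggers this error, here is a minimal sketch. The function names export_sync, export_async, run_all and try_run are made up for illustration and are not from the original code. Calling a plain function yields an ordinary value, which asyncio.gather() cannot schedule, while calling an async function yields a coroutine object.

```python
import asyncio


def export_sync(v):
    # Plain function: calling it returns a value, not an awaitable
    return v * 2


async def export_async(v):
    # Coroutine function: calling it returns a coroutine object
    await asyncio.sleep(0)
    return v * 2


async def run_all(func, values):
    # gather() needs awaitables; plain return values are rejected
    return await asyncio.gather(*[func(v) for v in values])


def try_run(func, values):
    try:
        return asyncio.run(run_all(func, values))
    except TypeError as exc:
        return f"TypeError: {exc}"


if __name__ == "__main__":
    print(try_run(export_async, [1, 2, 3]))
    print(try_run(export_sync, [1, 2, 3]))
```

The first call returns the gathered results, and the second returns the TypeError text instead of raising, so both cases can be compared side by side.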

QuickFIX/J: 100% Java Open Source FIX Engine

http://www.quickfixj.org/
The Financial Information eXchange (FIX) protocol is a messaging standard developed specifically for the real-time electronic exchange of securities transactions. FIX is a public-domain specification owned and maintained by FIX Protocol, Ltd (FPL). The FPL mission is to improve the global trading process by defining, managing, and promoting an open protocol for real-time, electronic communication between industry participants, while complementing industry standards.
QuickFIX/J is a full featured messaging engine for the FIX protocol. It is a 100% Java open source implementation of the popular C++ QuickFIX engine. For more information see the QuickFIX/J web site.
Features:
Free! It costs nothing and has a very liberal open source licence.
Full source code available (also at no cost).
Supports FIX versions 4.0 – 4.4, 5.0/FIXT1.1.
Runs on any hardware and operating system supported by 1.4+ Java SE or compatible VM.
Compatibility with QuickFIX C++ Java Native Wrapper API (easy to upgrade).
Java NIO asynchronous network communications for scalability (using Apache MINA).
Supports embedded SSL with Java 5+.
Provides standard JMX MBeans for FIX engine management.
Easy to embed in existing Java applications.
Choice of message processing threading strategies.
Communication transports for TCP sockets and VM pipes.
Metadata-driven parsing and validation.
Metadata-driven code generation of type-safe FIX message-related classes.
Metadata API for use at application level (for example, FIX messaging UI).
Support for protocol customizations (new messages, fields, constraints).
Session state storage plugins: JDBC, File, SleepyCat/JE, In memory.
Logging plugins: JDBC, File, SLF4J (supports JDK1.4 logging, Log4J, Commons Logging), Console, Composite.
Failover and High Availability.
Scheduling of session connections.
Many automated unit and acceptance tests.
Example applications: Simple Swing order entry UI and a console-based order execution simulator.
Commercial support available from multiple sources.
Building and Testing QuickFIX/J: Dependencies; Building QuickFIX/J; Generating the database for JDBC based store and log; Testing QuickFIX/J.
Using QuickFIX/J: Creating Your Application; Configuration; Acceptor Failover Support; Dynamic Acceptor Session Definition; Receiving Messages; Sending Messages; Repeating Groups; User Defined Fields; Validation; Secure communications using SSL; Managing QFJ Applications With JMX; Configuring character sets; Customizing Message Code Generation; Example Applications; Determining your QFJ version.

Solution: asynchronously loaded JS is not allowed to use document.write



var scriptFile = document.createElement('script');

scriptFile.setAttribute("type","text/javascript");

scriptFile.setAttribute("src",'http://api.map.baidu.com/api?type=quick&ak=o9B4Ol99j9NcBXSu5nFTR7uI&v=1.0');

document.getElementsByTagName("head")[0].appendChild(scriptFile);

When the script is finally appended to the head, Chrome shows the following warning:
Failed to execute 'write' on 'Document': It isn't possible to write into a document from an asynchronously-loaded external script unless it is explicitly opened.
What does this mean?
PS: In the Chrome console, an error (red mark) stops the script from executing past the point of the error, whereas a warning (yellow exclamation mark) only skips the statement that triggered it.

Solution
This happens because the loaded code calls document.write, and asynchronously loaded JS is not allowed to use document.write.
By the time such a script runs, the document has already been loaded and parsed, so the document stream is closed.
As a result, a script loaded asynchronously can no longer write into the document, for example with document.write.
So the two script links can be included directly instead:


var usel = '<script src="';
usel += gds[0].imageurl;
usel += '"><\/script>'; // escape the slash so an enclosing inline <script> block is not closed early
document.write(usel);

 

Parallel processing in Python (Pool.map(), Pool.starmap(), Pool.apply ())

1. Parallel processing

Parallel processing is a mode of operation in which tasks run simultaneously on multiple processors of the same computer. Its purpose is to reduce the total processing time, but communicating between processes adds overhead, so for small tasks the total time may increase rather than decrease.

In Python, the multiprocessing module runs independent parallel processes by using child processes instead of threads. It lets you take advantage of multiple processors on your machine (Windows and Unix), that is, multiple processes can run completely independently in memory.

2. How many processes can your machine run in parallel at most

The maximum number of processes that can run at the same time is limited by the number of processors in the computer. You can use the cpu_count() function in the multiprocessing module to display it:

import multiprocessing as mp

print("Number of processors: ", mp.cpu_count())

For example, my computer has only four.

3. Synchronous and asynchronous execution

In parallel processing, there are two types of execution: synchronous and asynchronous.

Synchronous execution means that the processes complete in the order in which they were started. This is done by locking the main program until the corresponding process has finished.

Asynchronous execution, in contrast, involves no locking. As a result, the order in which results come back can get mixed up, but asynchronous execution usually finishes faster.

The multiprocessing module provides two objects for running functions in parallel: the Pool class and the Process class.

4. Practical example: count the elements of each row that fall within a given range

Given a two-dimensional matrix (or a list of lists), count how many elements in each row lie within a given numerical range.

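import numpy as np
from time import time

# RandomState() is a pseudo-random number generator.
# Note: this instance is not used below, so it does not seed np.random.randint().
np.random.RandomState(100)
# 0, 10: generate random integers from 0 (inclusive) to 10 (exclusive)
# size=[200000, 5]: a 2-D ndarray with 200000 rows and 5 columns
arr = np.random.randint(0, 10, size=[200000, 5])
data = arr.tolist()  # convert the numpy.ndarray to a list
# The values are random, so they differ from run to run
data = data[:5]
print("Data:", data)

"""
Output:
Data: [[5, 6, 7, 0, 9], [4, 0, 6, 7, 4], [7, 3, 8, 3, 9], [2, 1, 9, 3, 2], [0, 0, 9, 5, 2]]
"""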

4.1 Reference code without parallel processing

The function howmany_within_range() is called once per row; it checks how many numbers in the row fall within the range and returns the count.

"""Without parallel processing"""

def howmany_within_range(row, minimum, maximum):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count


result = []
for row in data:
    result.append(howmany_within_range(row, minimum=4, maximum=8))
print("Number of elements within the given range:", result[:10])
"""
Note: the output below is for reference only. The input is random, so the result differs from run to run.
Output:
Number of elements within the given range: [3, 2, 3, 4, 2, 3, 3, 2, 2, 2]
"""

4.2 Parallelizing the function

The usual way to parallelize code is to take a specific function that needs to run many times and run it on different processors. To do this, initialize a Pool with n processes and pass the function you want to run to one of the Pool's parallel methods.

Pool provides the apply(), map() and starmap() methods to run the passed function in parallel.

What is the difference between apply() and map()?

Both take the function to be parallelized as their main argument. The difference is that apply() takes an args parameter, which holds the arguments passed on to the function, while map() takes only one iterable as its argument.

Therefore, map() is better suited to simple, iterative operations, and it also gets the work done faster.

4.2.1 Parallelizing using Pool.apply()

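if __name__ == '__main__':

    # 1. Initialize multiprocessing.Pool()
    pool = mp.Pool(mp.cpu_count())

    # 2. Use apply(), passing howmany_within_range as the main argument.
    #    Note: apply() blocks until each call returns, so the rows are processed one at a time.
    results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]

    # 3. Don't forget to close the pool
    pool.close()

    print(results[:10])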

Note: put this code under if __name__ == '__main__':, otherwise an error is raised, with a message that ends:

The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.

If we add a print statement outside this block, it is printed multiple times on platforms that start workers by re-importing the module (such as Windows), because each worker process runs the module-level code again.

4.2.2 Parallelizing using Pool.map()

Pool.map() takes only one iterable as its argument. A simple workaround is to change howmany_within_range() into howmany_within_range_rowonly(), which fixes minimum and maximum at default values (minimum=4, maximum=8) and accepts only the row list as input. This is not the best approach, but it clearly shows how map() differs from apply().

import multiprocessing as mp

def howmany_within_range_rowonly(row, minimum=4, maximum=8):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count())
    # map() splits the rows across the worker processes and returns results in input order
    results = pool.map(howmany_within_range_rowonly, [row for row in data])
    pool.close()
    print(results[:10])

4.2.3 Parallelizing using Pool.starmap()

Like Pool.map(), Pool.starmap() takes only one iterable as its argument, but in starmap() each element of that iterable is itself an iterable. The inner iterable holds the arguments for the parallelized function, which are unpacked in order at call time, so they must be supplied in the same order as the function's parameters.

In effect, Pool.starmap() is a version of Pool.map() that accepts multiple arguments.

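import multiprocessing as mp

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count())
    # Each tuple (row, 4, 8) is unpacked into howmany_within_range(row, minimum, maximum)
    results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])
    pool.close()
    print(results[:10])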

5. Asynchronous parallel processing

The asynchronous counterparts of the synchronous functions, apply_async(), map_async() and starmap_async(), execute processes in parallel asynchronously: the next process can start as soon as a previous one completes, regardless of the order in which they were started. Therefore, results are not guaranteed to be produced in the same order as the input.
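As a minimal sketch of the asynchronous variant, the example below submits every row with apply_async() and then collects the results. The helper name count_rows_async, the default of two worker processes and the sample rows are assumptions made for illustration. Calling get() on the handles in submission order is one simple way to keep the output aligned with the input, even if the tasks finish in a different order.

```python
import multiprocessing as mp


def howmany_within_range(row, minimum, maximum):
    """Count the values in row that lie in [minimum, maximum]."""
    return sum(1 for n in row if minimum <= n <= maximum)


def count_rows_async(data, minimum=4, maximum=8, processes=2):
    with mp.Pool(processes) as pool:
        # apply_async() returns immediately with one AsyncResult handle per task
        handles = [
            pool.apply_async(howmany_within_range, args=(row, minimum, maximum))
            for row in data
        ]
        # get() blocks until that task is done; reading the handles in
        # submission order keeps the results in input order
        return [handle.get() for handle in handles]


if __name__ == "__main__":
    sample = [[5, 6, 7, 0, 9], [4, 0, 6, 7, 4], [7, 3, 8, 3, 9],
              [2, 1, 9, 3, 2], [0, 0, 9, 5, 2]]
    print(count_rows_async(sample))
```

Unlike apply(), the submission loop does not wait for each call, so several rows can be processed at the same time before the first get() returns.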

6. Parallelizing using Pool.apply_async()

To be continued.