Title: python-asynchronous execution library asyncio
categories: python
tags: [python, asyncio, asynchronous, parallel]
date: 2020-09-28 14:45:34
comments: false
mathjax: true
toc: true
While writing a tool, I needed to request data dozens of times. Executing the requests synchronously, one after another, was rather slow, so I switched to asynchronous, concurrent execution. Any other blocking I/O can be handled the same way; file I/O, for example, can also be done asynchronously.
This uses asyncio (built in) together with aiohttp and aiofiles (both installed with pip).
code
Utility module async_util.py (a simple wrapper)
# -*- coding: utf-8 -*-
import aiofiles
import aiohttp
import asyncio
import json
import sys
import traceback
import threading
from typing import List
from tool import utils
class CReqInfo:
    """Parameters of one HTTP request handled by ``CAsyncHttp.request``.

    All fields are public and are filled in by the caller after construction.
    """

    def __init__(self):
        # Target URL of the request.
        self.url = None
        # HTTP verb; defaults to POST.
        self.method = "POST"
        # data: request payload (a dict is JSON-encoded by CAsyncHttp.request).
        # extA: opaque caller-defined value that is copied onto the response.
        self.data = self.extA = None
class CRspInfo:
    """Result of one HTTP request as filled in by ``CAsyncHttp.request``."""

    def __init__(self):
        # HTTP status code; CAsyncHttp.request stores -999 when the request failed.
        self.code: int = 0
        # text: response body, or the caught exception when the request failed.
        # extA: opaque value carried over from the matching CReqInfo.
        self.text = self.extA = None
class CFileInfo:
    """Describes one file to be read or written asynchronously.

    Args:
        path: Location of the file.
        encoding: Text encoding used to decode/encode ``content``; ``None``
            means the content is handled as raw bytes.
    """

    def __init__(self, path, encoding="utf-8"):
        self.path, self.encoding = path, encoding
        # content: data read from / to be written to the file.
        # error: exception caught during the read or write, if any.
        # extA: opaque caller-defined value.
        self.content = self.error = self.extA = None
class CCmdInfo:
    """Describes one shell command and, after it ran, its outcome.

    Args:
        cmd: The command line to execute.
    """

    def __init__(self, cmd):
        # code: exit status of the command; msg: its captured output.
        self.code, self.msg = 0, None
        self.cmd = cmd
        # Opaque caller-defined value.
        self.extA = None
class CThreadInfo:
    """A callable plus its positional arguments, to be run in a worker thread.

    Args:
        target: The callable to invoke.
        args: Positional arguments passed to ``target``.
    """

    def __init__(self, target, args=()):
        self.target, self.args = target, args
        # Filled in by the worker thread: return value of target, or the
        # exception it raised.
        self.result = None
class CInnerThread(threading.Thread):
    """Worker thread that runs one ``CThreadInfo`` job and records its outcome.

    The return value of ``ti.target(*ti.args)`` is written to ``ti.result``.
    If the target raises, the exception object itself is stored in
    ``ti.result`` instead and its traceback is printed to stderr.

    Args:
        autoid: Identifier returned alongside the job by ``get_result``.
        ti: The job to run; must expose ``target``, ``args`` and ``result``.
    """

    def __init__(self, autoid, ti: "CThreadInfo"):
        super().__init__()
        self.autoid = autoid
        self.target = ti.target
        self.args = ti.args
        self.ti = ti

    def run(self):
        """Execute the job; never propagates ``Exception`` out of the thread."""
        try:
            self.ti.result = self.target(*self.args)
        except Exception as e:
            self.ti.result = e
            # print_exc() reports the traceback of the exception being handled;
            # print_stack() would only show where this handler itself runs.
            traceback.print_exc()

    def get_result(self):
        """Return ``(autoid, ti)``; call after ``join()`` to see the result."""
        return self.autoid, self.ti
class CAsyncHttp:
    """Issue a batch of HTTP requests concurrently through aiohttp."""

    async def request(self, reqInfo: CReqInfo):
        """Perform one request and describe the outcome as a ``CRspInfo``.

        A ``dict`` payload is JSON-encoded before sending; the caller's
        ``reqInfo`` is left untouched. ``reqInfo.extA`` is copied to the
        response for both successful and failed requests so the caller can
        always match a response to its request.

        Returns:
            A ``CRspInfo`` whose ``code`` is the HTTP status and ``text`` the
            body. On any ``Exception`` ``code`` is -999 and ``text`` holds the
            exception object.
        """
        payload = reqInfo.data
        if isinstance(payload, dict):
            payload = json.dumps(payload)

        rspInfo = CRspInfo()
        rspInfo.extA = reqInfo.extA
        try:
            async with aiohttp.request(method=reqInfo.method, url=reqInfo.url, data=payload) as response:
                rspInfo.code = int(response.status)
                rspInfo.text = await response.text()
        except Exception as e:
            rspInfo.code = -999
            rspInfo.text = e
        # Returning here rather than from a `finally` lets task cancellation
        # and KeyboardInterrupt propagate instead of being silently discarded.
        return rspInfo

    def doReq(self, *reqArr) -> List[CRspInfo]:
        """Run all requests concurrently and return the responses in input order."""
        return CAsyncTask().doTask(*[self.request(reqInfo) for reqInfo in reqArr])
class CAsyncFileRead:
    """Read a batch of files concurrently through aiofiles."""

    async def read(self, fi: CFileInfo):
        """Read ``fi.path`` and store the data in ``fi.content``.

        With ``fi.encoding`` set, the bytes are decoded with that encoding
        (undecodable bytes are ignored); with ``fi.encoding is None`` the raw
        bytes are stored, including the empty bytes of an empty file.

        Returns:
            The same ``fi``. On any ``Exception`` the exception is stored in
            ``fi.error`` and ``fi.content`` is left unchanged.
        """
        try:
            async with aiofiles.open(fi.path, mode="rb") as fd:
                content = await fd.read()
            # An explicit branch is required: the `x and a or b` idiom falls
            # through to decoding when the file is empty (b"" is falsy).
            if fi.encoding is None:
                fi.content = content
            else:
                fi.content = str(content, encoding=fi.encoding, errors="ignore")
        except Exception as e:
            fi.error = e
        # Not returned from `finally`, so cancellation is not swallowed.
        return fi

    def doRead(self, *fileArr) -> List[CFileInfo]:
        """Read all files concurrently and return the ``CFileInfo`` objects in input order."""
        return CAsyncTask().doTask(*[self.read(fi) for fi in fileArr])
class CAsyncFileWrite:
    """Write a batch of files concurrently through aiofiles."""

    async def write(self, fi: CFileInfo):
        """Write ``fi.content`` to ``fi.path``.

        With ``fi.encoding`` set, ``fi.content`` is encoded with it; with
        ``fi.encoding is None`` the content is written as-is (it must already
        be bytes, and may be empty).

        Returns:
            The same ``fi``. On any ``Exception`` raised while encoding or
            writing, the exception is stored in ``fi.error``.
        """
        utils.createDirForFile(fi.path)
        try:
            # Encode before opening so that a payload that cannot be encoded
            # does not truncate an existing file. An explicit branch is
            # required: the `x and a or b` idiom mis-handles empty bytes.
            if fi.encoding is None:
                bts = fi.content
            else:
                bts = fi.content.encode(encoding=fi.encoding)
            async with aiofiles.open(fi.path, mode="wb") as fd:
                await fd.write(bts)
        except Exception as e:
            fi.error = e
        # Not returned from `finally`, so cancellation is not swallowed.
        return fi

    def doWrite(self, *fileArr) -> List[CFileInfo]:
        """Write all files concurrently and return the ``CFileInfo`` objects in input order."""
        return CAsyncTask().doTask(*[self.write(fi) for fi in fileArr])
class CAsyncCmd:
    """Run a batch of shell commands concurrently as asyncio subprocesses."""

    async def run(self, ci: CCmdInfo):
        """Execute ``ci.cmd`` in a shell and record its exit code and output.

        ``ci.msg`` receives stdout decoded as UTF-8 (undecodable bytes are
        ignored); when stdout is empty, stderr is used instead.

        Returns:
            The same ``ci`` with ``code`` and ``msg`` filled in.
        """
        pipe = asyncio.subprocess.PIPE
        proc = await asyncio.create_subprocess_shell(ci.cmd, stdout=pipe, stderr=pipe)
        out, err = await proc.communicate()
        ci.code = proc.returncode

        output = out or err
        if output is None:
            # NOTE(review): with both streams piped this branch is presumably
            # unreachable; the value is kept for backward compatibility.
            ci.msg = False
        else:
            ci.msg = str(output, encoding="utf-8", errors="ignore")
        return ci

    def doCmd(self, *cmdArr) -> List[CCmdInfo]:
        """Run all commands concurrently and return the ``CCmdInfo`` objects in input order."""
        pending = [self.run(ci) for ci in cmdArr]
        return CAsyncTask().doTask(*pending)
class CAsyncTask:
    """Run a batch of awaitables to completion while showing a console spinner."""

    def __init__(self):
        # Flag polled by progress(); set once the batch has finished.
        self.isStopProgress = False

    async def progress(self):
        """Redraw a one-line spinner every 0.1 s until ``isStopProgress`` is set."""
        symbol = ["/", "ᅳ", "\\", "|"]
        total = len(symbol)
        cnt = 0
        while not self.isStopProgress:
            sys.stdout.write(f"------ processing {symbol[cnt % total]}\r")
            sys.stdout.flush()
            await asyncio.sleep(0.1)
            cnt += 1
        print("------ processing 100%")

    async def start(self, *taskArr):
        """Await all of ``taskArr`` concurrently with the spinner running.

        Returns:
            The results in the order the awaitables were passed.

        Raises:
            Whatever the first failing awaitable raises. The spinner is shut
            down in that case too, so no spinner task is left on the loop.
        """
        self.isStopProgress = False  # allow the instance to be reused
        first = asyncio.gather(*taskArr)
        second = asyncio.create_task(self.progress())
        try:
            return await first
        finally:
            self.isStopProgress = True
            await second

    @staticmethod
    def _getLoop():
        """Return the thread's current event loop, creating one if needed."""
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop, e.g. after asyncio.run() or in a worker thread.
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop

    def doTask(self, *taskArr):
        """Synchronously run ``taskArr`` and return their results in order."""
        loop = self._getLoop()
        # The loop is deliberately left open for reuse: closing it was observed
        # to fail with "RuntimeError: Event loop is closed" on https requests.
        return loop.run_until_complete(self.start(*taskArr))
class CAsyncThread:
    """Run a batch of ``CThreadInfo`` jobs in parallel, one thread per job."""

    def doRun(self, *threadArr) -> List[CThreadInfo]:
        """Start one ``CInnerThread`` per job, wait for all, return the jobs.

        Returns:
            The ``CThreadInfo`` objects in the order they were passed in, each
            with ``result`` filled in by its worker thread.
        """
        workers = []
        for autoid, ti in enumerate(threadArr, start=1):
            thd = CInnerThread(autoid=autoid, ti=ti)
            thd.start()
            workers.append(thd)

        # Joining in start order already yields results in input order, so no
        # sorting by autoid is needed.
        results = []
        for thd in workers:
            thd.join()
            _, ti = thd.get_result()
            results.append(ti)
        return results
def doTask(*taskArr):
    """Run the given awaitables through a new ``CAsyncTask`` and return its results."""
    runner = CAsyncTask()
    return runner.doTask(*taskArr)
def doReq(*reqArr):
    """Send the given ``CReqInfo`` requests through a new ``CAsyncHttp``."""
    client = CAsyncHttp()
    return client.doReq(*reqArr)
def doRead(*fileArr):
    """Read the given ``CFileInfo`` files through a new ``CAsyncFileRead``."""
    reader = CAsyncFileRead()
    return reader.doRead(*fileArr)
def doWrite(*fileArr):
    """Write the given ``CFileInfo`` files through a new ``CAsyncFileWrite``."""
    writer = CAsyncFileWrite()
    return writer.doWrite(*fileArr)
def doCmd(*cmdArr):
    """Execute the given ``CCmdInfo`` commands through a new ``CAsyncCmd``."""
    executor = CAsyncCmd()
    return executor.doCmd(*cmdArr)
def doRun(*threadArr):
    """Run the given ``CThreadInfo`` jobs through a new ``CAsyncThread``."""
    pool = CAsyncThread()
    return pool.doRun(*threadArr)
Test cases
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
import os
import asyncio, aiohttp, aiofiles
import json
from datetime import datetime, timedelta
from time import ctime, sleep
import time
import unittest
from tool import utils, async_util
SelfPath: str = os.path.abspath(os.path.dirname(__file__))
class Test_Async(unittest.TestCase):
    """Demo test cases for asyncio, aiohttp, aiofiles and ``async_util``.

    Most cases talk to a live HTTP endpoint or touch files on the local
    desktop, so they are demonstrations rather than hermetic unit tests.
    """

    def setUp(self):
        print("\n\n------------------ test result ------------------")

    def test_gather(self):
        """Run three coroutines concurrently with ``asyncio.gather``."""

        async def count(num):
            print(f"One - {num}")
            await asyncio.sleep(1)
            print(f"Two - {num}")

        async def main():
            # gather executes concurrently and returns the results in argument order.
            await asyncio.gather(count(1), count(2), count(3))

        asyncio.run(main())
        print("--- finished")

    def test_createTask(self):
        """Show that tasks made with ``asyncio.create_task`` start right away."""

        async def count(num):
            print("One")
            await asyncio.sleep(num)
            print("Two")

        async def main():
            first = asyncio.create_task(count(2))  # Starts executing as soon as it is created.
            second = asyncio.create_task(count(1))
            await first
            print("finished first")
            await second
            print("finished second")

        asyncio.run(main())
        print("--- finished")

    def test_progress(self):
        """Run sleeping coroutines and 30 HTTP requests through ``CAsyncTask``."""
        from tool.async_util import CAsyncTask, CRspInfo

        # Tasks to be performed
        async def reqFn(num):
            url = "http://149.129.147.44:8305/hotupdate"
            reqInfo = {
                "Plat": 8,
                "Os": 2,
                "Appid": 3,
                "Uid": '123123',
                "Version": '0.0.0.1',
                "Deviceid": 'wolegequ',
            }
            rspInfo = CRspInfo()
            try:
                async with aiohttp.request(method="POST", url=url, data=json.dumps(reqInfo)) as rsp:
                    print(f"--- idx: {num} code: {rsp.status}")
                    rspInfo.code = num
                    rspInfo.text = await rsp.text()
            except Exception:
                # Only ordinary errors are mapped to -999; cancellation and
                # KeyboardInterrupt are allowed to propagate.
                rspInfo.code = -999
            return rspInfo

        async def reqFn01():
            print("--- start reqFn01")
            await asyncio.sleep(20)
            return "hello01"

        async def reqFn02():
            print("--- start reqFn02")
            await asyncio.sleep(10)
            return "hello02"

        async def reqFn03():
            print("--- start reqFn03")
            await asyncio.sleep(30)
            return "hello03"

        taskArr = [reqFn(idx) for idx in range(30)]
        res = CAsyncTask().doTask(reqFn01(), reqFn02(), reqFn03(), *taskArr)
        print(f"--- finished, res: {utils.beautyJson(res)}")

    # asynchronous io http
    def test_concurrencyReq(self):
        """Issue five concurrent POST requests directly with aiohttp."""
        url = "http://149.129.147.44:8305/hotupdate"  # Test
        # url = "https://www.baidu.com" # Test
        reqInfo = {
            "Plat": 8,
            "Os": 2,
            "Appid": 3,
            "Uid": '123123',
            "Version": '0.0.0.1',
            "Deviceid": 'wolegequ',
        }
        # code, rspDct = utils.httpPost(url, utils.objToJson(reqInfo))
        # print(f"--- code: {code}, rsp: {utils.beautyJson(rspDct)}")
        # return

        async def reqFn(idx):
            try:
                # async with aiohttp.request(method="GET", url=url) as rsp:
                async with aiohttp.request(method="POST", url=url, data=json.dumps(reqInfo)) as rsp:
                    print(f"--- idx: {idx} code: {rsp.status}")
                    # response.request_info
                    res = await rsp.text()
                    # print(f"--- res: {res}")
                    return res
            except Exception:
                return "--- error"

        # create_task approach (kept as an alternative to main02)
        async def main01():
            taskArr = []
            for idx in range(5):
                task = asyncio.create_task(reqFn(idx))  # Starts executing as soon as it is created.
                taskArr.append(task)
            resArr = []
            for task in taskArr:  # Wait for all requests to complete
                res = await task
                resArr.append(res)
            return resArr

        # gather approach
        async def main02():
            taskArr = []
            for idx in range(5):
                task = reqFn(idx)
                taskArr.append(task)
            return await asyncio.gather(*taskArr)

        # This variant works:
        loop = asyncio.get_event_loop()
        resArr = loop.run_until_complete(main02())  # Run the event loop until the last task has finished
        # This variant was observed to fail with: RuntimeError: Event loop is closed
        # resArr = asyncio.run(main02())
        print("--- finished")
        print(f"--- resArr: {utils.beautyJson(resArr)}")

    def test_compare_http(self):
        """Compare the time cost of synchronous and asynchronous HTTP requests."""
        url = "http://149.129.147.44:8305/hotupdate"
        # url = "https://www.baidu.com"
        reqCnt = 1
        dct = {
            "Plat": 8,
            "Os": 2,
            "Appid": 3,
            "Uid": '123123',
            "Version": '0.0.0.1',
            "Deviceid": 'wolegequ',
        }

        @utils.call_cost
        def syncFn():
            print("--- syncFn start")
            for idx in range(reqCnt):
                code, rspDct = utils.httpPost(url, dct)
            print("--- syncFn end")

        @utils.call_cost
        def asyncFn():
            print("--- asyncFn start")
            reqArr = []
            for idx in range(reqCnt):
                ri = async_util.CReqInfo()
                ri.url = url
                ri.data = dct  # dict or json string
                ri.method = "POST"
                ri.extA = f"extra data {idx}"
                reqArr.append(ri)
            resArr = async_util.doReq(*reqArr)
            print("--- type: {}, len: {}".format(type(resArr), len(resArr)))
            # print(f"--- finished, resArr: {utils.beautyJson(resArr)}")
            print("--- asyncFn end")

        sync_cc = syncFn()
        print("sync: {}".format(sync_cc))
        print()
        async_cc = asyncFn()
        print("async: {}".format(async_cc))

    def test_asyncFile(self):
        """Read a file and then overwrite it, both through aiofiles."""

        async def dealFile(filePath):
            print("--- dealFile:", filePath)
            async with aiofiles.open(filePath, mode="r") as fd:  # read
                txt = await fd.read()
                print("--- read:", txt)
            async with aiofiles.open(filePath, mode="w") as fd:  # write
                await fd.write("wolegequ")
            return "done!!"

        path = utils.getDesktop("test_io2/aaa.txt")
        res = async_util.doTask(dealFile(path))
        print("--- res:", res)

    # Asynchronous io file, read by line
    def test_asyncLine(self):
        """Iterate over a file line by line through aiofiles."""

        async def dealFile(filePath):
            print("--- dealFile:", filePath)
            async with aiofiles.open(filePath, mode="rb") as fd:  # read
                async for line in fd:
                    # print("--- line:", line.decode(encoding="utf-8", errors="ignore"))
                    print("--- line:", str(line, encoding="utf-8", errors="ignore"))

        path = utils.getDesktop("a_temp.lua")
        res = async_util.doTask(dealFile(path))
        print("--- res:", res)

    # Compare file reads, synchronous, asynchronous, time-consuming.
    def test_compare_readFile(self):
        """Compare the time cost of synchronous and asynchronous file reads."""
        dstDir = utils.getDesktop("test_io")
        fileArr = utils.getFiles(dstDir, ["*.*"])
        print("--- fileArr len: {}".format(len(fileArr)))

        @utils.call_cost
        def syncFn():
            print("--- syncFn start")
            for file in fileArr:
                # time.sleep(0.5)
                utils.readFileBytes(file)
            print("--- syncFn end")

        @utils.call_cost
        def asyncFn():
            print("--- asyncFn start")
            fiArr = [async_util.CFileInfo(file) for file in fileArr]
            res = async_util.doRead(*fiArr)
            # print("--- res:", utils.beautyJson(res))
            # Write them into a different directory
            # for fi in fiArr:
            #     fi.path = fi.path.replace("test_io2", "test_io3")
            # async_util.doWrite(*fiArr)
            print("--- asyncFn end")

        sync_cc = syncFn()
        print("sync: {}".format(sync_cc))
        print()
        async_cc = asyncFn()
        print("async: {}".format(async_cc))

    # Asynchronous parallel execution of system commands
    def test_subprocess(self):
        """Run one shell command as an asyncio subprocess and print its output."""

        # official document: https://docs.python.org/3/library/asyncio-subprocess.html
        async def run(cmd):
            proc = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)
            stdout, stderr = await proc.communicate()
            print(f'[{cmd!r} exited with {proc.returncode}]')
            if stdout:
                print(f'[stdout]\n{stdout.decode(errors="ignore")}')
            if stderr:
                print(f'[stderr]\n{stderr.decode(errors="ignore")}')

        cmd = "git status"
        asyncio.run(run(cmd))

    def test_compare_subprocess(self):
        """Compare running a command five times sequentially and concurrently."""
        cnt = 5
        # cmd = "git status"
        cmd = "call {}".format(utils.getDesktop("aaa.exe"))

        @utils.call_cost
        def asyncFn():
            cmdArr = []
            for i in range(cnt):
                ci = async_util.CCmdInfo(cmd)
                ci.extA = i
                cmdArr.append(ci)
            res = async_util.doCmd(*cmdArr)
            # print("--- res:", utils.beautyJson(res))

        @utils.call_cost
        def syncFn():
            # The coroutine never awaits: it calls utils.cmdToString directly,
            # which is what makes this the "sync" side of the comparison.
            async def run(command):
                return utils.cmdToString(command)

            res = async_util.doTask(*[run(cmd) for i in range(cnt)])
            # print("--- res:", utils.beautyJson(res))

        dt1 = syncFn()
        print("--- syncFn cost time:", dt1)  # --- syncFn cost time: 00:00:45
        dt2 = asyncFn()
        print("--- asyncFn cost time:", dt2)  # --- asyncFn cost time: 00:00:09

    # True multi-threaded parallelism.
    def test_multi_thread(self):
        """Run the same job in three threads via ``async_util.doRun``."""

        def fn001(name):
            # print("--- hello 111, name: {}".format(name))
            # time.sleep(5)
            # print("--- hello 222, name: {}".format(name))
            # error
            # assert False, "--- wolegequ"
            # arr = []
            # b = arr[1]
            utils.execute("call {}".format(utils.getDesktop("aaa.exe")))
            return "world-{}".format(name)

        # res holds the return values in submission order
        res = async_util.doRun(*[async_util.CThreadInfo(target=fn001, args=(i,)) for i in range(3)])
        # print("--- end, res: {}".format(utils.beautyJson([str(ti.result) for ti in res])))
        for ti in res:
            print("--- result is error:", utils.isError(ti.result))
            # print("--- exmsg:", utils.exmsg(ti.result))
if __name__ == "__main__":
    # Quick manual run of a single scenario, without the unittest runner.
    Test_Async().test_multi_thread()
Read More:
- [Solved] Python Error: asyncio RuntimeError: This event loop is already running
- Python3.6 Run uvicorn Error: AttributeError: module ‘asyncio‘ has no attribute ‘run‘
- Python asynchronous co process crawler error: [aiohttp. Client]_ Exceptions: serverdisconnected error: Server Disconnected]
- Python netmiko library Cisco telnet switch automation
- [Solved] ParserError: NULL byte detected. This byte cannot be processed in Python‘s native csv library
- Mac Upgrade pip Error OSError: [Errno 13] Permission denied: ‘/Library/Python/2.7/site-packages/pip-9.0.1-py2….
- [Solved] Python Image Library fails with message “decoder JPEG not available” – PIL
- Invalid python sd, Fatal Python error: init_fs_encoding: failed to get the Python cod [How to Solve]
- [Solved] NPM install Error: check python checking for Python executable python2 in the PATH
- How to Solve Python WARNING: Ignoring invalid distribution -ip (e:\python\python_dowmload\lib\site-packages)
- [Solved] opencv-python: recipe for target ‘modules/python3/CMakeFiles/opencv_python3.dir/all‘ failed
- [Solved] RuntimeError (note: full exception trace is shown but execution is paused at: <module>)
- Pyyaml tutorial introduction to pyyaml library and YML writing and reading
- [How to Solve] Reason: Incompatible library version
- [Solved] RuntimeError: CUDA error: CUBLAS_STATUS_EXECUTION_FAILED when calling `cubla…
- HDF5 library version mismatched error [How to Solve]
- [Solved] Pdfplumber Read PDF Sheet Error: AttributeError: function/symbol ‘ARC4_stream_init‘ not found in library
- Nltk Library Download error: [errno: 11004] getaddrinfo failed
- Linux installs Python and upgrades Python
- [Solved] RuntimeError: CUDA error: CUBLAS_STATUS_EXECUTION_FAILED when calling `cublasSgemm