Python asynchronous execution library asyncio


Title: python-asynchronous execution library asyncio
categories: python
tags: [python, asyncio, asynchronous, parallel]
date: 2020-09-28 14:45:34
comments: false
mathjax: true
toc: true

When writing the tool, it needs to request data for dozens of times, synchronous sequential execution, the speed is a little slow, so it is much easier to change to asynchronous parallel execution. Similarly, other designs to IO that will block can be solved by asynchronous parallel execution. Similarly, file IO can also be handled asynchronously.
Asyncio (built-in) + AIoHTTP /aiofiles (requires PIP installation) is used

 


code
Tool class Async_util.py (simply wrap it up)

# -*- coding: utf-8 -*-

import aiofiles
import aiohttp
import asyncio
import json
import sys
import traceback
import threading
from typing import List

from tool import utils


class CReqInfo:
    def __init__(self):
        self.url = None
        self.method = "POST"
        self.data = None
        self.extA = None  


class CRspInfo:
    def __init__(self):
        self.code: int = 0
        self.text = None
        self.extA = None  


class CFileInfo:
    def __init__(self, path, encoding="utf-8"):  
        self.path = path
        self.encoding = encoding
        self.content = None
        self.error = None
        self.extA = None 


class CCmdInfo:
    def __init__(self, cmd):
        self.code = 0
        self.msg = None
        self.cmd = cmd
        self.extA = None  


class CThreadInfo:
    def __init__(self, target, args=()):
        self.target = target
        self.args = args
        self.result = None


class CInnerThread(threading.Thread):
    # def __init__(self, autoid, target, args=()):
    def __init__(self, autoid, ti: CThreadInfo):
        super(CInnerThread, self).__init__()
        self.autoid = autoid
        self.target = ti.target
        self.args = ti.args
        self.ti: CThreadInfo = ti

    def run(self):
        try:
            self.ti.result = self.target(*self.args)
        except Exception as e:
            self.ti.result = e
            traceback.print_stack()

    def get_result(self):
        return self.autoid, self.ti


class CAsyncHttp:


    async def request(self, reqInfo: CReqInfo):
        if isinstance(reqInfo.data, dict):
            reqInfo.data = json.dumps(reqInfo.data)

        rspInfo = CRspInfo()
        try:
            async with aiohttp.request(method=reqInfo.method, url=reqInfo.url, data=reqInfo.data) as response:
                rspInfo.code = int(response.status)
                rspInfo.extA = reqInfo.extA
                rspInfo.text = await response.text()
        except Exception as e:
            rspInfo.code = -999
            rspInfo.text = e
        finally:
            return rspInfo

    def doReq(self, *reqArr) -> List[CRspInfo]:
        return CAsyncTask().doTask(*[self.request(reqInfo) for reqInfo in reqArr])


class CAsyncFileRead:


    async def read(self, fi: CFileInfo):
        try:
            async with aiofiles.open(fi.path, mode="rb") as fd:
                content = await fd.read()
                fi.content = fi.encoding is None and content or str(content, encoding=fi.encoding, errors="ignore")
        except Exception as e:
            fi.error = e
        finally:
            return fi

    def doRead(self, *fileArr) -> List[CFileInfo]:
        return CAsyncTask().doTask(*[self.read(fi) for fi in fileArr])


class CAsyncFileWrite:


    async def write(self, fi: CFileInfo):
        utils.createDirForFile(fi.path)
        try:
            async with aiofiles.open(fi.path, mode="wb") as fd:
                bts = fi.encoding is None and fi.content or fi.content.encode(
                    encoding=fi.encoding)
                await fd.write(bts)
        except Exception as e:
            fi.error = e
        finally:
            return fi

    def doWrite(self, *fileArr) -> List[CFileInfo]:
        return CAsyncTask().doTask(*[self.write(fi) for fi in fileArr])


class CAsyncCmd:


    async def run(self, ci: CCmdInfo):
        proc = await asyncio.create_subprocess_shell(
            ci.cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)

        stdout, stderr = await proc.communicate()
        bts = stdout or stderr

        ci.code = proc.returncode
        ci.msg = bts is not None and str(bts, encoding="utf-8", errors="ignore")
        return ci

    def doCmd(self, *cmdArr) -> List[CCmdInfo]:
        return CAsyncTask().doTask(*[self.run(ci) for ci in cmdArr])


class CAsyncTask:


    def __init__(self):
        self.isStopProgress = False

    async def progress(self):
        symbol = ["/", "ᅳ", "\\", "|"]
        total = len(symbol)
        cnt = 0
        while not self.isStopProgress:
            sys.stdout.write(f"------ processing {symbol[cnt % total]}\r")
            sys.stdout.flush()
            await asyncio.sleep(0.1)
            cnt += 1
        print("------ processing 100%")

    async def start(self, *taskArr):
        first = asyncio.gather(*taskArr)
        second = asyncio.create_task(self.progress())

        retVal = await first
        self.isStopProgress = True
        await second

        return retVal

    def doTask(self, *taskArr):
        loop = asyncio.get_event_loop()
        res = loop.run_until_complete(self.start(*taskArr))
        # loop.close() # https 会报错: RuntimeError: Event loop is closed
        return res


class CAsyncThread:


    def doRun(self, *threadArr) -> List[CThreadInfo]:
        thdInsArr = []
        autoid = 1
        for ti in threadArr:
            thd = CInnerThread(autoid=autoid, ti=ti)
            autoid += 1
            thdInsArr.append(thd)
            thd.start()

        retDct = {}
        for thd in thdInsArr:
            thd.join()
            aid, ti = thd.get_result()
            retDct[aid] = ti

        sorted(retDct.items(), key=lambda x: x[0], reverse=False)
        return list(retDct.values())




def doTask(*taskArr):
    return CAsyncTask().doTask(*taskArr)


def doReq(*reqArr):
    return CAsyncHttp().doReq(*reqArr)


def doRead(*fileArr):
    return CAsyncFileRead().doRead(*fileArr)


def doWrite(*fileArr):
    return CAsyncFileWrite().doWrite(*fileArr)


def doCmd(*cmdArr):
    return CAsyncCmd().doCmd(*cmdArr)


def doRun(*threadArr):
    return CAsyncThread().doRun(*threadArr)

The test case

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
import os
import asyncio, aiohttp, aiofiles
import json
from datetime import datetime, timedelta

from time import ctime, sleep
import time
import unittest

from tool import utils, async_util

SelfPath: str = os.path.abspath(os.path.dirname(__file__))




class Test_Async(unittest.TestCase):
    def setUp(self):
        print("\n\n------------------ test result ------------------")

    def test_gather(self):
        async def count(num):
            print(f"One - {num}")
            await asyncio.sleep(1)
            print(f"Two - {num}")

        async def main():
            await asyncio.gather(count(1), count(2), count(3))  # gather Execute concurrently, returning sequential results.

        asyncio.run(main())
        print("--- finished")

    def test_createTask(self):
        async def count(num):
            print("One")
            await asyncio.sleep(num)
            print("Two")

        async def main():
            first = asyncio.create_task(count(2))  # Start executing it when you create it.
            second = asyncio.create_task(count(1))

            await first
            print(f"finished first")
            await second
            print(f"finished second")

        asyncio.run(main())
        print("--- finished")

    def test_progress(self):
        from tool.async_util import CAsyncTask, CRspInfo

        # Tasks to be performed
        async def reqFn(num):
            url = "http://149.129.147.44:8305/hotupdate"
            reqInfo = {
                "Plat": 8,
                "Os": 2,
                "Appid": 3,
                "Uid": '123123',
                "Version": '0.0.0.1',
                "Deviceid": 'wolegequ',
            }
            rspInfo = CRspInfo()
            try:
                async with aiohttp.request(method="POST", url=url, data=json.dumps(reqInfo)) as rsp:
                    print(f"--- idx: {num} code: {rsp.status}")
                    rspInfo.code = num
                    rspInfo.text = await rsp.text()
            except:
                rspInfo.code = -999
            finally:
                return rspInfo

        async def reqFn01():
            print("--- start reqFn01")
            await asyncio.sleep(20)
            return "hello01"

        async def reqFn02():
            print("--- start reqFn02")
            await asyncio.sleep(10)
            return "hello02"

        async def reqFn03():
            print("--- start reqFn03")
            await asyncio.sleep(30)
            return "hello03"

        taskArr = [reqFn(idx) for idx in range(30)]

        res = CAsyncTask().doTask(reqFn01(), reqFn02(), reqFn03(), *taskArr)
        print(f"--- finished, res: {utils.beautyJson(res)}")

    # asynchronous io http
    def test_concurrencyReq(self):
        url = "http://149.129.147.44:8305/hotupdate"  # Test
        # url = "https://www.baidu.com"  # Test

        reqInfo = {
            "Plat": 8,
            "Os": 2,
            "Appid": 3,
            "Uid": '123123',
            "Version": '0.0.0.1',
            "Deviceid": 'wolegequ',
        }

        # code, rspDct = utils.httpPost(url, utils.objToJson(reqInfo))
        # print(f"--- code: {code}, rsp: {utils.beautyJson(rspDct)}")
        # return

        async def reqFn(idx):
            try:
                # async with aiohttp.request(method="GET", url=url) as rsp:
                async with aiohttp.request(method="POST", url=url, data=json.dumps(reqInfo)) as rsp:
                    print(f"--- idx: {idx} code: {rsp.status}")
                    # response.request_info 
                    res = await rsp.text()
                    # print(f"--- res: {res}")
                    return res
            except:
                return "--- error"

        # create task 方式
        async def main01():
            taskArr = []
            for idx in range(5):
                task = asyncio.create_task(reqFn(idx))  # Start executing it when you create it.
                taskArr.append(task)

            resArr = []
            for task in taskArr:  # Waiting for all requests to complete
                res = await task
                resArr.append(res)
            return resArr

        # gather 方式
        async def main02():
            taskArr = []
            for idx in range(5):
                task = reqFn(idx)
                taskArr.append(task)
            return await asyncio.gather(*taskArr)

        # True
        loop = asyncio.get_event_loop()
        resArr = loop.run_until_complete(main02())  # Complete the event loop until the end of the last task

        # # Error: RuntimeError: Event loop is closed
        # resArr = asyncio.run(main02())

        print("--- finished")
        print(f"--- resArr: {utils.beautyJson(resArr)}")

    def test_compare_http(self):
        url = "http://149.129.147.44:8305/hotupdate"
        # url = "https://www.baidu.com"
        reqCnt = 1

        dct = {
            "Plat": 8,
            "Os": 2,
            "Appid": 3,
            "Uid": '123123',
            "Version": '0.0.0.1',
            "Deviceid": 'wolegequ',
        }

        @utils.call_cost
        def syncFn():
            print("--- syncFn start")
            for idx in range(reqCnt):
                code, rspDct = utils.httpPost(url, dct)
            print("--- syncFn end")

        @utils.call_cost
        def asyncFn():
            print("--- asyncFn start")

            reqArr = []
            for idx in range(reqCnt):
                ri = async_util.CReqInfo()
                ri.url = url
                ri.data = dct  # dict or json string
                ri.method = "POST"
                ri.extA = f"extra data {idx}"
                reqArr.append(ri)

            resArr = async_util.doReq(*reqArr)
            print("--- type: {}, len: {}".format(type(resArr), len(resArr)))
            # print(f"--- finished, resArr: {utils.beautyJson(resArr)}")
            print("--- asyncFn end")

        sync_cc = syncFn()
        print("sync: {}".format(sync_cc))

        print()
        async_cc = asyncFn()
        print("async: {}".format(async_cc))


    def test_asyncFile(self):
        async def dealFile(filePath):
            print("--- dealFile:", filePath)
            async with aiofiles.open(filePath, mode="r") as fd:  # read
                txt = await fd.read()
                print("--- read:", txt)

            async with aiofiles.open(filePath, mode="w") as fd:  # write
                await fd.write("wolegequ")

            return "done!!"

        path = utils.getDesktop("test_io2/aaa.txt")
        res = async_util.doTask(dealFile(path))
        print("--- res:", res)

    # Asynchronous io file, read by line
    def test_asyncLine(self):
        async def dealFile(filePath):
            print("--- dealFile:", filePath)
            async with aiofiles.open(filePath, mode="rb") as fd:  # write
                async for line in fd:
                    # print("--- line:", line.decode(encoding="utf-8", errors="ignore"))
                    print("--- line:", str(line, encoding="utf-8", errors="ignore"))

        path = utils.getDesktop("a_temp.lua")
        res = async_util.doTask(dealFile(path))
        print("--- res:", res)

    # Compare file reads, synchronous, asynchronous, time-consuming.
    def test_compare_readFile(self):
        dstDir = utils.getDesktop("test_io")
        fileArr = utils.getFiles(dstDir, ["*.*"])
        print("--- fileArr len: {}".format(len(fileArr)))

        @utils.call_cost
        def syncFn():
            print("--- syncFn start")
            for file in fileArr:
                # time.sleep(0.5)
                utils.readFileBytes(file)
            print("--- syncFn end")

        @utils.call_cost
        def asyncFn():
            print("--- asyncFn start")

            fiArr = [async_util.CFileInfo(file) for file in fileArr]

            res = async_util.doRead(*fiArr)
            # print("--- res:", utils.beautyJson(res))

            # 换个目录写进去
            # for fi in fiArr:
            #     fi.path = fi.path.replace("test_io2", "test_io3")
            # async_util.doWrite(*fiArr)

            print("--- asyncFn end")

        sync_cc = syncFn()
        print("sync: {}".format(sync_cc))

        print()
        async_cc = asyncFn()
        print("async: {}".format(async_cc))

    # Asynchronous parallel execution of system commands
    def test_subprocess(self):
        # official document: https://docs.python.org/3/library/asyncio-subprocess.html

        async def run(cmd):
            proc = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)

            stdout, stderr = await proc.communicate()

            print(f'[{cmd!r} exited with {proc.returncode}]')
            if stdout:
                print(f'[stdout]\n{stdout.decode(errors="ignore")}')
            if stderr:
                print(f'[stderr]\n{stderr.decode(errors="ignore")}')

        cmd = "git status"
        asyncio.run(run(cmd))

    def test_compare_subprocess(self):
        cnt = 5

        # cmd = "git status"
        cmd = "call {}".format(utils.getDesktop("aaa.exe")) 

        @utils.call_cost
        def asyncFn():
            cmdArr = []
            for i in range(cnt):
                ci = async_util.CCmdInfo(cmd)
                ci.extA = i
                cmdArr.append(ci)

            res = async_util.doCmd(*cmdArr)
            # print("--- res:", utils.beautyJson(res))

        @utils.call_cost
        def syncFn():
            async def run(command):
                return utils.cmdToString(command)

            res = async_util.doTask(*[run(cmd) for i in range(cnt)])
            # print("--- res:", utils.beautyJson(res))

        dt1 = syncFn()
        print("--- syncFn cost time:", dt1)  # --- syncFn cost time: 00:00:45

        dt2 = asyncFn()
        print("--- asyncFn cost time:", dt2)  # --- asyncFn cost time: 00:00:09

    # True multi-threaded parallelism.
    def test_multi_thread(self):
        def fn001(name):
            # print("--- hello 111, name: {}".format(name))
            # time.sleep(5)
            # print("--- hello 222, name: {}".format(name))

            # error
            # assert False, "--- wolegequ"
            # arr = []
            # b = arr[1]

            utils.execute("call {}".format(utils.getDesktop("aaa.exe")))
            return "world-{}".format(name)

        # res Sequential return value
        res = async_util.doRun(*[async_util.CThreadInfo(target=fn001, args=(i,)) for i in range(3)])
        # print("--- end, res: {}".format(utils.beautyJson([str(ti.result) for ti in res])))
        for ti in res:
            print("--- result is error:", utils.isError(ti.result))
            # print("--- exmsg:", utils.exmsg(ti.result))


if __name__ == "__main__":
    ins = Test_Async()
    ins.test_multi_thread()

Read More: