Category Archives: Python

Python: using try ... except ... to output detailed error information

When a piece of Python code is wrapped in try ... except ..., it is hard to tell exactly where the program crashed. I needed this in a program over the last couple of days, so I looked it up.

You need to use the traceback module:

import traceback

try:
    # Division by zero is used as an example error
    3/0
except Exception, e:
    # This prints the exception class being caught, which tells you little when the catch is generic
    print 'str(Exception):\t', str(Exception)  # output: str(Exception): <type 'exceptions.Exception'>
    # This prints the specific reason for the error
    print 'str(e):\t\t', str(e)  # output: str(e): integer division or modulo by zero
    print 'repr(e):\t', repr(e)  # output: repr(e): ZeroDivisionError('integer division or modulo by zero',)
    print 'traceback.print_exc():'
    # Both of the following calls output the exact location of the error
    traceback.print_exc()
    print 'traceback.format_exc():\n%s' % traceback.format_exc()

The code above uses Python 2 syntax. From Python 2.6 onward, the except clause can also be written as except Exception as e, and in Python 3 that is the only accepted form.
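For readers on Python 3, where print is a function, the same idea can be sketched as follows. The helper name describe_error is made up for this illustration; traceback.format_exc() is the standard-library call discussed in this post.

```python
import traceback


def describe_error(func):
    """Call func() and return a dict describing any exception it raises.

    describe_error is a hypothetical helper used only for this sketch.
    """
    try:
        func()
    except Exception as e:
        return {
            "type": type(e).__name__,             # class name of the exception
            "message": str(e),                    # human-readable reason
            "repr": repr(e),                      # class name plus arguments
            "traceback": traceback.format_exc(),  # full stack trace as a string
        }
    return None


info = describe_error(lambda: 3 / 0)
print(info["type"])       # ZeroDivisionError
print(info["message"])    # division by zero
print(info["traceback"])  # shows the file and line where the error was raised
```

Returning the formatted traceback as a string, instead of printing it, makes it easy to send the same text to a log file.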

How to Fix KeyError in Python dictionary lookup

Values in a Python dictionary are usually looked up directly by key, for example:

dict={'a':1,'b':2,'c':3}
print(dict['a'])

But if the key is not found, a KeyError is raised. Suppose you run print(dict['d']).
Since the dict does not contain that key, it simply raises an error. Python gives us a convenient way to handle this, which is to use setdefault.

The usage is as follows: dict.setdefault(key, default), where default is the value to use if the key does not exist (it defaults to None).


So here we can use this method to solve the problem:

print(dict.setdefault('d',0))
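One detail worth seeing in action: setdefault does not only return the default, it also stores it in the dictionary. The sketch below contrasts it with dict.get, which this post does not use but which returns a fallback without modifying the dictionary. The variable names and data are made up for the illustration.

```python
prices = {'a': 1, 'b': 2, 'c': 3}   # hypothetical example data

# get() returns the fallback without touching the dictionary
missing = prices.get('d', 0)
keys_after_get = sorted(prices)

# setdefault() returns the fallback AND stores it under the key
stored = prices.setdefault('d', 0)
keys_after_setdefault = sorted(prices)

print(missing, keys_after_get)
print(stored, keys_after_setdefault)
```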

If you want to add a new value to the dict when the key is missing, use setdefault. If you simply want to look up a value and get a default back whenever the key does not exist, then I recommend using defaultdict.
defaultdict comes from the collections module. defaultdict(factory_function) builds a dictionary-like object in which the value for a missing key is filled in automatically: the value is an instance created by calling factory_function, so it always has a default. This introduces another concept, the factory function. Python's factory functions are the built-ins that are really class objects, so calling one actually creates an instance of that class: int(), str(), set(), etc.

import collections
s = [('yellow', 1), ('blue', 2), ('yellow', 3), ('blue', 4), ('red', 1)]
d = collections.defaultdict(list)
for k, v in s:
    d[k].append(v)
print(d['yellow'])
print(d['white'])
print(list(d.items()))

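Our final output is as follows (on Python 3.7+, where dictionaries keep insertion order):

[1, 3]
[]
[('yellow', [1, 3]), ('blue', [2, 4]), ('red', [1]), ('white', [])]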

We can see that when d has no corresponding key, it returns an empty list, because the factory function passed to defaultdict is list, whose default value is an empty list. Note that looking up d['white'] also inserts the key 'white' into d, which is why it shows up in d.items(). Now let's see what happens if the factory function is set:

import collections
s = [('yellow', 1), ('blue', 2), ('yellow', 3), ('blue', 4), ('red', 1)]
d = collections.defaultdict(set)
for k, v in s:
    d[k].add(v)
print(d['yellow'])
print(d['white'])
print(list(d.items()))

The output is as follows:

{1, 3}
set()
[('yellow', {1, 3}), ('blue', {2, 4}), ('red', {1}), ('white', set())]
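The same pattern works with int as the factory function, since int() returns 0. A minimal sketch of counting with it, using made-up sample data:

```python
import collections

words = ['yellow', 'blue', 'yellow', 'red', 'yellow']  # made-up sample data
counts = collections.defaultdict(int)  # int() returns 0 for a missing key
for w in words:
    counts[w] += 1  # no KeyError on the first occurrence of a word

print(dict(counts))
```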

Python asynchronous execution library asyncio


categories: python
tags: [python, asyncio, asynchronous, parallel]
date: 2020-09-28 14:45:34

While writing a tool, I needed to request data dozens of times. Synchronous, sequential execution was a little slow, so I switched to asynchronous, concurrent execution, which is much faster. Likewise, any other blocking IO can be handled the same way, including file IO.
asyncio (built in) + aiohttp / aiofiles (installed with pip) are used.
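Before the full utility module, here is a minimal sketch of why concurrent execution helps. It uses only the standard library: fake_request is a hypothetical stand-in in which asyncio.sleep simulates the network wait, so nothing here is a real HTTP call.

```python
import asyncio
import time


async def fake_request(idx, delay=0.1):
    """Stand-in for a network call; asyncio.sleep simulates the wait."""
    await asyncio.sleep(delay)
    return f"response-{idx}"


async def run_sequential(n):
    # Each request waits for the previous one to finish
    return [await fake_request(i) for i in range(n)]


async def run_concurrent(n):
    # gather schedules all coroutines at once and keeps results in argument order
    return await asyncio.gather(*(fake_request(i) for i in range(n)))


def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


seq_result, seq_time = timed(run_sequential(5))
con_result, con_time = timed(run_concurrent(5))
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The sequential version takes roughly the sum of all delays, while the concurrent version takes roughly the longest single delay.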

 


Code
Utility module async_util.py (a simple wrapper)

# -*- coding: utf-8 -*-

import aiofiles
import aiohttp
import asyncio
import json
import sys
import traceback
import threading
from typing import List

from tool import utils


class CReqInfo:
    def __init__(self):
        self.url = None
        self.method = "POST"
        self.data = None
        self.extA = None  


class CRspInfo:
    def __init__(self):
        self.code: int = 0
        self.text = None
        self.extA = None  


class CFileInfo:
    def __init__(self, path, encoding="utf-8"):  
        self.path = path
        self.encoding = encoding
        self.content = None
        self.error = None
        self.extA = None 


class CCmdInfo:
    def __init__(self, cmd):
        self.code = 0
        self.msg = None
        self.cmd = cmd
        self.extA = None  


class CThreadInfo:
    def __init__(self, target, args=()):
        self.target = target
        self.args = args
        self.result = None


class CInnerThread(threading.Thread):
    # def __init__(self, autoid, target, args=()):
    def __init__(self, autoid, ti: CThreadInfo):
        super(CInnerThread, self).__init__()
        self.autoid = autoid
        self.target = ti.target
        self.args = ti.args
        self.ti: CThreadInfo = ti

    def run(self):
        try:
            self.ti.result = self.target(*self.args)
        except Exception as e:
            self.ti.result = e
            traceback.print_exc()  # print the traceback of the caught exception

    def get_result(self):
        return self.autoid, self.ti


class CAsyncHttp:


    async def request(self, reqInfo: CReqInfo):
        if isinstance(reqInfo.data, dict):
            reqInfo.data = json.dumps(reqInfo.data)

        rspInfo = CRspInfo()
        try:
            async with aiohttp.request(method=reqInfo.method, url=reqInfo.url, data=reqInfo.data) as response:
                rspInfo.code = int(response.status)
                rspInfo.extA = reqInfo.extA
                rspInfo.text = await response.text()
        except Exception as e:
            rspInfo.code = -999
            rspInfo.text = e
        # Return outside of finally so that cancellation is not silently swallowed
        return rspInfo

    def doReq(self, *reqArr) -> List[CRspInfo]:
        return CAsyncTask().doTask(*[self.request(reqInfo) for reqInfo in reqArr])


class CAsyncFileRead:


    async def read(self, fi: CFileInfo):
        try:
            async with aiofiles.open(fi.path, mode="rb") as fd:
                content = await fd.read()
                # Conditional expression: the and/or idiom breaks when content is empty bytes
                fi.content = content if fi.encoding is None else str(content, encoding=fi.encoding, errors="ignore")
        except Exception as e:
            fi.error = e
        return fi

    def doRead(self, *fileArr) -> List[CFileInfo]:
        return CAsyncTask().doTask(*[self.read(fi) for fi in fileArr])


class CAsyncFileWrite:


    async def write(self, fi: CFileInfo):
        utils.createDirForFile(fi.path)
        try:
            async with aiofiles.open(fi.path, mode="wb") as fd:
                # Conditional expression: the and/or idiom breaks when content is empty
                bts = fi.content if fi.encoding is None else fi.content.encode(
                    encoding=fi.encoding)
                await fd.write(bts)
        except Exception as e:
            fi.error = e
        return fi

    def doWrite(self, *fileArr) -> List[CFileInfo]:
        return CAsyncTask().doTask(*[self.write(fi) for fi in fileArr])


class CAsyncCmd:


    async def run(self, ci: CCmdInfo):
        proc = await asyncio.create_subprocess_shell(
            ci.cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)

        stdout, stderr = await proc.communicate()
        bts = stdout or stderr

        ci.code = proc.returncode
        ci.msg = str(bts, encoding="utf-8", errors="ignore") if bts is not None else None
        return ci

    def doCmd(self, *cmdArr) -> List[CCmdInfo]:
        return CAsyncTask().doTask(*[self.run(ci) for ci in cmdArr])


class CAsyncTask:


    def __init__(self):
        self.isStopProgress = False

    async def progress(self):
        symbol = ["/", "-", "\\", "|"]
        total = len(symbol)
        cnt = 0
        while not self.isStopProgress:
            sys.stdout.write(f"------ processing {symbol[cnt % total]}\r")
            sys.stdout.flush()
            await asyncio.sleep(0.1)
            cnt += 1
        print("------ processing 100%")

    async def start(self, *taskArr):
        first = asyncio.gather(*taskArr)
        second = asyncio.create_task(self.progress())

        retVal = await first
        self.isStopProgress = True
        await second

        return retVal

    def doTask(self, *taskArr):
        loop = asyncio.get_event_loop()
        res = loop.run_until_complete(self.start(*taskArr))
        # loop.close()  # with https this raises: RuntimeError: Event loop is closed
        return res


class CAsyncThread:


    def doRun(self, *threadArr) -> List[CThreadInfo]:
        thdInsArr = []
        autoid = 1
        for ti in threadArr:
            thd = CInnerThread(autoid=autoid, ti=ti)
            autoid += 1
            thdInsArr.append(thd)
            thd.start()

        retDct = {}
        for thd in thdInsArr:
            thd.join()
            aid, ti = thd.get_result()
            retDct[aid] = ti

        # Return the results ordered by autoid, i.e. in submission order
        return [ti for _, ti in sorted(retDct.items(), key=lambda x: x[0])]




def doTask(*taskArr):
    return CAsyncTask().doTask(*taskArr)


def doReq(*reqArr):
    return CAsyncHttp().doReq(*reqArr)


def doRead(*fileArr):
    return CAsyncFileRead().doRead(*fileArr)


def doWrite(*fileArr):
    return CAsyncFileWrite().doWrite(*fileArr)


def doCmd(*cmdArr):
    return CAsyncCmd().doCmd(*cmdArr)


def doRun(*threadArr):
    return CAsyncThread().doRun(*threadArr)
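As an aside, the thread helper above could also be expressed with the standard library's concurrent.futures. This is a swapped-in alternative, not the module's own approach, and run_in_threads is a hypothetical name used only for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor


def run_in_threads(target, args_list):
    """Run target(*args) for each args tuple in its own worker thread.

    Results come back in submission order; an exception raised by a call is
    returned in place of its result, mirroring CInnerThread.run above.
    run_in_threads is a hypothetical helper, not part of async_util.
    """
    def safe_call(args):
        try:
            return target(*args)
        except Exception as e:
            return e

    with ThreadPoolExecutor(max_workers=max(1, len(args_list))) as pool:
        # pool.map preserves the order of args_list in its results
        return list(pool.map(safe_call, args_list))


results = run_in_threads(lambda n: 10 // n, [(1,), (0,), (5,)])
print(results)
```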

Test cases

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
import os
import asyncio, aiohttp, aiofiles
import json
from datetime import datetime, timedelta

from time import ctime, sleep
import time
import unittest

from tool import utils, async_util

SelfPath: str = os.path.abspath(os.path.dirname(__file__))




class Test_Async(unittest.TestCase):
    def setUp(self):
        print("\n\n------------------ test result ------------------")

    def test_gather(self):
        async def count(num):
            print(f"One - {num}")
            await asyncio.sleep(1)
            print(f"Two - {num}")

        async def main():
            await asyncio.gather(count(1), count(2), count(3))  # gather runs the coroutines concurrently and returns results in argument order

        asyncio.run(main())
        print("--- finished")

    def test_createTask(self):
        async def count(num):
            print("One")
            await asyncio.sleep(num)
            print("Two")

        async def main():
            first = asyncio.create_task(count(2))  # The task is scheduled to run as soon as it is created
            second = asyncio.create_task(count(1))

            await first
            print(f"finished first")
            await second
            print(f"finished second")

        asyncio.run(main())
        print("--- finished")

    def test_progress(self):
        from tool.async_util import CAsyncTask, CRspInfo

        # Tasks to be performed
        async def reqFn(num):
            url = "http://149.129.147.44:8305/hotupdate"
            reqInfo = {
                "Plat": 8,
                "Os": 2,
                "Appid": 3,
                "Uid": '123123',
                "Version": '0.0.0.1',
                "Deviceid": 'wolegequ',
            }
            rspInfo = CRspInfo()
            try:
                async with aiohttp.request(method="POST", url=url, data=json.dumps(reqInfo)) as rsp:
                    print(f"--- idx: {num} code: {rsp.status}")
                    rspInfo.code = num
                    rspInfo.text = await rsp.text()
            except Exception:
                rspInfo.code = -999
            return rspInfo

        async def reqFn01():
            print("--- start reqFn01")
            await asyncio.sleep(20)
            return "hello01"

        async def reqFn02():
            print("--- start reqFn02")
            await asyncio.sleep(10)
            return "hello02"

        async def reqFn03():
            print("--- start reqFn03")
            await asyncio.sleep(30)
            return "hello03"

        taskArr = [reqFn(idx) for idx in range(30)]

        res = CAsyncTask().doTask(reqFn01(), reqFn02(), reqFn03(), *taskArr)
        print(f"--- finished, res: {utils.beautyJson(res)}")

    # asynchronous io http
    def test_concurrencyReq(self):
        url = "http://149.129.147.44:8305/hotupdate"  # Test
        # url = "https://www.baidu.com"  # Test

        reqInfo = {
            "Plat": 8,
            "Os": 2,
            "Appid": 3,
            "Uid": '123123',
            "Version": '0.0.0.1',
            "Deviceid": 'wolegequ',
        }

        # code, rspDct = utils.httpPost(url, utils.objToJson(reqInfo))
        # print(f"--- code: {code}, rsp: {utils.beautyJson(rspDct)}")
        # return

        async def reqFn(idx):
            try:
                # async with aiohttp.request(method="GET", url=url) as rsp:
                async with aiohttp.request(method="POST", url=url, data=json.dumps(reqInfo)) as rsp:
                    print(f"--- idx: {idx} code: {rsp.status}")
                    # response.request_info 
                    res = await rsp.text()
                    # print(f"--- res: {res}")
                    return res
            except Exception:
                return "--- error"

        # create_task approach
        async def main01():
            taskArr = []
            for idx in range(5):
                task = asyncio.create_task(reqFn(idx))  # The task is scheduled to run as soon as it is created
                taskArr.append(task)

            resArr = []
            for task in taskArr:  # Waiting for all requests to complete
                res = await task
                resArr.append(res)
            return resArr

        # gather approach
        async def main02():
            taskArr = []
            for idx in range(5):
                task = reqFn(idx)
                taskArr.append(task)
            return await asyncio.gather(*taskArr)

        # This works
        loop = asyncio.get_event_loop()
        resArr = loop.run_until_complete(main02())  # Run the event loop until main02() completes

        # # Error: RuntimeError: Event loop is closed
        # resArr = asyncio.run(main02())

        print("--- finished")
        print(f"--- resArr: {utils.beautyJson(resArr)}")

    def test_compare_http(self):
        url = "http://149.129.147.44:8305/hotupdate"
        # url = "https://www.baidu.com"
        reqCnt = 1

        dct = {
            "Plat": 8,
            "Os": 2,
            "Appid": 3,
            "Uid": '123123',
            "Version": '0.0.0.1',
            "Deviceid": 'wolegequ',
        }

        @utils.call_cost
        def syncFn():
            print("--- syncFn start")
            for idx in range(reqCnt):
                code, rspDct = utils.httpPost(url, dct)
            print("--- syncFn end")

        @utils.call_cost
        def asyncFn():
            print("--- asyncFn start")

            reqArr = []
            for idx in range(reqCnt):
                ri = async_util.CReqInfo()
                ri.url = url
                ri.data = dct  # dict or json string
                ri.method = "POST"
                ri.extA = f"extra data {idx}"
                reqArr.append(ri)

            resArr = async_util.doReq(*reqArr)
            print("--- type: {}, len: {}".format(type(resArr), len(resArr)))
            # print(f"--- finished, resArr: {utils.beautyJson(resArr)}")
            print("--- asyncFn end")

        sync_cc = syncFn()
        print("sync: {}".format(sync_cc))

        print()
        async_cc = asyncFn()
        print("async: {}".format(async_cc))


    def test_asyncFile(self):
        async def dealFile(filePath):
            print("--- dealFile:", filePath)
            async with aiofiles.open(filePath, mode="r") as fd:  # read
                txt = await fd.read()
                print("--- read:", txt)

            async with aiofiles.open(filePath, mode="w") as fd:  # write
                await fd.write("wolegequ")

            return "done!!"

        path = utils.getDesktop("test_io2/aaa.txt")
        res = async_util.doTask(dealFile(path))
        print("--- res:", res)

    # Asynchronous io file, read by line
    def test_asyncLine(self):
        async def dealFile(filePath):
            print("--- dealFile:", filePath)
            async with aiofiles.open(filePath, mode="rb") as fd:  # read
                async for line in fd:
                    # print("--- line:", line.decode(encoding="utf-8", errors="ignore"))
                    print("--- line:", str(line, encoding="utf-8", errors="ignore"))

        path = utils.getDesktop("a_temp.lua")
        res = async_util.doTask(dealFile(path))
        print("--- res:", res)

    # Compare file reads, synchronous, asynchronous, time-consuming.
    def test_compare_readFile(self):
        dstDir = utils.getDesktop("test_io")
        fileArr = utils.getFiles(dstDir, ["*.*"])
        print("--- fileArr len: {}".format(len(fileArr)))

        @utils.call_cost
        def syncFn():
            print("--- syncFn start")
            for file in fileArr:
                # time.sleep(0.5)
                utils.readFileBytes(file)
            print("--- syncFn end")

        @utils.call_cost
        def asyncFn():
            print("--- asyncFn start")

            fiArr = [async_util.CFileInfo(file) for file in fileArr]

            res = async_util.doRead(*fiArr)
            # print("--- res:", utils.beautyJson(res))

            # Write the files into a different directory
            # for fi in fiArr:
            #     fi.path = fi.path.replace("test_io2", "test_io3")
            # async_util.doWrite(*fiArr)

            print("--- asyncFn end")

        sync_cc = syncFn()
        print("sync: {}".format(sync_cc))

        print()
        async_cc = asyncFn()
        print("async: {}".format(async_cc))

    # Asynchronous parallel execution of system commands
    def test_subprocess(self):
        # official document: https://docs.python.org/3/library/asyncio-subprocess.html

        async def run(cmd):
            proc = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)

            stdout, stderr = await proc.communicate()

            print(f'[{cmd!r} exited with {proc.returncode}]')
            if stdout:
                print(f'[stdout]\n{stdout.decode(errors="ignore")}')
            if stderr:
                print(f'[stderr]\n{stderr.decode(errors="ignore")}')

        cmd = "git status"
        asyncio.run(run(cmd))

    def test_compare_subprocess(self):
        cnt = 5

        # cmd = "git status"
        cmd = "call {}".format(utils.getDesktop("aaa.exe")) 

        @utils.call_cost
        def asyncFn():
            cmdArr = []
            for i in range(cnt):
                ci = async_util.CCmdInfo(cmd)
                ci.extA = i
                cmdArr.append(ci)

            res = async_util.doCmd(*cmdArr)
            # print("--- res:", utils.beautyJson(res))

        @utils.call_cost
        def syncFn():
            async def run(command):
                return utils.cmdToString(command)

            res = async_util.doTask(*[run(cmd) for i in range(cnt)])
            # print("--- res:", utils.beautyJson(res))

        dt1 = syncFn()
        print("--- syncFn cost time:", dt1)  # --- syncFn cost time: 00:00:45

        dt2 = asyncFn()
        print("--- asyncFn cost time:", dt2)  # --- asyncFn cost time: 00:00:09

    # True multi-threaded parallelism.
    def test_multi_thread(self):
        def fn001(name):
            # print("--- hello 111, name: {}".format(name))
            # time.sleep(5)
            # print("--- hello 222, name: {}".format(name))

            # error
            # assert False, "--- wolegequ"
            # arr = []
            # b = arr[1]

            utils.execute("call {}".format(utils.getDesktop("aaa.exe")))
            return "world-{}".format(name)

        # res holds the results in submission order
        res = async_util.doRun(*[async_util.CThreadInfo(target=fn001, args=(i,)) for i in range(3)])
        # print("--- end, res: {}".format(utils.beautyJson([str(ti.result) for ti in res])))
        for ti in res:
            print("--- result is error:", utils.isError(ti.result))
            # print("--- exmsg:", utils.exmsg(ti.result))


if __name__ == "__main__":
    ins = Test_Async()
    ins.test_multi_thread()

npm install Error: stack Error: Can’t find Python executable “python”

The error: npm install fails with stack Error: Can't find Python executable "python".
This happens because node-gyp is needed during installation, and the node-gyp version in use only supports Python 2. The officially recommended version is Python 2.7, which you need to download and install.
After the installation is complete, set the environment variables PYTHONPATH (the installation directory, such as C:\Python27) and PYTHON (%PYTHONPATH%\python.exe),
and then run this in the terminal: npm config set python "C:\Python27\python.exe"
Problem solved.

Dell Error Code for Failed Hard Disk

You have a Dell workstation. It’s under warranty. The event log has a bunch of errors with source “Disk”. CHKDSK reports bad sectors.
You KNOW the hard disk is failing, but Dell Support wants you to boot from a diagnostic CD and run some tests to generate an error code, which could take hours. You’re on the clock charging your customer for your time. Time is money.
You can tell the Dell technician that you have run the diagnostics utility, and that it generated this error code:
Error Code 4400:011B
Msg: Block 253122 (feel free to change up the block number for variety)
Medium error (3-1101)
Read retries Exhausted.
More recently, from an Optiplex 780:

Error Code 0142

Error Code 2000-0142

Hard Drive 0

Self Test Unsuccessful Status 79

Error Code 0F00:1332

Disk-Block 126377466

Interrupt Request (IRQ) did not set in time.

One of these will get you a new hard disk shipped from Dell.

Python FileNotFoundError: [Errno 2] No such file or directory: 'objects/epsilon.pkl'


Preface
I came across a fun reinforcement learning application through WeChat, but when I tried to run it I hit this problem:
FileNotFoundError: [Errno 2] No such file or directory: 'objects/epsilon.pkl'
so I had to read up on it and come up with a solution.
Explanation
The folder or file does not exist, that is, you are accessing a file that is not there. However, when a file is opened in "w" mode and does not exist, a new file is created, so the real problem is that the folder is missing. Just create it.
Detailed explanation
Python's os module is needed here. Because the file is opened in "w" mode, a missing file is created automatically, so the missing .pkl file (given by a relative path) is not the problem. The problem is the directory part of that relative path: the folder does not exist. So we have to check whether this path exists and create it if it doesn't.

import os
path = "objects"  # the directory from the error message
if not os.path.exists(path):
    os.mkdir(path)

Also, note that os.mkdir creates only one directory level at a time, meaning that the directory above objects must already exist, or you will still get an error.
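To avoid the one-level limitation, the standard library also offers os.makedirs, which creates all missing intermediate directories. A minimal sketch, assuming the data is written with pickle; save_pickle and the nested demo path are made up for the illustration:

```python
import os
import pickle
import tempfile


def save_pickle(obj, file_path):
    """Create any missing parent directories, then pickle obj to file_path."""
    parent = os.path.dirname(file_path)
    if parent:
        # exist_ok=True makes the call a no-op when the directory is already there
        os.makedirs(parent, exist_ok=True)
    with open(file_path, "wb") as f:
        pickle.dump(obj, f)


# Demonstration in a throwaway directory; the nested path is made up
base = tempfile.mkdtemp()
target = os.path.join(base, "objects", "nested", "epsilon.pkl")
save_pickle({"epsilon": 0.1}, target)
print(os.path.exists(target))
```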

How to Fix "HTTP Error 403: Forbidden" in Python 3.x

Problem:
The urllib.request.urlopen() method is often used to fetch the source of a web page for analysis, but for some websites it throws an "HTTP Error 403: Forbidden" exception.
For example, executing the following statement

 urllib.request.urlopen("http://blog.csdn.net/eric_sunah/article/details/11099295")

produces the following exception:

  File "D:\Python32\lib\urllib\request.py", line 475, in open
    response = meth(req, response)
  File "D:\Python32\lib\urllib\request.py", line 587, in http_response
    'http', request, response, code, msg, hdrs)
  File "D:\Python32\lib\urllib\request.py", line 513, in error
    return self._call_chain(*args)
  File "D:\Python32\lib\urllib\request.py", line 447, in _call_chain
    result = func(*args)
  File "D:\Python32\lib\urllib\request.py", line 595, in http_error_default
    raise HTTPError(req.full_url, code, msg, hdrs, fp)
urllib.error.HTTPError: HTTP Error 403: Forbidden

Analysis:
The exception occurs because, when a URL is opened with urllib.request.urlopen, the server receives only a bare request for the page. It is not told which browser, operating system, or hardware platform sent the request, and requests lacking this information are often not normal visits; they typically come from crawlers.
To block such abnormal access, some websites check the User-Agent in the request (it carries information about the hardware platform, system software, application software, and user preferences). If the User-Agent is abnormal or missing, the request is rejected (as in the error message above).
So you can try adding User-Agent information to the request.
Solution:
For Python 3.x, adding User-Agent information to the request is simple:

import urllib.request

# Without the headers below, "HTTPError: HTTP Error 403: Forbidden" appears: the site refuses crawlers,
# so add a User-Agent header to look like a browser (a real value can be found with the Firefox FireBug plugin).
chaper_url = "http://blog.csdn.net/eric_sunah/article/details/11099295"
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'}
req = urllib.request.Request(url=chaper_url, headers=headers)
urllib.request.urlopen(req).read()

Replacing the plain urllib.request.urlopen(url).read() call with the code above lets the problem pages be accessed normally.
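The same approach can be wrapped in a small reusable function. This is only a sketch: build_request and fetch are hypothetical helper names, the User-Agent string is the example value from this post, and http://example.com/ is a placeholder address. Only build_request is exercised here, so running the snippet makes no network call.

```python
import urllib.request

# Example browser-style User-Agent string, taken from this post
DEFAULT_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'
}


def build_request(url, headers=None):
    """Return a Request for url that carries a User-Agent header."""
    merged = dict(DEFAULT_HEADERS)
    merged.update(headers or {})
    return urllib.request.Request(url=url, headers=merged)


def fetch(url, timeout=10):
    """Download url and return the raw bytes (performs a real network call)."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as resp:
        return resp.read()


req = build_request("http://example.com/")
print(req.get_header('User-agent'))  # urllib stores header names capitalized
```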

How to Fix Python reading large local file memory error

I wanted to read some data crawled from Dianping.com, but the file was fairly large, so Jupyter froze while reading it and reported a memory error.
So how do you solve this problem? After some exploration, I finally found a solution: iterate over the file object line by line, so that only one line is held in memory at a time, instead of reading the whole file at once. The code is as follows:

with open('The file you want to read.', 'r',encoding='utf-8') as f:
    for line in f:
        res = line
        print(res)
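When a file has very long lines or no line breaks at all, a related option is to read it in fixed-size pieces. A minimal sketch using a generator; read_in_chunks and the temporary demo file are made up for the illustration:

```python
import os
import tempfile


def read_in_chunks(path, chunk_size=1024 * 1024, encoding='utf-8'):
    """Yield successive pieces of a text file, at most chunk_size characters each."""
    with open(path, 'r', encoding=encoding) as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:      # an empty string means end of file
                break
            yield chunk


# Small self-contained demonstration with a temporary file
fd, demo_path = tempfile.mkstemp(suffix='.txt')
with os.fdopen(fd, 'w', encoding='utf-8') as f:
    f.write('abcdefghij')

chunks = list(read_in_chunks(demo_path, chunk_size=4))
print(chunks)
os.remove(demo_path)
```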

Python TypeError: coercing to Unicode: need string or buffer, NoneType found


train_accuracy_top5_nbatch_filename is the name of the file, and it is assigned as follows:

train_accuracy_top5_nbatch_filename = "a" + "b" + "c" + ".log"

But executing

train_accuracy_top5_nbatch_file_op = open(train_accuracy_top5_nbatch_filename, "a")

raised the error in the title.
Solution:

train_accuracy_top5_nbatch_file_op = open(str(train_accuracy_top5_nbatch_filename), "a")
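A word of caution: this message is what Python 2 reports when open() receives None instead of a string, so wrapping the name in str() silences the error but would open a file literally named "None" if the variable really is None. A more defensive sketch (written in Python 3 syntax) checks the value explicitly; open_log is a hypothetical helper name.

```python
import os
import tempfile


def open_log(filename, mode="a"):
    """Open a log file, failing early with a clear message if filename is missing."""
    if filename is None:
        raise ValueError("log filename is None; check where it was assigned")
    return open(filename, mode)


# Demonstration in a temporary directory; the file name follows the text above
log_path = os.path.join(tempfile.mkdtemp(), "a" + "b" + "c" + ".log")
with open_log(log_path) as log_file:
    log_file.write("top5 accuracy\n")

try:
    open_log(None)
except ValueError as err:
    error_message = str(err)
    print(error_message)
```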

 

Django with MySQL reports: Error loading MySQLdb module: No module named 'MySQLdb'

Cause of the error:

The MySQLdb package (MySQL-python) does not support Python 3.

Solution:

Install the pymysql package:
pip install PyMySQL

Taking the directory structure below as an example, add the following code to test5/test5/__init__.py:

import pymysql
pymysql.install_as_MySQLdb()

├── booktest
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   │   ├── __init__.py
│   │   └── __pycache__
│   ├── models.py
│   ├── __pycache__
│   ├── tests.py
│   ├── urls.py
│   └── views.py
├── manage.py
├── static
│   └── booktest
│       └── a1.jpg -> /usr/share/wallpapers/deepin/Flying_Whale_by_Shu_Le.jpg
├── templates
│   └── index.html
└── test5
    ├── __init__.py
    ├── __pycache__
    ├── settings.py
    ├── urls.py
    └── wsgi.py

Python error: urllib.error.HTTPError: HTTP Error 404: Not Found

The code is as follows:

import sys
import random
from urllib.request import urlopen
WORD_URL = "http://learncodethehardway.org/word.txt"
WORDS = []
PHRASES = {
    "class %%%(%%%):":
        "Make a class named %%% that is-a %%%.",
    "class %%%(object):\n\tdef __init__(self,***)":
        "class %%% has-a __init__ that takes self and *** params.",
    "class %%%(object): \n\tdef ***(self, @@@)":
        "class %%% has-a function *** that takes self and @@@ params.",
    "*** = %%%()":
        "Set *** to an instance of class %%%.",
    "***.*** = @@@":
        "From *** get the *** function, call it with params self, @@@,",
    "***.*** = '***'":
        "From *** get the *** attribute and  set it to '***'."
}
if len(sys.argv) == 2 and sys.argv[1] == "english":
    PHRASE_FIRST = True
else:
    PHRASE_FIRST = False

for word in urlopen(WORD_URL).readlines():
    WORDS.append(str(word.strip(), encoding='utf-8'))
    
def convert(snippet, phrase):
    class_names = [w.capitalize() for w in random.sample(WORDS, snippet.count("%%%"))]     
    other_names = random.sample(WORDS, snippet.count("***"))         
    results = []
    param_names = []
    for i in range(0, snippet.count('@@@')):
        param_count = random.randint(1, 3)    
        param_names.append(', '.join(random.sample(WORDS, param_count)))        
    for sentence in snippet, phrase:
        result = sentence[:]
        for word in class_names:
            result = result.replace('%%%', word, 1)     
        for word in other_names:
            result = result.replace('***', word, 1)     
        for word in param_names:
            result = result.replace('@@@', word, 1)     
        results.append(result)
    return results

try:
    while True:
        snippets = list(PHRASES.keys())    
        random.shuffle(snippets)        
        for snippet in snippets:
            phrase = PHRASES[snippet]      
            question, answer = convert(snippet, phrase)   
            if PHRASE_FIRST:
                question, answer = answer, question
            print(question)
            input("> ")
            print(f"ANSWER:  {answer}\n\n")

except EOFError:
    print("\nBye")

Error message:

Traceback (most recent call last):
  File "E:/python/ex41.py", line 30, in <module>
    for word in urlopen(WORD_URL).readlines():
  File "D:\pycharm\lib\urllib\request.py", line 222, in urlopen
    return opener.open(url, data, timeout)
  File "D:\pycharm\lib\urllib\request.py", line 531, in open
    response = meth(req, response)
  File "D:\pycharm\lib\urllib\request.py", line 641, in http_response
    'http', request, response, code, msg, hdrs)
  File "D:\pycharm\lib\urllib\request.py", line 563, in error
    result = self._call_chain(*args)
  File "D:\pycharm\lib\urllib\request.py", line 503, in _call_chain
    result = func(*args)
  File "D:\pycharm\lib\urllib\request.py", line 755, in http_error_302
    return self.parent.open(new, timeout=req.timeout)
  File "D:\pycharm\lib\urllib\request.py", line 531, in open
    response = meth(req, response)
  File "D:\pycharm\lib\urllib\request.py", line 641, in http_response
    'http', request, response, code, msg, hdrs)
  File "D:\pycharm\lib\urllib\request.py", line 569, in error
    return self._call_chain(*args)
  File "D:\pycharm\lib\urllib\request.py", line 503, in _call_chain
    result = func(*args)
  File "D:\pycharm\lib\urllib\request.py", line 649, in http_error_default
    raise HTTPError(req.full_url, code, msg, hdrs, fp)
urllib.error.HTTPError: HTTP Error 404: Not Found
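A 404 means the server has nothing at the requested address. The traceback passes through http_error_302, so the server first redirected the request and the redirect target was not found. The first thing to check is therefore whether the address in WORD_URL is spelled correctly and still exists. The sketch below is one way to make such failures easier to read; load_words and fake_opener are hypothetical names, and fake_opener simulates the 404 so the example runs without network access.

```python
import io
from urllib.error import HTTPError
from urllib.request import urlopen


def load_words(url, opener=urlopen):
    """Return the words at url, one per line, or raise a clearer error on HTTP failure.

    opener is injectable so the function can be tried without network access.
    """
    try:
        with opener(url) as response:
            return [str(line.strip(), encoding='utf-8') for line in response.readlines()]
    except HTTPError as e:
        # e.code is the HTTP status code, 404 in the case above
        raise RuntimeError(f"cannot download word list: HTTP {e.code} for {url}") from e


def fake_opener(url):
    """Hypothetical stand-in that always answers 404, used only for this demo."""
    raise HTTPError(url, 404, "Not Found", {}, io.BytesIO(b""))


try:
    load_words("http://learncodethehardway.org/word.txt", opener=fake_opener)
except RuntimeError as err:
    message = str(err)
    print(message)
```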

 

The LeNet model trained in Python fails to predict my own handwritten pictures

LeNet was trained with the MNIST training set; the training code is not shown here.
The saved model is loaded directly:

lenet = torch.load('resourses/trained_model/LeNet_trained.pkl')

The test code is attached below:

print("Testing")
# Define conversion operations
# Read in the test image and transfer it to the model.
test_images = Image.open('resourses/LeNet_test/0.png')
img_to_tensor = transforms.Compose([
    transforms.Resize(32),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])])
input_images = img_to_tensor(test_images).unsqueeze(0)
# Move models and data to cuda for computation if cuda is available
USE_CUDA = torch.cuda.is_available()
if USE_CUDA:
    input_images = input_images.cuda()
    lenet = lenet.cuda()
output_data = lenet(input_images)
# Print test information
test_labels = torch.max(output_data, 1)[1].data.cpu().numpy().squeeze(0)
print(output_data)
print(test_labels)

So far my own pictures are almost never recognized correctly, and I could not find the reason at first; the model outputs 8 very frequently.
I later looked up relevant information and found the following reasons:

    1. If you parse the MNIST data set, you will find that its pictures are white digits on a black background.

    2. Our own test pictures, however, are generally black digits on a white background.

    3. So I inverted the pixels of my test pictures and tested again.
    The pixel inversion code is as follows:
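from PIL import Image, ImageOps
# Convert to 8-bit grayscale first; ImageOps.invert may reject other modes such as RGBA
image = Image.open('resourses/LeNet_test/0.png').convert('L')
image_invert = ImageOps.invert(image)
image_invert.show()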

After pixel inversion, the test accuracy reaches 50-60 percent, which is still not ideal. Another possible reason:

    The MNIST data set contains handwriting collected abroad, whose style and habits differ slightly from those of Chinese writers; this is also a major factor affecting test accuracy. However, I have not yet measured the accuracy after adjusting the handwriting style.