目录

前言

对于这种简单的电子书式网站采集,文章数不多,串行完全也足够快,没有必要单独去考虑多线程的实现方式。

但我们主要是借这个例子,梳理下,如何将单线程串行的多IO的业务,转换成采用多线程的方式,将多个IO分解异步进行,提高IO处理能力(这里的业务逻辑不是CPU密集计算型),也就是目的:

  • 了解多线程解决IO异步问题,提高效率
  • 了解多线程与单线程编码差异
  • 了解计算密集型任务与IO密集型任务
  • 采集markdown格式化毛选文章,以备自己阅读

两种思路

  • 单线程阻塞

    • 采集网站目录页面所有文章地址
    • 遍历每一个文章地址,串行采集、markdown格式化、写入本地文件,组织md文章
    • 程序结束退出
  • 多线程非阻塞

    • 采集网站目录页面获取所有文章地址
    • 多线程采集与markdown格式化页面内容,写入内容队列
    • 多线程读取内容队列,并写入本地文件,组织成md文章
    • 采集线程与写文件线程退出,主线程退出

单线程采集毛选

单线程阻塞的编码方式很自然简单,处理完一个所有环节后,再处理下一个,逻辑详见入口方法run:

# coding:utf-8
## 从https://weiyinfu.cn/MaoZeDongAnthology/目录.html 获取毛泽东选集所有文章
import requests
from bs4 import BeautifulSoup
import os

class GetBooksMarkdown:
    __desDir = "I:\\9ong\\毛泽东选集\\" #目的文件目录
    __content = "" # 文章内容
    __urlList = {}

    def __init__(self):
        print("get books ...\n")

    # 获取目录页面是所有文章的标题与url地址,组织成字典__urlList
    def __getUrlList(self):
        page = requests.get("https://weiyinfu.cn/MaoZeDongAnthology/")
        soupObj = BeautifulSoup(page.content,"html.parser")
        bookItems = soupObj.select("li.chapter-item")
        urls = {}
        for item in bookItems:
            title = item.select("a")[0].attrs["href"]
            url = 'https://weiyinfu.cn/MaoZeDongAnthology/' + title
            title = title[:-5]
            if len(title) > 0 and len(url) > 0:
                urls[title] = url
        
        self.__urlList = urls    
        return self.__urlList
        
    def __getDesDir(self):
        return self.__desDir
        
    # 采集指定url地址及区域,并转换成markdown格式,返回内容        
    def __toMarkdown(self,__pageUrl):
        url = "http://www.9ong.com/xxx/api" # 演示demo
        postdata = {
            "url":__pageUrl,
            "selector":"main",
            "strip_tags":True,
            "remove_nodes":"script",
            "save_file":"false",            
        }
        response = requests.post(url,data=postdata)
        return response
    # 以标题作为文件名,将md内容写入文件
    def __writePage(self,desFilePath):
        desDirPath = os.path.dirname(desFilePath)
        # print("##Final Path:"+desFilePath)
        # return 
        if not os.path.exists(desDirPath):
            os.makedirs(desDirPath)
        with open(desFilePath,"w",encoding="utf-8") as nf:
            nf.write(self.__content)

        if os.path.exists(desFilePath):
            print("写入文章成功:",desFilePath,"\n")
        else:
            print("!!!写入新文件失败\n")     
    # 入口                   
    def run(self):
        # 遍历urls
        urls = self.__getUrlList()
        for title in urls:
            print("开始处理文章:",title,"\n")

            # 请求转换成markdown
            print("请求转换markdown...\n")
            res = self.__toMarkdown(urls[title])
            # print(res.text)

            # 写入文件
            print("文章准备写入文件...","\n")
            desFilePath = self.__getDesDir()+title+".md"
            self.__content = res.text           
            self.__writePage(desFilePath)
            
            # 循环下一个url
            self.__content = ""
            # return False # 测试一次
        return True

if __name__ == '__main__':
    gbm = GetBooksMarkdown()
    gbm.run()    


单线程阻塞的方式简单易理解,对于采集的数据不多的情况下,可快速无多思考的实现,时间效率(执行效率)换空间效率(编码设计效率)。

多线程采集毛选

具体逻辑详见run方法,实现两个线程池,一个处理采集IO任务,一个处理写文件IO任务,通过队列的方式,将两个任务分离异步处理,相当于IO并行,理论上效率至少会比单线程提升一倍(由于python有个全局解释器锁GIL,最终还是单线程运行的,所以多线程并不会提高cpu的利用率):

# coding:utf-8
## 从https://weiyinfu.cn/MaoZeDongAnthology/目录.html 获取毛泽东选集所有文章

import queue
import requests,os,time
from bs4 import BeautifulSoup
from queue import Queue
import threading

class GetBooksMarkdown:
    __desDir = "I:\\9ong\\毛泽东选集\\" #目的文件目录    
    __urlList = {}  # 所有文章标题与url地址 字典
    __urlQueue = Queue()   #type: Queue # 文章标题与地址队列,用于采集
    __contentQueue = Queue() #type: Queue # 采集处理后的内容队列,生产于采集线程,用于写文件线程
    __stopWriteThreadFlag = False # 用于通知写文件线程退出
    

    def __init__(self):
        print("get books ...\n")    
        

    # 获取所有文章标题与url地址 字典
    def __getUrlList(self):
        page = requests.get("https://weiyinfu.cn/MaoZeDongAnthology/")
        soupObj = BeautifulSoup(page.content,"html.parser")
        bookItems = soupObj.select("li.chapter-item")
        urls = {}
        for item in bookItems:
            title = item.select("a")[0].attrs["href"]
            url = 'https://weiyinfu.cn/MaoZeDongAnthology/' + title
            title = title[:-5]
            if len(title) > 0 and len(url) > 0:
                urls[title] = url
        
        self.__urlList = urls    
        return self.__urlList

    # 获取文章写入文件目录
    def __getDesDir(self):
        return self.__desDir
        
    # 远程文章获取并转换成markdown格式
    def __toMarkdownThread(self):
        try:
            print('当前工作的线程为:',threading.currentThread().name,"\n")
            while True:
                if self.__urlQueue.empty():
                    print('当前工作的线程为:',threading.currentThread().name," __urlQueue队列已空\n")
                    break
                else:                
                    serverUrl = "http://www.9ong.com/xxx/api" # 演示demo
                    pageUrlItem = self.__urlQueue.get() #type: map # {"title":"","url":""}
                    print('当前工作的线程为:',threading.currentThread().name," 正在采集与md转换:",pageUrlItem["title"],"\n")
                    postdata = {
                        "url":pageUrlItem["url"],
                        "selector":"main",
                        "strip_tags":True,
                        "remove_nodes":"script",
                        "save_file":"false",
                    }
                    time.sleep(1)
                    response = requests.post(serverUrl,data=postdata)
                    if response.ok:                        
                        self.__contentQueue.put({"title":pageUrlItem["title"],"content":response.text})
        except Exception as ex:
            print("toMarkdownThread采集异常:",ex)
        # finally:            
        #     return True
        # return response
    
    # md格式文章写入本地文件
    def __writePageThread(self):
        while not self.__stopWriteThreadFlag:
            try:
                #if not self.__contentQueue.empty():
                    item = self.__contentQueue.get(False)
                    if not item:
                        pass
                    else:
                        desFilePath = self.__getDesDir() + item["title"] + ".md"
                        desDirPath = os.path.dirname(desFilePath)
                        print('当前工作的线程为:',threading.current_thread().name," 正在写入文件:",desFilePath,"\n")
                        # print("##Final Path:"+desFilePath)
                        # return 
                        
                        if not os.path.exists(desDirPath):
                            os.makedirs(desDirPath)
                        with open(desFilePath,"w",encoding="utf-8") as nf:
                            nf.write(item["content"])

                        if os.path.exists(desFilePath):
                            print("写入文章成功:",desFilePath,"\n")
                        else:
                            print("!!!写入新文件失败\n")                                
            except Exception as ex:                         
                pass
                # print("writePageThread异常:",ex)


    def run(self):

        # 遍历urls
        print("获取所有文章标题与地址...\n")
        urls = self.__getUrlList()
        urlLen = len(urls) # 2
        print("共获取",urlLen,"篇文章地址信息...\n")
        # print(urls)
        # return False

        # 构造文章url队列
        print("构造文章地址信息队列...\n")
        self.__urlQueue = Queue(urlLen)
        count = 0
        for title in urls:
            # if count == 4:
            #     break                   
            self.__urlQueue.put({"title":title,"url":urls[title]})
            count += 1

        # 初始化采集thread
        print("初始化采集与md转换线程池...\n")
        markdownThreads = []
        markdownThreadIdList = ["md1","md2","md3","md4","md5"]

        for tId in markdownThreadIdList:
            mdThread = threading.Thread(target=self.__toMarkdownThread,name=tId)
            mdThread.start()
            markdownThreads.append(mdThread)

        # 初始化写入文件thread
        print("初始化文章md内容写入文件线程池...\n")
        writeThreads = []
        writeThreadIdList = ["write1","write2","write3"]

        for tId in writeThreadIdList:
            mdThread = threading.Thread(target=self.__writePageThread,name=tId)
            mdThread.start()
            writeThreads.append(mdThread)

        # 等待队列,先进行采集与md转换
        print("等待文章地址信息写入队列__urlQueue...\n")
        while not self.__urlQueue.empty():
            pass    #不为空,则继续阻塞

        for t in markdownThreads:
            # print(t)
            t.join()

        # 等待队列,写入文件
        print("等待文章写入文章内容队列__contentQueue...\n")
        while not self.__contentQueue.empty():
            pass
        
        # contentQueue空时,通知所有write线程退出
        print("通知写入文件线程退出...\n")
        self.__stopWriteThreadFlag = True

        for t in writeThreads:
            t.join()

        print("主线程结束.\n")


if __name__ == '__main__':
    gbm = GetBooksMarkdown()
    gbm.run()    



python多线程采集2

两种思路小结

  • 执行效率

    单线程串行200+篇文章的采集、markdown、写文件,无sleep的实现,从第一篇文章22:47完成,到最后一篇文章22:54完成,共用时7~8分钟(480秒)。

    多线程同样内容的采集(5个线程)、markdown、写文件(3个线程),每次采集sleep(1),从第一篇文章10:40完成,到最后一篇文章10:45完成,共用时5~6分钟(360秒),扣除sleep的230秒,实际执行用时 360 - 230 = 130秒左右,比单线程串行效率提高了3~4倍。

  • 编码效率

    单线程串行不到100行能完成采集、写入(注:markdown由远端接口完成)

    多线程虽然看似只要不到200行,但更多精力会耗在设计上,毕竟解决方案思想需要人去深思,特别是追求高效、简单、快速,而这三个往往是不能同时存在的,高效、简单编码,就需要思考复杂方案,简单、快速编码,大概率不高效,快速高效编码,肯定不简单。

python多线程是真多线程吗

Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

参考:多线程 - 廖雪峰的官方网站

python多线程适合IO密集型任务

计算密集型任务,主要是进行大量的计算,消耗cpu资源,比如圆周率计算、视频高清解码、图片处理、机器学习等,全靠CPU(GPU)的运算能力,最典型的就是深度学习,cpu负责调度,gpu就负责计算(任务单一,高效率),因此对于这些消耗cpu的任务,数据结构、算法、代码效率异常重要。

除了计算密集型的任务外,我们经常看到的更多的是IO密集型任务,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言不一定是最好的。

参考


9ong.com - tsingchan