加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

day35:线程队列&进程池和线程池&回调函数&协程

发布时间:2020-12-20 09:56:07 所属栏目:Python 来源:网络整理
导读:目录 1.线程队列 2.进程池和线程池 3.回调函数 4.协程:线程的具体实现 5.利用协程爬取数据 附:并发编程思维导图 线程队列 1.线程队列的基本方法 put 存 get 取 put_nowait 存,超出了队列长度,报错 get_nowait 取,没数据的时候,直接报错 [linux windows] 线程

目录

1.线程队列

2.进程池和线程池

3.回调函数

4.协程:线程的具体实现

5.利用协程爬取数据

附:并发编程思维导图

线程队列

1.线程队列的基本方法

put

get

put_nowait 存,超出了队列长度,报错

get_nowait 取,没数据的时候,直接报错

[linux windows] 线程中 put_nowait/get_nowait 都支持(区别于进程队列)

2.Queue:先进先出,后进后出

# (1) Queue 
"""先进先出,后进后出"""
q = Queue()
q.put(1)
q.put(2)
print(q.get())
(q.get())

print(q.get())  取不出来,阻塞

print(q.get_nowait())  没有数据时,报错

 指定队列长度
q2 = Queue(3)
q2.put(100)
q2.put(101)
q2.put(102)

q2.put(103)  存放的数据超出了队列长度,阻塞
q2.put_nowait(104) 
 (2)LifoQueue 先进后出,后进先出(栈的特点)
from queue import LifoQueue
lq = LifoQueue(3)
lq.put(11)
lq.put(22)
lq.put(33)

lq.put_nowait(44)  error

print(lq.get())  33
 22
 11

 阻塞

4.PriorityQueue:按照优先级顺序进行排序

 (3)PriorityQueue 按照优先级顺序进行排序(默认从小到大)
 PriorityQueue
pq = PriorityQueue()
 1.可以存放数字--->按照数字大小排序
pq.put(80)
pq.put(81)
pq.put(18)

 2.可以存放字符串 (按照ascii编码进行排序,依次返回)
pq.put("wangwen")
pq.put(wangzhihegelong 3.可以存放容器--->按照容器的元素排序,从第一个元素开始
pq.put(  (18,)  )
pq.put( (18,1)">maohonglei) )
pq.put( (18,1)">wangawei) )

 4.是否可以将不同类型的数据都放到一个队列中的呢? 不可以!
# error

pq.put(1)    
pq.put("abc")
"""

(pq.get())
print(pq.get())

进程池和线程池

1.ProcessPoolExecutor:进程池的基本使用

from concurrent.futures  ProcessPoolExecutor

def func(i):
    print(任务执行中 ... start,os.getpid())
    time.sleep(3)
    任务执行结束 ... end return i


if __name__ == __main__:
    lst = []
     (1) 创建进程池对象
    参数: 默认获取的是最大cpu逻辑核心数 8
    p = ProcessPoolExecutor(8)

     (2) 异步提交任务
    默认如果一个进程短时间内可以完成更多的任务,进程池就不会使用更多的进程来完成,以节省资源"""
    for i in range(10):
        res = p.submit(func,i)
        lst.append(res)

     (3) 获取当前进程任务中的返回值(result在获取任务的返回值时,有阻塞)
    in lst:
        (i.result())

     (4) 等待所有子进程执行结束之后,在继续执行主进程内容(shutdown)
    p.shutdown()   <=> join
    <=======>print(os.getpid())

1.创建进程池对象

2.异步提交任务

?3.获取当前进程任务中的返回值

?4.shutdown:等待所有子进程执行结束之后,在继续执行主进程内容

2.ThreadPoolExecutor:线程池的基本使用

 ThreadPoolExecutor

from threading  current_thread  as cthread


thread ... startthread ... end  cthread().ident


 []
    setvar = set()
     (1) 创建线程池对象
    参数: 默认并发的线程数 是 os.cpu_count() * 5 = 40
    tp = ThreadPoolExecutor()

    默认如果一个线程短时间内可以完成更多的任务,线程池就不会使用更多的线程来完成,1)">in range(100):
        res = tp.submit(func,10)
        lst.append(res)

     (3) 获取返回值
     lst:
        setvar.add(i.result())

     (4) 等待所有子线程执行结束之后,在执行主线程
     tp.shutdown()
    主线程执行结束 .... print(setvar,len(setvar))

创建线程池时注意:

回调函数

1.进程池的回调函数: 由主进程执行调用完成的

 func1(i):
    process start ... process end ... return *" * i

 call_back1(obj):
    <===回调函数callback进程号===>(obj.result())

:
    p = ProcessPoolExecutor()
    in range(1,11 p.submit(func1,i)
         print(res.result())
        res.add_done_callback(call_back1)
         self.func(func2)
    p.shutdown()
    主进程执行结束 ... ",os.getpid())

回调函数执行流程

2.线程池的回调函数 : 由当前子线程调用完成的

 func2(i):
    thread start ... thread end ...  call_back2(obj):
    <===回调函数callback线程号===>(obj.result())


:
    tp = ThreadPoolExecutor(5 tp.submit(func2,i)
        res.add_done_callback(call_back2)

    tp.shutdown()
    主线程执行结束 ... 线程池的回调函数是由当前子线程调用完成的

协程:线程的具体实现

1.用协程改写生产者消费者模型

 prodecer():
    ):
        yield 消费者
 consumer(gen):
    (next(gen))

 初始化生成器函数 -> 生成器
gen = prodecer()
consumer(gen)
consumer(gen)
consumer(gen)

2.协程的历史:greenlet+switch

from greenlet  greenlet


 eat():
    eat 1)
    g2.switch()  手动切换到play
    time.sleep(3eat 2)


 play():
    play 1)
    time.sleep(3play 2)
    g1.switch()  手动切换到eat


g1 = greenlet(eat)  创建协程对象g1
g2 = greenlet(play)  创建协程对象g2
g1.switch()  手动切换到eat,执行程序

运行结果如下图所示

3.协程的历史:gevent可以实现切换,但是不能识别time.sleep阻塞

 gevent


 time.sleep(3) # gevent无法识别time.sleep阻塞,要使用自己的gevent.sleep
    gevent.sleep(3 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)

 如果不加join阻塞,默认主线程执行时,不等待直接结束. 阻塞,必须等待g1协程任务执行完毕之后,放行
g1.join()

4.协程的历史:使用monkey彻底解决gevent模块不识别阻塞的问题

from gevent  monkey
monkey.patch_all()  只要在monkey下面的导入,gevent都可以识别这些阻塞
 time
 gevent

)
    
 gevent.spawn(play)
    
g2.join()

")

执行结果如下图所示

5.协程相关方法

1.spawn(函数,参数1,参数2... ) 启动协程

2.join 阻塞,直到某个协程任务执行完毕之后,在执行下面代码

3.joinall 等待所有协程任务都执行完毕之后,放行

  g1.join() g2.join() => gevent.joinall( [g1,g2] ) (推荐)

4.value 获取协程任务中的返回值 g1.value g2.value

 monkey

monkey.patch_all()
吃完了"


玩完了


g1 = gevent.spawn(eat)
g2 = 等待g1,g2协程任务执行完毕之后,在放行
gevent.joinall([g1,g2])

主线程执行结束 .. print(g1.value)  吃完了
print(g2.value)  玩完了

利用协程爬取数据

HTTP 状态码:

  200 ok

  404 not found

  400 bad request

response基本用法

 获取状态码
(response.status_code)

 获取网站中的编码
res = response.apparent_encoding
(res)

 设置编码集,防止乱码
response.encoding = res

 获取网页里面的数据
res = response.text
print(res)

1.用正常的方式去爬取数据

 monkey ; monkey.patch_all()
 requests
 gevent

url_lst = [
http://www.baidu.com/http://www.taobao.com/http://www.jingdong.com/http://www.4399.com/http://www.7k7k.com/
]

 get_url(url):
    response = requests.get(url)
    if response.status_code == 200:
         print(response.text)
        pass

startime = time.time()
 url_lst:
    get_url(i)
endtime =执行时间:8.165239095687866

正常的方式去爬取数据,需要8.16s

2.用协程的方式去爬取数据?

lst = []
startime = url_lst:
    g = gevent.spawn(get_url,i)
    lst.append(g)

gevent.joinall(lst)
endtime = 2.189042329788208

协程的方式去爬取数据,只需要2.18s

3.最终的理想状态

利用好多进程,多线程,多协程让服务器运行速度更快,抗住更大的并发

并发编程思维导图

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读