本文共 5173 字,大约阅读时间需要 17 分钟。
协程,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程。
所谓用户态就是说协程是由用户来控制的,CPU不认识协程,协程是跑在线程中的。
协程拥有自己的寄存器上下文栈。协程调试切换时,将寄存器上下文栈保存到其他地方,在切回来时,恢复先前保存的寄存器上下文栈。
因此,协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,也就是进入上一次离开时所处逻辑流的位置。
线程切换时会将上下文和栈保存到CPU的寄存器中。
协程的标准定义,即符合以下所有条件就能称之为协程:
1.在单线程里实现并发
2.修改共享数据不需要加锁
3.用户程序里自己保存多个控制流的上下文栈
4.一个协程遇到IO操作自动切换到其它协程
协程的好处:
无需线程上下文切换的开销
无需原子操作锁定及同步的开销
原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都行,很适合用于高并发处理
协程的缺点:
无法利用多核资源:
协程的本质是个单线程,它不能同时将单个CPU的多个核用上
协程需要和进程配合才能运行在多CPU上。
进行阻塞(Blocking)操作(如IO)时会阻塞掉整个程序
使用yield实现协程的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | #!/usr/bin/python #Author:sean import time def consumer(name): print ( "--->start eating baozi..." ) while True : new_baozi = yield print ( "[%s] is eating baozi %s" % (name,new_baozi)) # time.sleep(2) def producter(): r = tom.__next__() r = jerry.__next__() n = 0 while n < 5 : n + = 1 tom.send(n) jerry.send(n) print ( "\033[32;1m[producter]\033[0m is making baozi %s" % n) if __name__ = = '__main__' : tom = consumer( "tom" ) jerry = consumer( "jerry" ) p = producter() |
如何在单线程下实现并发效果?
答案是遇到IO操作就切换,因为IO操作耗时比较长
协程之所以能处理高并发,其实就是把IO操作给干掉了,就是一遇到IO操作就切换。
这样的话整个程序就变成了只有CPU在运算。
一遇到IO操作就切换,那么到底什么时候再切回去呢?
答案是当IO操作结束后就切回去。
那么问题又来了,python怎么来监测IO操作是否结束呢?带着这个问题先来看看几个例子
greenlet模块:
greenlet是一个封装好的协程,通过switch方法手动进行切换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | #!/usr/bin/python #Author:sean from greenlet import greenlet def func1(): print ( "haha11" ) gr2.switch() print ( "haha22" ) gr2.switch() def func2(): print ( "haha33" ) gr1.switch() print ( "haha44" ) gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch() |
gevent模块:
gevent是一个第三方库,可以轻松实现并发同步或异步编程。
在gevent中用到的主要是greenlet,它是以C扩展模式形式接入python的轻量级协程。
greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度
gevent能够自动进行IO切换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | #!/usr/bin/python #Author:sean import gevent def foo(): print ( "Running in foo" ) gevent.sleep( 0 ) #模仿IO操作 print ( 'Explicit context switch to foo again' ) def bar(): print ( 'Explicit context to bar' ) gevent.sleep( 0 ) #模仿IO操作 print ( 'Implicit context switch back to bar' ) gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar) ]) |
同步与异步的区别:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | #!/usr/bin/python #Author:sean import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep( 0.5 ) print ( 'Task %s done' % pid) def synchronous(): for i in range ( 1 , 10 ): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in range ( 10 )] gevent.joinall(threads) print ( 'Synchronous:' ) synchronous() print ( 'Asynchronous:' ) asynchronous() |
用协程并发爬虫爬取网站:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | #!/usr/bin/python #Author:sean from urllib import request import gevent #默认情况下,gevent并不知道urllib或者socket什么时候进行了IO操作 #默认情况下,gevent和urllib以及socket并没有任何关联,当然就无法提高效率,因为其实质上还是串行操作 #要想让gevent知道urllib或socket正在进行IO操作,需要给gevent打个补丁 from gevent import monkey monkey.patch_all() #把当前程序的所有IO操作单独做上标记 def f(url): print ( 'GET: %s' % url) resp = request.urlopen(url) data = resp.read() # f = open("url.html","wb") # f.write(data) # f.close() print ( '%d bytes received from %s.' % ( len (data),url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org' ), gevent.spawn(f, 'https://yahoo.com' ), gevent.spawn(f, 'https://github.com' ) ]) |
用gevent协程写一个单线程高并发的socket:
服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | #!/usr/bin/python #Author:sean import sys import socket import time import gevent from gevent import socket,monkey monkey.patch_all() #把当前程序的所有IO操作单独做上标记 def server(host,port): s = socket.socket() s.bind((host,port)) s.listen( 500 ) while True : cli,addr = s.accept() gevent.spawn(handle_request,cli) def handle_request(conn): try : while True : data = conn.recv( 1024 ) print ( "recv: " ,data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as e: print (e) finally : conn.close() if __name__ = = '__main__' : server( '0.0.0.0' , 8001 ) |
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | #!/usr/bin/python #Author:sean import socket HOST = 'localhost' #The remote host PORT = 8001 #The same port as used by the server s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect((HOST,PORT)) while True : msg = bytes( input ( ">>:" ),encoding = "utf-8" ) s.sendall(msg) data = s.recv( 1024 ) print ( 'Received' , repr (data)) s.close() |
并发100个sock连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | #!/usr/bin/python #Author:sean import socket import threading def sock_conn(): client = socket.socket() client.connect(( "localhost" , 8001 )) count = 0 while True : #msg = input(">>:").strip() #if len(msg) == 0:continue client.send( ( "hello %s" % count).encode( "utf-8" )) data = client.recv( 1024 ) print ( "[%s]recv from server:" % threading.get_ident(),data.decode()) #结果 count + = 1 client.close() for i in range ( 100 ): t = threading.Thread(target = sock_conn) t.start() |
事件驱动与异步IO,
现在我们可以来回答下这个问题了,python如何监测IO操作是否结束?
IO操作是由操作系统进行处理的,当遇到IO操作时就切换
等IO操作完以后让其调用回调函数,回调函数会通知协程说这个IO操作完成了