碰到这种需求时不要惊慌,可以使用wait()里的timeout参数来设置等待时间,也就是从这个函数开始运行算起,如果时间到达协程没有执行完成,就可以不再等它们了,直接从wait()函数里返回,返回之后就可以判断那些没有执行成功的,可以把这些协程取消掉.例子如下:
[python]?view plain?copy
import?asyncio
async?def?phase(i):
print('in?phase?{}'.format(i))
try:
await?asyncio.sleep(0.1?*?i)
except?asyncio.CancelledError:
print('phase?{}?canceled'.format(i))
raise
else:
print('done?with?phase?{}'.format(i))
return?'phase?{}?result'.format(i)
async?def?main(num_phases):
print('starting?main')
phases?=?[
phase(i)
for?i?in?range(num_phases)
]
print('waiting?0.1?for?phases?to?complete')
completed,?pending?=?await?asyncio.wait(phases,?timeout=0.1)
print('{}?completed?and?{}?pending'.format(
len(completed),?len(pending),
))
#?Cancel?remaining?tasks?so?they?do?not?generate?errors
#?as?we?exit?without?finishing?them.
if?pending:
print('canceling?tasks')
for?t?in?pending:
t.cancel()
print('exiting?main')
event_loop?=?asyncio.get_event_loop()
finally:
event_loop.close()
结果输出如下:
starting main
waiting 0.1 for phases to complete
in phase 0
in phase 1
done with phase 0
canceling tasks
exiting main
phase 1 canceled
最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程
第一段:先说说Queue(队列对象)
Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过著名的"先吃先拉"与"后吃先吐",其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多
import Queue
q = Queue.Queue(10)
向队列中放值(put)
q.put('yang')
q.put(['yan','xing'])
在队列中取值get()
默认的队列是先进先出的
q.get()
'yang'
['yan', 'xing']
当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到
get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常
所以更常用的方法是先判断一个队列是否为空,如果不为空则取值
队列中常用的方法
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
第二段:multiprocessing中使用子进程概念
from multiprocessing import Process
可以通过Process来构造一个子进程
p = Process(target=fun,args=(args))
再通过p.start()来启动子进程
再通过p.join()方法来使得子进程运行结束后再执行父进程
import os
# 子进程要执行的代码
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'
第三段:在multiprocessing中使用pool
如果需要多个子进程时可以考虑使用进程池(pool)来管理
from multiprocessing import Pool
import os, time
def long_time_task(name):
print 'Run task %s (%s)...' % (name, os.getpid())
start = time.time()
end = time.time()
p = Pool()
p.apply_async(long_time_task, args=(i,))
print 'Waiting for all subprocesses done...'
p.close()
print 'All subprocesses done.'
pool创建子进程的方法与Process不同,是通过
代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了.
当时也可以是实例pool的时候给它定义一个进程的多少
第三段:多个子进程间的通信
多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,
#coding:gbk
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print 'Get %s from queue.' % value
break
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 等待pw结束:
pw.join()
# 启动子进程pr,读取:
pr.start()
pr.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
print '所有数据都写入并且读完'
第四段:关于上面代码的几个有趣的问题
pw = p.apply_async(write,args=(q,))
pr = p.apply_async(read,args=(q,))
如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到
RuntimeError: Queue objects should only be shared between processes through inheritance
的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类
manager = multiprocessing.Manager()
q = manager.Queue()
这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧
关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁
from multiprocessing import Process,Queue,Pool
import multiprocessing
def write(q,lock):
lock.acquire() #加上锁
lock.release() #释放锁
value = q.get(False)
lock = manager.Lock() #初始化一把锁
pw = p.apply_async(write,args=(q,lock))
看了你发的函数:
def Wait(self):
self._app.MainLoop()
看名字应该是启动了阻塞循环,去处理app的请求,这个就是需要一直运行的,因为一旦停止了,你的app请求就没发处理了.
如果你需要启动后再执行的别的程序,可以使用多进程,把这个启动放在别的进程里去执行.
如果解决了您的问题请采纳!
如果未解决请继续追问
改进之前
之前,我的查询步骤很简单,就是:
前端提交查询请求 -- 建立数据库连接 -- 新建游标 -- 执行命令 -- 接受结果 -- 关闭游标、连接
这几大步骤的顺序执行.
这里面当然问题很大:
建立数据库连接实际上就是新建一个套接字.这是进程间通信的几种方法里,开销最大的了.
在"执行命令"和"接受结果"两个步骤中,线程在阻塞在数据库内部的运行过程中,数据库连接和游标都处于闲置状态.
这样一来,每一次查询都要顺序的新建数据库连接,都要阻塞在数据库返回结果的过程中.当前端提交大量查询请求时,查询效率肯定是很低的.
第一次改进
之前的模块里,问题最大的就是第一步——建立数据库连接套接字了.如果能够一次性建立连接,之后查询能够反复服用这个连接就好了.
作为第二个问题的解决方法,可以使用线程池来并发获取任务队列中的task,然后执行命令并回传结果.
第二次改进
但是对于第二个问题,使用线程池还是有些欠妥当.因为,CPython解释器存在GIL问题,所有线程实际上都在一个解释器进程里调度.线程稍微开多一点,解释器进程就会频繁的切换线程,而线程切换的开销也不小.线程多一点,甚至会出现"抖动"问题(也就是刚刚唤醒一个线程,就进入挂起状态,刚刚换到栈帧或内存的上下文,又被换回内存或者磁盘),效率大大降低.也就是说,线程池的并发量很有限.
试过了多进程、多线程,只能在单个线程里做文章了.
Python中的asyncio库
import asyncio
async def wait():
async def execute(task):
process_task(task)
await wait()
continue_job()
async def函数的执行稍微麻烦点.需要首先获取一个loop对象,然后由这个对象代为执行async def函数.
loop = asyncio.get_event_loop()
loop.run_until_complete(execute(task))
loop.close()
loop在执行execute(task)函数时,如果遇到await关键字,就会暂时挂起当前协程,转而去执行其他阻塞在await关键词的协程,从而实现协程并发.
不过需要注意的是,run_until_complete()函数本身是一个阻塞函数.也就是说,当前线程会等候一个run_until_complete()函数执行完毕之后,才会继续执行下一部函数.所以下面这段代码并不能并发执行.
for task in task_list:
loop.run_until_complete(task)
对与这个问题,asyncio库也有相应的解决方案:gather函数.
tasks = [asyncio.ensure_future(execute(task))
for task in task_list]
loop.run_until_complete(asyncio.gather(*tasks))
当然了,async def函数的执行并不只有这两种解决方案,还有call_soon与run_forever的配合执行等等,更多内容还请参考官方文档.
Python下的I/O多路复用
协程,实际上,也存在上下文切换,只不过开销很轻微.而I/O多路复用则完全不存在这个问题.
目前,Linux上比较火的I/O多路复用API要算epoll了.Tornado,就是通过调用C语言封装的epoll库,成功解决了C10K问题(当然还有Pypy的功劳).
在Linux里查文档,可以看到epoll只有三类函数,调用起来比较方便易懂.
创建epoll对象,并返回其对应的文件描述符(file descriptor).
int epoll_create(int size);
int epoll_create1(int flags);
控制监听事件.第一个参数epfd就对应于前面命令创建的epoll对象的文件描述符;第二个参数表示该命令要执行的动作:监听事件的新增、修改或者删除;第三个参数,是要监听的文件对应的描述符;第四个,代表要监听的事件.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
等候.这是一个阻塞函数,调用者会等候内核通知所注册的事件被触发.
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
在Python的select库里:
select.epoll()对应于第一类创建函数;
epoll.register(),epoll.unregister(),epoll.modify()均是对控制函数epoll_ctl的封装;
epoll.poll()则是对等候函数epoll_wait的封装.
Python里epoll相关API的最大问题应该是在epoll.poll().相比于其所封装的epoll_wait,用户无法手动指定要等候的事件,也就是后者的第二个参数struct epoll_event *events.没法实现精确控制.所以呢只能使用替代方案:select.select()函数.
根据Python官方文档,select.select(rlist, wlist, xlist[, timeout])是对Unix系统中select函数的直接调用,与C语言API的传参很接近.前三个参数都是列表,其中的元素都是要注册到内核的文件描述符.如果想用自定义类,就要确保实现了fileno()方法.
其分别对应于:
rlist: 等候直到可读
wlist: 等候直到可写
xlist: 等候直到异常.这个异常的定义,要查看系统文档.
select.select(),类似于epoll.poll(),先注册文件和事件,然后保持等候内核通知,是阻塞函数.
实际应用
所以,我的大致实现思路是这样的:首先并发执行大量协程,从任务队列中提取任务,再向连接池请求连接,创建游标,然后执行命令,并返回结果.在获取游标和接受查询结果之前,均要阻塞等候内核通知连接可用.
其中,连接池返回连接时,会根据引用连接的协程数量,返回负载最轻的连接.这也是自己定义AsyncConnectionPool类的目的.
我的代码位于:bottle-blog/dbservice.py
存在问题
当然了,这个流程目前还一些问题.
首先就是每次轮询拿到任务之后,都会走这么一个流程.
获取连接 -- 新建游标 -- 执行任务 -- 关闭游标 -- 取消连接引用
本来,最好的情况应该是:在轮询之前,就建好游标;在轮询时,直接等候内核通知,执行相应任务.这样可以减少轮询时的任务量.但是如果协程提前对应好连接,那就不能保证在获取任务时,保持各连接负载均衡了.
所以这一块,还有工作要做.
还有就是epoll没能用上,有些遗憾.
以后打算写点C语言的内容,或者用Python/C API,或者用Ctypes包装共享库,来实现epoll的调用.
最后,请允许我吐槽一下Python的epoll相关文档:简直太弱了!!!必须看源码才能弄清楚功能.
代码是这样的:
subp = subprocess.Popen(["d:/T1.exe"], shell=True, stdout=subprocess.PIPE, bufsize=0)
subp.stdout.read()
但是发现read和readline函数是阻塞方式调用的,一定要subprocess运行结束才能返回数据.
以上就是土嘎嘎小编为大家整理的python里的阻塞函数相关主题介绍,如果您觉得小编更新的文章只要能对粉丝们有用,就是我们最大的鼓励和动力,不要忘记讲本站分享给您身边的朋友哦!!