网站首页 > 文章中心 > 其它

python里的阻塞函数_PYthon函数

作者:小编 更新时间:2023-10-16 11:30:41 浏览量:442人看过

python里并发执行协程时部分阻塞超时怎么办

碰到这种需求时不要惊慌,可以使用wait()里的timeout参数来设置等待时间,也就是从这个函数开始运行算起,如果时间到达协程没有执行完成,就可以不再等它们了,直接从wait()函数里返回,返回之后就可以判断那些没有执行成功的,可以把这些协程取消掉.例子如下:

[python]?view plain?copy

import?asyncio

async?def?phase(i):

print('in?phase?{}'.format(i))

python里的阻塞函数_PYthon函数-图1

try:

await?asyncio.sleep(0.1?*?i)

except?asyncio.CancelledError:

print('phase?{}?canceled'.format(i))

raise

else:

print('done?with?phase?{}'.format(i))

python里的阻塞函数_PYthon函数-图2

return?'phase?{}?result'.format(i)

async?def?main(num_phases):

print('starting?main')

phases?=?[

phase(i)

for?i?in?range(num_phases)

]

print('waiting?0.1?for?phases?to?complete')

completed,?pending?=?await?asyncio.wait(phases,?timeout=0.1)

print('{}?completed?and?{}?pending'.format(

len(completed),?len(pending),

))

#?Cancel?remaining?tasks?so?they?do?not?generate?errors

#?as?we?exit?without?finishing?them.

if?pending:

print('canceling?tasks')

for?t?in?pending:

t.cancel()

print('exiting?main')

event_loop?=?asyncio.get_event_loop()

finally:

event_loop.close()

结果输出如下:

starting main

waiting 0.1 for phases to complete

in phase 0

in phase 1

done with phase 0

canceling tasks

exiting main

phase 1 canceled

python多进程中队列不空时阻塞,求解为什么

最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程

第一段:先说说Queue(队列对象)

Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过著名的"先吃先拉"与"后吃先吐",其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多

import Queue

q = Queue.Queue(10)

向队列中放值(put)

q.put('yang')

q.put(['yan','xing'])

在队列中取值get()

默认的队列是先进先出的

q.get()

'yang'

['yan', 'xing']

当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到

get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常

所以更常用的方法是先判断一个队列是否为空,如果不为空则取值

队列中常用的方法

Queue.qsize() 返回队列的大小

Queue.empty() 如果队列为空,返回True,反之False

Queue.full() 如果队列满了,返回True,反之False

Queue.get([block[, timeout]]) 获取队列,timeout等待时间

Queue.get_nowait() 相当Queue.get(False)

非阻塞 Queue.put(item) 写入队列,timeout等待时间

Queue.put_nowait(item) 相当Queue.put(item, False)

第二段:multiprocessing中使用子进程概念

from multiprocessing import Process

可以通过Process来构造一个子进程

p = Process(target=fun,args=(args))

再通过p.start()来启动子进程

再通过p.join()方法来使得子进程运行结束后再执行父进程

import os

# 子进程要执行的代码

def run_proc(name):

print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':

print 'Parent process %s.' % os.getpid()

p = Process(target=run_proc, args=('test',))

print 'Process will start.'

p.start()

p.join()

print 'Process end.'

第三段:在multiprocessing中使用pool

如果需要多个子进程时可以考虑使用进程池(pool)来管理

from multiprocessing import Pool

import os, time

def long_time_task(name):

print 'Run task %s (%s)...' % (name, os.getpid())

start = time.time()

end = time.time()

p = Pool()

p.apply_async(long_time_task, args=(i,))

print 'Waiting for all subprocesses done...'

p.close()

print 'All subprocesses done.'

pool创建子进程的方法与Process不同,是通过

代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了.

当时也可以是实例pool的时候给它定义一个进程的多少

第三段:多个子进程间的通信

多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,

#coding:gbk

from multiprocessing import Process, Queue

import os, time, random

# 写数据进程执行的代码:

def write(q):

for value in ['A', 'B', 'C']:

print 'Put %s to queue...' % value

q.put(value)

time.sleep(random.random())

# 读数据进程执行的代码:

def read(q):

while True:

if not q.empty():

value = q.get(True)

print 'Get %s from queue.' % value

break

# 父进程创建Queue,并传给各个子进程:

q = Queue()

pw = Process(target=write, args=(q,))

pr = Process(target=read, args=(q,))

# 启动子进程pw,写入:

pw.start()

# 等待pw结束:

pw.join()

# 启动子进程pr,读取:

pr.start()

pr.join()

# pr进程里是死循环,无法等待其结束,只能强行终止:

print

print '所有数据都写入并且读完'

第四段:关于上面代码的几个有趣的问题

pw = p.apply_async(write,args=(q,))

pr = p.apply_async(read,args=(q,))

如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类

manager = multiprocessing.Manager()

q = manager.Queue()

这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

from multiprocessing import Process,Queue,Pool

import multiprocessing

def write(q,lock):

lock.acquire() #加上锁

lock.release() #释放锁

value = q.get(False)

lock = manager.Lock() #初始化一把锁

pw = p.apply_async(write,args=(q,lock))

python wait()函数问题

看了你发的函数:

def Wait(self):

self._app.MainLoop()

看名字应该是启动了阻塞循环,去处理app的请求,这个就是需要一直运行的,因为一旦停止了,你的app请求就没发处理了.

如果你需要启动后再执行的别的程序,可以使用多进程,把这个启动放在别的进程里去执行.

如果解决了您的问题请采纳!

如果未解决请继续追问

python2.7怎么实现异步

改进之前

之前,我的查询步骤很简单,就是:

前端提交查询请求 -- 建立数据库连接 -- 新建游标 -- 执行命令 -- 接受结果 -- 关闭游标、连接

这几大步骤的顺序执行.

这里面当然问题很大:

建立数据库连接实际上就是新建一个套接字.这是进程间通信的几种方法里,开销最大的了.

在"执行命令"和"接受结果"两个步骤中,线程在阻塞在数据库内部的运行过程中,数据库连接和游标都处于闲置状态.

这样一来,每一次查询都要顺序的新建数据库连接,都要阻塞在数据库返回结果的过程中.当前端提交大量查询请求时,查询效率肯定是很低的.

第一次改进

之前的模块里,问题最大的就是第一步——建立数据库连接套接字了.如果能够一次性建立连接,之后查询能够反复服用这个连接就好了.

作为第二个问题的解决方法,可以使用线程池来并发获取任务队列中的task,然后执行命令并回传结果.

第二次改进

但是对于第二个问题,使用线程池还是有些欠妥当.因为,CPython解释器存在GIL问题,所有线程实际上都在一个解释器进程里调度.线程稍微开多一点,解释器进程就会频繁的切换线程,而线程切换的开销也不小.线程多一点,甚至会出现"抖动"问题(也就是刚刚唤醒一个线程,就进入挂起状态,刚刚换到栈帧或内存的上下文,又被换回内存或者磁盘),效率大大降低.也就是说,线程池的并发量很有限.

试过了多进程、多线程,只能在单个线程里做文章了.

Python中的asyncio库

import asyncio

async def wait():

async def execute(task):

process_task(task)

await wait()

continue_job()

async def函数的执行稍微麻烦点.需要首先获取一个loop对象,然后由这个对象代为执行async def函数.

loop = asyncio.get_event_loop()

loop.run_until_complete(execute(task))

loop.close()

loop在执行execute(task)函数时,如果遇到await关键字,就会暂时挂起当前协程,转而去执行其他阻塞在await关键词的协程,从而实现协程并发.

不过需要注意的是,run_until_complete()函数本身是一个阻塞函数.也就是说,当前线程会等候一个run_until_complete()函数执行完毕之后,才会继续执行下一部函数.所以下面这段代码并不能并发执行.

for task in task_list:

loop.run_until_complete(task)

对与这个问题,asyncio库也有相应的解决方案:gather函数.

tasks = [asyncio.ensure_future(execute(task))

for task in task_list]

loop.run_until_complete(asyncio.gather(*tasks))

当然了,async def函数的执行并不只有这两种解决方案,还有call_soon与run_forever的配合执行等等,更多内容还请参考官方文档.

Python下的I/O多路复用

协程,实际上,也存在上下文切换,只不过开销很轻微.而I/O多路复用则完全不存在这个问题.

目前,Linux上比较火的I/O多路复用API要算epoll了.Tornado,就是通过调用C语言封装的epoll库,成功解决了C10K问题(当然还有Pypy的功劳).

在Linux里查文档,可以看到epoll只有三类函数,调用起来比较方便易懂.

创建epoll对象,并返回其对应的文件描述符(file descriptor).

int epoll_create(int size);

int epoll_create1(int flags);

控制监听事件.第一个参数epfd就对应于前面命令创建的epoll对象的文件描述符;第二个参数表示该命令要执行的动作:监听事件的新增、修改或者删除;第三个参数,是要监听的文件对应的描述符;第四个,代表要监听的事件.

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

等候.这是一个阻塞函数,调用者会等候内核通知所注册的事件被触发.

int epoll_wait(int epfd, struct epoll_event *events,

int maxevents, int timeout);

int epoll_pwait(int epfd, struct epoll_event *events,

int maxevents, int timeout,

const sigset_t *sigmask);

在Python的select库里:

select.epoll()对应于第一类创建函数;

epoll.register(),epoll.unregister(),epoll.modify()均是对控制函数epoll_ctl的封装;

epoll.poll()则是对等候函数epoll_wait的封装.

Python里epoll相关API的最大问题应该是在epoll.poll().相比于其所封装的epoll_wait,用户无法手动指定要等候的事件,也就是后者的第二个参数struct epoll_event *events.没法实现精确控制.所以呢只能使用替代方案:select.select()函数.

根据Python官方文档,select.select(rlist, wlist, xlist[, timeout])是对Unix系统中select函数的直接调用,与C语言API的传参很接近.前三个参数都是列表,其中的元素都是要注册到内核的文件描述符.如果想用自定义类,就要确保实现了fileno()方法.

其分别对应于:

rlist: 等候直到可读

wlist: 等候直到可写

xlist: 等候直到异常.这个异常的定义,要查看系统文档.

select.select(),类似于epoll.poll(),先注册文件和事件,然后保持等候内核通知,是阻塞函数.

实际应用

所以,我的大致实现思路是这样的:首先并发执行大量协程,从任务队列中提取任务,再向连接池请求连接,创建游标,然后执行命令,并返回结果.在获取游标和接受查询结果之前,均要阻塞等候内核通知连接可用.

其中,连接池返回连接时,会根据引用连接的协程数量,返回负载最轻的连接.这也是自己定义AsyncConnectionPool类的目的.

我的代码位于:bottle-blog/dbservice.py

存在问题

当然了,这个流程目前还一些问题.

首先就是每次轮询拿到任务之后,都会走这么一个流程.

获取连接 -- 新建游标 -- 执行任务 -- 关闭游标 -- 取消连接引用

本来,最好的情况应该是:在轮询之前,就建好游标;在轮询时,直接等候内核通知,执行相应任务.这样可以减少轮询时的任务量.但是如果协程提前对应好连接,那就不能保证在获取任务时,保持各连接负载均衡了.

所以这一块,还有工作要做.

还有就是epoll没能用上,有些遗憾.

以后打算写点C语言的内容,或者用Python/C API,或者用Ctypes包装共享库,来实现epoll的调用.

最后,请允许我吐槽一下Python的epoll相关文档:简直太弱了!!!必须看源码才能弄清楚功能.

python:如何以非阻塞的方式读

代码是这样的:

subp = subprocess.Popen(["d:/T1.exe"], shell=True, stdout=subprocess.PIPE, bufsize=0)

subp.stdout.read()

但是发现read和readline函数是阻塞方式调用的,一定要subprocess运行结束才能返回数据.

以上就是土嘎嘎小编为大家整理的python里的阻塞函数相关主题介绍,如果您觉得小编更新的文章只要能对粉丝们有用,就是我们最大的鼓励和动力,不要忘记讲本站分享给您身边的朋友哦!!

版权声明:倡导尊重与保护知识产权。未经许可,任何人不得复制、转载、或以其他方式使用本站《原创》内容,违者将追究其法律责任。本站文章内容,部分图片来源于网络,如有侵权,请联系我们修改或者删除处理。

编辑推荐

热门文章