协程初体验之简单的利用框架

Threezh1 2019年8月22日

最近在接触一下协程,上手不易,拖了很长时间才把这个小框架完成。有些地方很简陋,后面慢慢完善吧。

做这个的想法是方便自己后面编写需要用到协程的脚本。(还有忘记协程的知识点了就过来复习)

先看框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import asyncio, functools
import time

def SomeFunction(parameter):
"""
这里是目标函数,脚本的功能都应在此函数当中。
参数个数都由自己确定,需要与functools.partial调用的参数一致。
"""
time.sleep(1)
return parameter

async def coroutine_execution(function, param1):
"""
通过run_in_executor方法来新建一个线程来执行耗时函数。
注意:functools.partial调用的参数应与目标函数一致
"""
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None,functools.partial(function, parameter=param1))
# result为目标函数返回的值
print(result)

def coroutine_init(function, parameters, threads):
"""
处理线程
coroutine_execution()调用协程函数,可自行修改参数个数内容等。
"""
times = int(len(parameters) / threads) + 1
if len(parameters) == threads or int(len(parameters) % threads) == 0: times -= 1
for num in range(times):
tasks = []
Minimum = threads * num
Maximum = threads * (num + 1)
if num == times - 1 and len(parameters) % threads != 0:
Minimum = (times - 1) * threads
Maximum = len(parameters)
if len(parameters) <= threads:
Minimum = 0
Maximum = len(parameters)
for i in range(Minimum, Maximum):
# 此处的parameters[i]就是取目标参数的单个值,可自行调整
future = asyncio.ensure_future(coroutine_execution(function, param1=parameters[i]))
tasks.append(future)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print("[*] The {}th thread ends".format(str(num + 1)))
return None

if __name__ == "__main__":
words = ["Hello World.", "I'm Threezh1.", "Welcome to my blog.", "One", "Tow", "Three", "Four"]
coroutine_init(SomeFunction, parameters=words, threads=3)

脚本运行的结果为:

1
2
3
4
5
6
7
8
9
10
11
I'm Threezh1.
Hello World.
Welcome to my blog.
[*] The 1th thread ends
One
Three
Tow
[*] The 2th thread ends
Four
[*] The 3th thread ends
[Finished in 3.2s]

一共只花费3秒

框架的使用

  • SomeFunction() 需要实现并发的函数
  • coroutine_execution() 新建一个线程来执行并发函数
  • coroutine_init() 处理线程与调配任务

流程图:

process.png

需要调整的内容:

一. 传递的参数列表

列表中单个成员最后都会被赋值到目标函数。
比如:批量获取网站标题,这里的成员就为单个url。

二. 目标函数的参数

通过调整目标函数的参数个数,可以使框架适用于更加复杂的场景。
需要注意的是各个函数互相调用,其中的参数要尽量保持一致。

三. 返回结果的获取

在单个线程当中无法对全局变量进行修改,对结果的处理可以放在coroutine_execution()当中。
coroutine_execution()中也可以直接return结果,则会返回到coroutine_init()中。

协程笔记

  • 一些概念和方法

    • event_loop事件循环:程序开启一个无限的循环,当把一些函数注册到事件循环上时,满足事件发生条件即调用相应的函数。
    • coroutine协程对象:指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象,协程对象需要注册到事件循环,由事件循环调用。
    • task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
    • future:代表将来执行或没有执行的任务的结果,它和task上没有本质的区别
    • async/await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
  • 事件循环

asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。

1
2
loop = asyncio.get_event_loop()
loop.run_until_complete(test1())
  • 任务

所谓task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果。

第一种方式就是等到task状态为finish时,调用task的result方法获取返回值。

asyncio.ensure_future() 等同于asyncio.create_task(coro)
将 coro 协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。
1
2
3
4
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
loop.run_until_complete(task)
print(task.result())

第二种是创建回调函数。

1
2
3
4
5
6
7
def callback(future):
print('Callback:',future.result()) # 通过future对象的result方法可以获取协程函数的返回值

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1()) # 创建task,test1()是一个协程对象
task.add_done_callback(callback) # 绑定回调函数
loop.run_until_complete(task)

同样可以使用functools.partial偏导数添加多个参数

1
2
3
4
5
def callback(param1,param2,future):
print(param1,param2)
print('Callback:',future.result())

task.add_done_callback(functools.partial(callback,"param1","param2"))
  • 协程停止
1
2
3
4
5
6
7
8
9
10
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
for task in asyncio.Task.all_tasks():
task.cancel()
loop.stop()
loop.run_forever()
finally:
loop.close()
  • 协程并发
1
2
3
4
5
6
7
tasks = [
asyncio.ensure_future(test1()),
asyncio.ensure_future(test1()),
asyncio.ensure_future(test1()),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 注意asyncio.wait方法

重点在asyncio.wait。当任务ALL_COMPLETED的时候才会结束。

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
并发运行 aws 指定的 可等待对象 并阻塞线程直到满足 return_when 指定的条件。
  • 对一些情况的处理

阻塞函数(例如io读写,requests网络请求)阻塞了客户代码与asycio事件循环的唯一线程,因此在执行调用时,整个应用程序都会冻结。

解决方法是使用事件循环对象的run_in_executor方法。asyncio的事件循环在背后维护着一个ThreadPoolExecutor对象,我们可以调用run_in_executor方法,把可调用对象发给它执行,即可以通过run_in_executor方法来新建一个线程来执行耗时函数。
(羡慕大佬的总结总是这么简单明了…)

大佬的这个例子也是这个小框架的灵魂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
import time
async def run(url):
print("start ",url)
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(None,time.sleep,1)
except Exception as e:
print(e)
print("stop ",url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

参考学习

0%