import re
import reprlib

RE_WORD = re.compile(r'\w+')

class Sentence:
    def __init__(self, text):
        self.text = text  # ➊
    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)
    def __iter__(self):
        for match in RE_WORD.finditer(self.text):  # ➋
            yield match.group()  # ➌
➊ The words list is no longer needed.
➋ finditer builds an iterator over the matches of RE_WORD in self.text, yielding MatchObject instances.
➌ The match.group() method extracts the text matched by the regular expression from the MatchObject instance.
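A quick check of the generator-based Sentence (the sample text is arbitrary):

>>> s = Sentence('"The time has come," the Walrus said')
>>> list(s)
['The', 'time', 'has', 'come', 'the', 'Walrus', 'said']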
A generator expression can be understood as the lazy version of a list comprehension: instead of eagerly building a list, it returns a generator that produces items on demand.
In other words, if a list comprehension is a factory for lists, a generator expression is a factory for generators.
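A minimal side-by-side illustration:

>>> [x * 2 for x in range(3)]       # list comprehension: the list is built eagerly
[0, 2, 4]
>>> (x * 2 for x in range(3))       # generator expression: nothing is computed yet
<generator object <genexpr> at 0x...>
>>> list(x * 2 for x in range(3))   # items are produced on demand
[0, 2, 4]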
import re
import reprlib

RE_WORD = re.compile(r'\w+')

class Sentence:
    def __init__(self, text):
        self.text = text
    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)
    def __iter__(self):
        return (match.group() for match in RE_WORD.finditer(self.text))
import itertools

nums = itertools.count(start=2, step=3)
for i in nums:
    if i > 15:
        break
    print(i, end=' ')
# output: 2 5 8 11 14
>>> [i for i in itertools.repeat(['a', 'b'], 3)]
[['a', 'b'], ['a', 'b'], ['a', 'b']]
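Note that repeat yields the same object each time rather than copies, so mutating one element mutates them all:

>>> a, b, c = itertools.repeat(['a', 'b'], 3)
>>> a is b
True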
import itertools
import threading

def generator():
    for i in range(1000000):
        yield i

g = generator()
g_1, g_2 = itertools.tee(g, 2)
# itertools.tee is not thread-safe: consuming the two iterators from
# separate threads like this can raise errors or produce wrong results.
for x in [g_1, g_2]:
    threading.Thread(target=sum, args=(x,)).start()
A thread-safe version wraps each tee iterator so that every next() call happens under a shared lock:
import itertools
from threading import Lock

class ThreadingTee:
    def __init__(self, tee_obj, lock):
        self.tee_obj = tee_obj
        self.lock = lock
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.tee_obj)
    def __copy__(self):
        return ThreadingTee(self.tee_obj.__copy__(), self.lock)

def threading_tee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(ThreadingTee(tee_obj, lock) for tee_obj in itertools.tee(iterable, n))
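A usage sketch mirroring the unsafe example above (reusing the generator function and the threading import from that snippet):

g_1, g_2 = threading_tee(generator(), 2)
for x in [g_1, g_2]:
    threading.Thread(target=sum, args=(x,)).start()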
import collections
import operator
import random
from itertools import (chain, combinations, count, cycle, filterfalse,
                       groupby, islice, repeat, starmap, tee, zip_longest)
from operator import itemgetter

def take(n, iterable):
    "Return first n items of the iterable as a list"
    return list(islice(iterable, n))

def tabulate(function, start=0):
    "Return function(0), function(1), ..."
    return map(function, count(start))

def consume(iterator, n):
    "Advance the iterator n-steps ahead. If n is None, consume entirely."
    # Use functions that consume iterators at C speed.
    if n is None:
        # feed the entire iterator into a zero-length deque
        collections.deque(iterator, maxlen=0)
    else:
        # advance to the empty slice starting at position n
        next(islice(iterator, n, n), None)

def nth(iterable, n, default=None):
    "Returns the nth item or a default value"
    return next(islice(iterable, n, None), default)

def quantify(iterable, pred=bool):
    "Count how many times the predicate is true"
    return sum(map(pred, iterable))

def padnone(iterable):
    """Returns the sequence elements and then returns None indefinitely.
    Useful for emulating the behavior of Python 2's built-in map() function.
    """
    return chain(iterable, repeat(None))

def ncycles(iterable, n):
    "Returns the sequence elements n times"
    return chain.from_iterable(repeat(tuple(iterable), n))

def dotproduct(vec1, vec2):
    return sum(map(operator.mul, vec1, vec2))

def flatten(listOfLists):
    "Flatten one level of nesting"
    return chain.from_iterable(listOfLists)

def repeatfunc(func, times=None, *args):
    """Repeat calls to func with specified arguments.
    Example: repeatfunc(random.random)
    """
    if times is None:
        return starmap(func, repeat(args))
    return starmap(func, repeat(args, times))

def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2,s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def roundrobin(*iterables):
    "roundrobin('ABC', 'D', 'EF') --> A D E B F C"
    # Recipe credited to George Sakkis
    pending = len(iterables)
    nexts = cycle(iter(it).__next__ for it in iterables)
    while pending:
        try:
            for next_func in nexts:
                yield next_func()
        except StopIteration:
            pending -= 1
            nexts = cycle(islice(nexts, pending))

def powerset(iterable):
    "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
    s = list(iterable)
    return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))

def unique_everseen(iterable, key=None):
    "List unique elements, preserving order. Remember all elements ever seen."
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    seen = set()
    seen_add = seen.add
    if key is None:
        for element in filterfalse(seen.__contains__, iterable):
            seen_add(element)
            yield element
    else:
        for element in iterable:
            k = key(element)
            if k not in seen:
                seen_add(k)
                yield element

def unique_justseen(iterable, key=None):
    "List unique elements, preserving order. Remember only the element just seen."
    # unique_justseen('AAAABBBCCDAABBB') --> A B C D A B
    # unique_justseen('ABBCcAD', str.lower) --> A B C A D
    return map(next, map(itemgetter(1), groupby(iterable, key)))

def iter_except(func, exception, first=None):
    """Call a function repeatedly until an exception is raised.
    Converts a call-until-exception interface to an iterator interface.
    Like builtins.iter(func, sentinel) but uses an exception instead
    of a sentinel to end the loop.
    Examples:
        heapiter = iter_except(functools.partial(heappop, h), IndexError)
        dictiter = iter_except(d.popitem, KeyError)
        dequeiter = iter_except(d.popleft, IndexError)
        queueiter = iter_except(q.get_nowait, queue.Empty)
        setiter = iter_except(s.pop, KeyError)
    """
    try:
        if first is not None:
            yield first()
        while True:
            yield func()
    except exception:
        pass

def random_product(*args, **kwds):
    "Random selection from itertools.product(*args, **kwds)"
    pools = [tuple(pool) for pool in args] * kwds.get('repeat', 1)
    return tuple(random.choice(pool) for pool in pools)

def random_permutation(iterable, r=None):
    "Random selection from itertools.permutations(iterable, r)"
    pool = tuple(iterable)
    r = len(pool) if r is None else r
    return tuple(random.sample(pool, r))

def random_combination(iterable, r):
    "Random selection from itertools.combinations(iterable, r)"
    pool = tuple(iterable)
    n = len(pool)
    indices = sorted(random.sample(range(n), r))
    return tuple(pool[i] for i in indices)

def random_combination_with_replacement(iterable, r):
    "Random selection from itertools.combinations_with_replacement(iterable, r)"
    pool = tuple(iterable)
    n = len(pool)
    indices = sorted(random.randrange(n) for i in range(r))
    return tuple(pool[i] for i in indices)

def tee_lookahead(t, i):
    """Inspect the i-th upcoming value from a tee object
    while leaving the tee object at its current position.
    Raise an IndexError if the underlying iterator doesn't
    have enough values.
    """
    for value in islice(t.__copy__(), i, None):
        return value
    raise IndexError(i)
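A few of the recipes in action (a quick illustrative session, assuming the definitions above):

>>> take(3, count(10))
[10, 11, 12]
>>> list(pairwise('ABCD'))
[('A', 'B'), ('B', 'C'), ('C', 'D')]
>>> list(grouper('ABCDEFG', 3, 'x'))
[('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]
>>> list(roundrobin('ABC', 'D', 'EF'))
['A', 'D', 'E', 'B', 'F', 'C']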
Custom extensions
# Split a sequence into fixed-size chunks, with better performance
from itertools import chain, islice

def chunks(iterable, size, format=iter):
    it = iter(iterable)
    while True:
        try:
            head = (next(it),)
        except StopIteration:  # PEP 479: don't let StopIteration leak out of a generator
            return
        yield format(chain(head, islice(it, size - 1)))
>>> l = ["a", "b", "c", "d", "e", "f", "g"]
>>> for chunk in chunks(l, 3, tuple):
...     print(chunk)
('a', 'b', 'c')
('d', 'e', 'f')
('g',)
The yield from syntax, introduced in Python 3.3, lets a generator or coroutine delegate work to a subgenerator, removing the need for nested for loops as a workaround.
import random
import time

def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_cnt = yield b
        print('let me think {0} secs'.format(sleep_cnt))
        time.sleep(sleep_cnt)
        a, b = b, a + b
        index += 1
print('-' * 10 + 'test yield send' + '-' * 10)
N = 5
sfib = stupid_fib(N)
fib_res = next(sfib)
while True:
    print(fib_res)
    try:
        fib_res = sfib.send(random.uniform(0, 0.5))
    except StopIteration:
        break
----------test yield send----------
1
let me think 0.2438615286011866 secs
1
let me think 0.027476256830278822 secs
2
let me think 0.09717699872403579 secs
3
let me think 0.017161862262742633 secs
5
let me think 0.3313821890336833 secs
In [202]: import inspect

In [203]: def generator():
     ...:     i = 'generator activated'
     ...:     while True:
     ...:         try:
     ...:             value = yield i
     ...:         except ValueError:
     ...:             print('OVER')
     ...:         i = value
     ...:

In [204]: g = generator()               # 1

In [205]: inspect.getgeneratorstate(g)  # 2
Out[205]: 'GEN_CREATED'

In [206]: next(g)                       # 3
Out[206]: 'generator activated'

In [207]: inspect.getgeneratorstate(g)
Out[207]: 'GEN_SUSPENDED'

In [208]: g.send('Hello Shiyanlou')     # 4
Out[208]: 'Hello Shiyanlou'

In [209]: g.throw(ValueError)           # 5
OVER
Out[209]: 'Hello Shiyanlou'

In [210]: g.close()                     # 6

In [211]: inspect.getgeneratorstate(g)
Out[211]: 'GEN_CLOSED'
Python 3.3 introduced the yield from syntax: the expression yield from item obtains an iterator from item.
yield from can replace a for loop and make the code more concise; what follows yield from must be an iterable.
yield from i completely replaces the inner for loop, and the code reads more smoothly, though at this level it feels like mere syntactic sugar.
Beyond replacing a loop, yield from also opens a channel that connects the inner generator directly to the client of the outer generator.
This channel matters especially when generators are used as coroutines: it not only produces values for the client code, it can also consume values the client code sends in.
# yield from
def first_gen():
    for c in "AB":
        yield c
    for i in range(0, 3):
        yield i

print(list(first_gen()))

def second_gen():
    yield from "AB"
    yield from range(0, 3)

print(list(second_gen()))

['A', 'B', 0, 1, 2]
['A', 'B', 0, 1, 2]
When yield from is followed by a generator, you get nested generators. Nesting generators does not strictly require yield from,
but yield from saves us from handling all sorts of unexpected exceptions ourselves; hand-rolling that logic makes the code considerably harder to get right.
The main job of yield from is to open a bidirectional channel that connects the outermost caller with the innermost subgenerator,
so the two can send and yield values directly, and even pass exceptions straight through.
The subgenerator yields values to the caller; once the subgenerator returns, the interpreter raises StopIteration.
The role of the delegating generator, then, is to establish this two-way channel between the caller and the subgenerator.
Why insist on yield from? Because the yield from statement already handles many of these exceptions for us.
def my_generator():
    for i in range(5):
        if i == 2:
            return 'I was interrupted'
        else:
            yield i

def main(generator):
    try:
        for i in generator:  # the for loop handles StopIteration itself, so the return value never surfaces
            print(i)
    except StopIteration as exc:
        print(exc.value)

g = my_generator()
main(g)
'''
Output:
0
1
'''
def my_generator():
    for i in range(5):
        if i == 2:
            return 'I was interrupted'
        else:
            yield i

def main(generator):
    try:
        print(next(generator))  # advancing one value at a time raises StopIteration explicitly
        print(next(generator))
        print(next(generator))
        print(next(generator))
        print(next(generator))
    except StopIteration as exc:
        print(exc.value)  # retrieve the returned value

g = my_generator()
main(g)
'''
Output:
0
1
I was interrupted
'''
Now let's use yield from to implement the same behavior:
def my_generator():
    for i in range(5):
        if i == 2:
            return 'I was interrupted'
        else:
            yield i

def wrap_my_generator(generator):  # a delegating generator wrapping another generator; it is itself still a generator
    result = yield from generator  # StopIteration is handled automatically, and the return value becomes the value of the yield from expression
    print(result)

def main(generator):
    for j in generator:
        print(j)

g = my_generator()
wrap_g = wrap_my_generator(g)
main(wrap_g)
'''
Output:
0
1
I was interrupted
'''
The comparison above shows the key properties of yield from:
without it: caller -> generator function (coroutine function)
with it: caller -> delegating generator -> subgenerator (coroutine function)
the value handed to return (that is, the value attribute of the StopIteration) becomes the value of the yield from expression, the result variable above.
In the asyncio.sleep code below, a Future object is created as the innermost coroutine object and handed to the event loop via yield from; Future is a generator that implements __iter__.
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay,
                                future._set_result_unless_cancelled, result)
    try:
        return (yield from future)
    finally:
        h.cancel()
class Future:
    # ...
    def __iter__(self):
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.
# When a coroutine does yield from asyncio.sleep(...), the event loop is effectively wired up to that Future object. The program's output looks like this:
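The countdown program that produced this output is not reproduced in these notes; a minimal sketch that yields output of this shape (hypothetical labels 'A' and 'B' counting down from 2 and 5, written in the legacy pre-3.5 generator-based coroutine style this section discusses) might be:

import asyncio

@asyncio.coroutine
def countdown(label, n):
    # print one tick per second until n reaches zero
    while n > 0:
        print('T-minus', n, '({})'.format(label))
        yield from asyncio.sleep(1)
        n -= 1

loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(countdown('A', 2)),
         asyncio.ensure_future(countdown('B', 5))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()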
T-minus 2 (A)
T-minus 5 (B)
T-minus 1 (A)
T-minus 4 (B)
T-minus 3 (B)
T-minus 2 (B)
T-minus 1 (B)
async and await
hello world
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
# expected output
started at 17:13:52
hello
world
finished at 17:13:55
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take around 2 seconds.)
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")
# The expected output shows this snippet runs about 1 second faster than before
started at 17:14:32
hello
world
finished at 17:14:34
A brief history of the asyncio module
asyncio was introduced in Python 3.4; the name means asynchronous I/O.
await replaces yield from with exactly the same functionality: scheduling.
The @asyncio.coroutine decorator versus the async keyword:
@asyncio.coroutine
def func1():
    yield from asyncio.sleep(2)  # on a time-consuming I/O operation, automatically switch to another task

# equivalent to
async def func1():
    await asyncio.sleep(2)  # yield from is a SyntaxError inside async def; use await instead
async def main():
    await function_that_returns_a_future_object()
    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

result = [i async for i in aiter() if i % 2]
result = [await func() for func in funcs if await condition()]
async def test(x, y):
    for i in range(y):
        yield i
        await asyncio.sleep(x)
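An async generator like this is consumed with async for; a minimal driver (the names are illustrative, assuming the test definition above):

import asyncio

async def main():
    async for value in test(0.1, 3):
        print(value)  # prints 0, 1, 2 with a 0.1 s pause after each

asyncio.run(main())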
import asyncio
import requests
import time

start = time.time()

async def get(url):
    # requests.get is a blocking call; wrapping it in a coroutine
    # does NOT make it non-blocking
    return requests.get(url)

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'Result:', response.text)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447
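The roughly 15-second total confirms the five requests ran one after another: the blocking requests.get call never yields control to the event loop. One way to regain concurrency while keeping requests is to push the blocking call onto a thread pool; a sketch (assuming the same local test server):

import asyncio
import requests

async def get(url):
    loop = asyncio.get_running_loop()
    # run the blocking call in the default thread-pool executor
    return await loop.run_in_executor(None, requests.get, url)

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'Result:', response.text)

async def main():
    await asyncio.gather(*(request() for _ in range(5)))

asyncio.run(main())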
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: v1.0
@author: huangyc
@file: async_test.py
@Description:
@time: 2023/4/6 10:31
"""
import asyncio
import random
import threading

random.seed(5)

async def rnd_sleep(t):
    # sleep for t seconds on average
    await asyncio.sleep(t * random.random() * 2)

async def producer(queue):
    lst = list(range(10))
    for token in lst:
        # produce a token and send it to a consumer
        print(f'produced {token}')
        await queue.put(token)
        await rnd_sleep(.1)

async def consumer(queue):
    while True:
        token = await queue.get()
        # process the token received from a producer
        await rnd_sleep(.1)
        queue.task_done()
        print(f'consumed {token}')

async def main():
    queue = asyncio.Queue()
    # fire up both the producers and the consumers
    producers = [asyncio.create_task(producer(queue)) for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(10)]
    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print('---- done producing')
    # wait for the remaining tasks to be processed
    await queue.join()
    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()

if __name__ == '__main__':
    print("hello")
    # multithreading + coroutines: run the event loop in a worker thread
    t1 = threading.Thread(target=asyncio.run, args=(main(),))
    t1.start()
    t1.join()
    # direct coroutine invocation:
    # asyncio.run(main())
    print("end")