Here is a simple for loop that iterates over a string. The string 'ABC' is the iterable object; behind the scenes there is an iterator, we just never see it:
>>> s = 'ABC'
>>> for char in s:
...     print(char)
...
A
B
C
Without the for statement, we would have to emulate it by hand with a while loop, like this:
>>> s = 'ABC'
>>> it = iter(s)                  # ➊
>>> while True:
...     try:
...         print(next(it))       # ➋
...     except StopIteration:     # ➌
...         del it                # ➍
...         break                 # ➎
...
A
B
C

❶ Build an iterator it from the iterable object.
❷ Repeatedly call next on the iterator to fetch the next character.
❸ When there are no characters left, the iterator raises StopIteration.
❹ Release the reference to it, discarding the iterator object.
❺ Exit the loop.
The StopIteration exception signals that the iterator is exhausted.
Python handles StopIteration internally in for loops and other iteration contexts (list comprehensions, tuple unpacking, and so on).
Be clear about the relationship between iterables and iterators: Python obtains iterators from iterable objects.
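A quick REPL session illustrates these points:

>>> it = iter('AB')         # Python gets an iterator from the iterable
>>> next(it)
'A'
>>> next(it)
'B'
>>> next(it)                # exhausted: StopIteration is raised
Traceback (most recent call last):
  ...
StopIteration
>>> for ch in 'AB':         # the for loop handles StopIteration for us
...     print(ch)
...
A
B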
Iterators
The standard iterator interface has two methods (a minimal example follows this list):
__next__: returns the next available item; raises StopIteration when there are no items left.
__iter__: returns self, so that an iterator can be used wherever an iterable is expected, for example in a for loop.
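As a minimal sketch of this interface, here is a hypothetical Countdown class (not from the original text) that implements both methods by hand:

class Countdown:
    """An iterator that yields n, n-1, ..., 1."""

    def __init__(self, n):
        self.current = n

    def __next__(self):
        if self.current <= 0:
            raise StopIteration       # no more items
        value = self.current
        self.current -= 1
        return value

    def __iter__(self):
        return self                   # an iterator is its own iterable

for x in Countdown(3):                # works anywhere an iterable is expected
    print(x)                          # prints 3, 2, 1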
The itertools module provides many ready-made iterators, for example the infinite count and repeat:

import itertools

nums = itertools.count(start=2, step=3)
for i in nums:
    if i > 15:
        break
    print(i, end=' ')
# 2 5 8 11 14

[i for i in itertools.repeat(['a', 'b'], 3)]
# [['a', 'b'], ['a', 'b'], ['a', 'b']]
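islice, also from itertools, is an equivalent way to bound the infinite count iterator instead of using break:

>>> from itertools import count, islice
>>> list(islice(count(start=2, step=3), 5))   # take the first five values
[2, 5, 8, 11, 14]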
>>> from itertools import accumulate
>>> import operator          # operator --- standard operators as functions
>>> a = [1, 2, 3, 4, 5]
>>> b = accumulate(a)        # defaults to a running sum; returns an iterator
>>> list(b)                  # force evaluation
[1, 3, 6, 10, 15]
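operator is imported above but not used in that snippet; presumably it is meant for a variant such as a running product:

>>> list(accumulate(a, operator.mul))   # running product instead of running sum
[1, 2, 6, 24, 120]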
Using the existing recipes
The built-in itertools primitives can be combined to build many more powerful functions, as in the recipes from the itertools documentation:
import collections
import operator
import random
from itertools import (chain, combinations, count, cycle, filterfalse,
                       groupby, islice, repeat, starmap, tee, zip_longest)
from operator import itemgetter


def take(n, iterable):
    "Return first n items of the iterable as a list"
    return list(islice(iterable, n))

def tabulate(function, start=0):
    "Return function(0), function(1), ..."
    return map(function, count(start))

def consume(iterator, n):
    "Advance the iterator n-steps ahead. If n is None, consume entirely."
    # Use functions that consume iterators at C speed.
    if n is None:
        # feed the entire iterator into a zero-length deque
        collections.deque(iterator, maxlen=0)
    else:
        # advance to the empty slice starting at position n
        next(islice(iterator, n, n), None)

def nth(iterable, n, default=None):
    "Returns the nth item or a default value"
    return next(islice(iterable, n, None), default)

def quantify(iterable, pred=bool):
    "Count how many times the predicate is true"
    return sum(map(pred, iterable))

def padnone(iterable):
    """Returns the sequence elements and then returns None indefinitely.

    Useful for emulating the behavior of the built-in map() function.
    """
    return chain(iterable, repeat(None))

def ncycles(iterable, n):
    "Returns the sequence elements n times"
    return chain.from_iterable(repeat(tuple(iterable), n))

def dotproduct(vec1, vec2):
    return sum(map(operator.mul, vec1, vec2))

def flatten(listOfLists):
    "Flatten one level of nesting"
    return chain.from_iterable(listOfLists)

def repeatfunc(func, times=None, *args):
    """Repeat calls to func with specified arguments.

    Example:  repeatfunc(random.random)
    """
    if times is None:
        return starmap(func, repeat(args))
    return starmap(func, repeat(args, times))

def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return zip_longest(fillvalue=fillvalue, *args)

def roundrobin(*iterables):
    "roundrobin('ABC', 'D', 'EF') --> A D E B F C"
    # Recipe credited to George Sakkis
    pending = len(iterables)
    nexts = cycle(iter(it).__next__ for it in iterables)
    while pending:
        try:
            for next in nexts:
                yield next()
        except StopIteration:
            pending -= 1
            nexts = cycle(islice(nexts, pending))

def powerset(iterable):
    "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
    s = list(iterable)
    return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))

def unique_everseen(iterable, key=None):
    "List unique elements, preserving order. Remember all elements ever seen."
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    seen = set()
    seen_add = seen.add
    if key is None:
        for element in filterfalse(seen.__contains__, iterable):
            seen_add(element)
            yield element
    else:
        for element in iterable:
            k = key(element)
            if k not in seen:
                seen_add(k)
                yield element

def unique_justseen(iterable, key=None):
    "List unique elements, preserving order. Remember only the element just seen."
    # unique_justseen('AAAABBBCCDAABBB') --> A B C D A B
    # unique_justseen('ABBCcAD', str.lower) --> A B C A D
    return map(next, map(itemgetter(1), groupby(iterable, key)))

def iter_except(func, exception, first=None):
    """Call a function repeatedly until an exception is raised.

    Converts a call-until-exception interface to an iterator interface.
    Like builtins.iter(func, sentinel) but uses an exception instead
    of a sentinel to end the loop.

    Examples:
        bsddbiter = iter_except(db.next, bsddb.error, db.first)
        heapiter = iter_except(functools.partial(heappop, h), IndexError)
        dictiter = iter_except(d.popitem, KeyError)
        dequeiter = iter_except(d.popleft, IndexError)
        queueiter = iter_except(q.get_nowait, Queue.Empty)
        setiter = iter_except(s.pop, KeyError)
    """
    try:
        if first is not None:
            yield first()
        while 1:
            yield func()
    except exception:
        pass

def random_product(*args, **kwds):
    "Random selection from itertools.product(*args, **kwds)"
    pools = [tuple(pool) for pool in args] * kwds.get('repeat', 1)
    return tuple(random.choice(pool) for pool in pools)

def random_permutation(iterable, r=None):
    "Random selection from itertools.permutations(iterable, r)"
    pool = tuple(iterable)
    r = len(pool) if r is None else r
    return tuple(random.sample(pool, r))

def random_combination(iterable, r):
    "Random selection from itertools.combinations(iterable, r)"
    pool = tuple(iterable)
    n = len(pool)
    indices = sorted(random.sample(range(n), r))
    return tuple(pool[i] for i in indices)

def random_combination_with_replacement(iterable, r):
    "Random selection from itertools.combinations_with_replacement(iterable, r)"
    pool = tuple(iterable)
    n = len(pool)
    indices = sorted(random.randrange(n) for i in range(r))
    return tuple(pool[i] for i in indices)

def tee_lookahead(t, i):
    """Inspect the i-th upcoming value from a tee object
    while leaving the tee object at its current position.

    Raise an IndexError if the underlying iterator doesn't
    have enough values.
    """
    for value in islice(t.__copy__(), i, None):
        return value
    raise IndexError(i)
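A short usage sketch for a few of these recipes (assuming the definitions above are in scope); the expected results follow from their docstrings:

>>> from itertools import count
>>> take(5, count(10))
[10, 11, 12, 13, 14]
>>> list(pairwise('ABCD'))
[('A', 'B'), ('B', 'C'), ('C', 'D')]
>>> list(grouper('ABCDEFG', 3, 'x'))
[('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]
>>> list(roundrobin('ABC', 'D', 'EF'))
['A', 'D', 'E', 'B', 'F', 'C']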
Custom extensions
# Split a sequence into fixed-size chunks, with better performance
from itertools import chain, islice

def chunks(iterable, size, format=iter):
    it = iter(iterable)
    while True:
        try:
            head = next(it)
        except StopIteration:     # PEP 479: don't let StopIteration escape a generator
            return
        yield format(chain((head,), islice(it, size - 1)))

>>> l = ["a", "b", "c", "d", "e", "f", "g"]
>>> for chunk in chunks(l, 3, tuple):
...     print(chunk)
...
('a', 'b', 'c')
('d', 'e', 'f')
('g',)
The yield from syntax, introduced in Python 3.3, lets a generator or coroutine delegate work to a subgenerator, so a nested for loop is no longer needed as a workaround.
import random, time

def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_cnt = yield b
        print('let me think {0} secs'.format(sleep_cnt))
        time.sleep(sleep_cnt)
        a, b = b, a + b
        index += 1

print('-' * 10 + 'test yield send' + '-' * 10)
N = 5
sfib = stupid_fib(N)
fib_res = next(sfib)
while True:
    print(fib_res)
    try:
        fib_res = sfib.send(random.uniform(0, 0.5))
    except StopIteration:
        break

----------test yield send----------
1
let me think 0.2438615286011866 secs
1
let me think 0.027476256830278822 secs
2
let me think 0.09717699872403579 secs
3
let me think 0.017161862262742633 secs
5
let me think 0.3313821890336833 secs
def my_generator():
    for i in range(5):
        if i == 2:
            return 'I was interrupted'
        else:
            yield i

def main(generator):
    try:
        for i in generator:       # the for loop handles StopIteration itself, so we never see the return value
            print(i)
    except StopIteration as exc:
        print(exc.value)

g = my_generator()
main(g)                           # call it
'''
Output:
0
1
'''
def my_generator():
    for i in range(5):
        if i == 2:
            return 'I was interrupted'
        else:
            yield i

def main(generator):
    try:
        print(next(generator))    # pulling one value at a time makes StopIteration surface explicitly
        print(next(generator))
        print(next(generator))
        print(next(generator))
        print(next(generator))
    except StopIteration as exc:
        print(exc.value)          # the return value rides on the exception

g = my_generator()
main(g)
'''
Output:
0
1
I was interrupted
'''
Now let us use yield from to achieve the same thing:
def my_generator():
    for i in range(5):
        if i == 2:
            return 'I was interrupted'
        else:
            yield i

def wrap_my_generator(generator):     # a generator that wraps another generator; it is itself still a generator
    result = yield from generator     # absorbs StopIteration automatically and binds the return value to the yield from expression
    print(result)

def main(generator):
    for j in generator:
        print(j)

g = my_generator()
wrap_g = wrap_my_generator(g)
main(wrap_g)                          # call it
'''
Output:
0
1
I was interrupted
'''
Comparing the versions above, yield from has the following characteristics:
Without yield from, the call chain is: caller -> generator function (coroutine function).
With yield from, it becomes: caller -> delegating (wrapper) generator -> subgenerator (coroutine function).
The value returned by return (that is, the value attribute of the StopIteration) becomes the value of the yield from expression, i.e. result in the code above.
In the code above, asyncio.sleep creates a Future object as the innermost "coroutine-like" object and hands it to the event loop via yield from; Future implements __iter__ as a generator, so it can be the target of yield from:

@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay,
                                future._set_result_unless_cancelled, result)
    try:
        return (yield from future)
    finally:
        h.cancel()


class Future:
    # ...
    def __iter__(self):
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

When a coroutine does yield from asyncio.sleep(...), the event loop is in effect wired up to that Future object. The program's output is as follows:

T-minus 2 (A)
T-minus 5 (B)
T-minus 1 (A)
T-minus 4 (B)
T-minus 3 (B)
T-minus 2 (B)
T-minus 1 (B)
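The countdown program that produced this output is not reproduced in this section. A minimal sketch that yields output of this shape, written in the same generator-based coroutine style (the countdown name and the 'A'/'B' labels are assumptions; @asyncio.coroutine is deprecated since Python 3.8 and removed in 3.11, so this only runs on older versions):

import asyncio

@asyncio.coroutine
def countdown(label, n):
    while n > 0:
        print('T-minus', n, '({})'.format(label))
        yield from asyncio.sleep(1)   # hands a Future to the event loop, as described above
        n -= 1

loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(countdown('A', 2)),
         asyncio.ensure_future(countdown('B', 5))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()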
async and await
hello world
import asyncio
import time

async def say_after(delay, what):     # helper coroutine (as defined in the asyncio docs) used by both versions below
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# Expected output:
# started at 17:13:52
# hello
# world
# finished at 17:13:55

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# The expected output shows this version runs about 1 second faster than the previous one:
# started at 17:14:32
# hello
# world
# finished at 17:14:34
The historical evolution of the asyncio module
asyncio is a library introduced in Python 3.4; the name stands for asynchronous I/O.
await replaces yield from; the functionality is identical: it yields control so the event loop can schedule other work.
The @asyncio.coroutine decorator and the async keyword
@asyncio.coroutine
def func1():
    yield from asyncio.sleep(2)   # on this blocking IO wait, switch to another task in the event loop

# is equivalent to

async def func1():
    await asyncio.sleep(2)        # on this blocking IO wait, switch to another task in the event loop
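Either form can be driven by the event loop. A minimal sketch (assuming Python 3.7+ and the async form of func1 above) showing that two tasks overlap their sleeps, so the total runtime is about 2 seconds rather than 4:

import asyncio
import time

async def func1():
    await asyncio.sleep(2)   # while sleeping, the loop runs the other task

async def main():
    start = time.perf_counter()
    await asyncio.gather(func1(), func1())                    # run both concurrently
    print(f"elapsed: {time.perf_counter() - start:.1f}s")     # ~2.0s, not 4.0s

asyncio.run(main())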
Coroutines can also await Future objects directly, or combine them with asyncio.gather:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine(),
    )
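function_that_returns_a_future_object and some_python_coroutine above are placeholders. A minimal, hypothetical sketch of what the Future-returning function might look like, using loop.create_future and call_later:

import asyncio

def function_that_returns_a_future_object():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_later(1, fut.set_result, 'done')   # resolve the Future after 1 second
    return fut

async def main():
    result = await function_that_returns_a_future_object()
    print(result)   # 'done'

asyncio.run(main())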
Async comprehensions and asynchronous generators are also supported:

result = [i async for i in aiter() if i % 2]          # aiter() stands for any asynchronous iterable
result = [await func() for func in funcs if await condition()]

async def test(x, y):
    for i in range(y):
        yield i
        await asyncio.sleep(x)
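A minimal driver (an assumption, not part of the original) that consumes the async generator test defined above with async for:

import asyncio

async def main():
    async for i in test(0.1, 3):   # test is the async generator defined above
        print(i)                   # prints 0, 1, 2

asyncio.run(main())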
A fuller producer/consumer example with asyncio.Queue, run from a separate thread:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: v1.0
@author: huangyc
@file: async_test.py
@Description:
@time: 2023/4/6 10:31
"""
import asyncio
import random
import threading

random.seed(5)


async def rnd_sleep(t):
    # sleep for t seconds on average
    await asyncio.sleep(t * random.random() * 2)


async def producer(queue):
    lst = list(range(10))
    for token in lst:
        # produce a token and send it to a consumer
        print(f'produced {token}')
        await queue.put(token)
        await rnd_sleep(.1)


async def consumer(queue):
    while True:
        token = await queue.get()
        # process the token received from a producer
        await rnd_sleep(.1)
        queue.task_done()
        print(f'consumed {token}')


async def main():
    queue = asyncio.Queue()

    # fire up both the producers and the consumers
    producers = [asyncio.create_task(producer(queue)) for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(10)]

    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print('---- done producing')

    # wait for the remaining tasks to be processed
    await queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()


if __name__ == '__main__':
    print("hello")

    # threads + coroutines: run the event loop in a separate thread
    t1 = threading.Thread(target=asyncio.run, args=(main(),))
    t1.start()
    t1.join()

    # or run the coroutine directly:
    # asyncio.run(main())

    print("end")