41线程3_RLock_Condition_Barrier-创新互联
目录
网站的建设创新互联建站专注网站定制,经验丰富,不做模板,主营网站定制开发.小程序定制开发,H5页面制作!给你焕然一新的设计体验!已为成都围栏护栏等企业提供专业服务。threading.RLock类:...1
threading.Condition类:...2
threading.Barrier类:...4
threading.RLock类:
可重入锁,是线程相关的锁;
线程A获得可重复锁,并可多次成功获取,不会阻塞,最后要在线程A中做和acquire次数相同的release(拿多少次锁,还多少回来);
注,线程相关:
threading.local类;
例:
lock = threading.RLock()
ret = lock.acquire()
print(ret)
ret = lock.acquire(timeout=5)
print(ret)
ret = lock.acquire(False)
print(ret)
ret = lock.acquire(False) #全能拿到锁
print(ret)
lock.release()
lock.release()
lock.release()
lock.release()
# lock.release() #前面没有对应的acquire,抛RuntimeError: cannot release un-acquired lock
def sub(lock:threading.RLock):
lock.release() #主线程中加的,不能在子线程中释放,理解线程级别
threading.Thread(target=sub, args=(lock,)).start()
输出:
True
True
True
True
Exception in thread Thread-1:
Traceback (most recent call last):
File "D:\Python\Python35\lib\threading.py", line 914, in _bootstrap_inner
self.run()
File "D:\Python\Python35\lib\threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "E:/git_practice/cmdb/example_threading2.py", line 249, in sub
lock.release()
RuntimeError: cannot release un-acquired lock
threading.Condition类:
Condition(lock=None),构造方法,可传入一个lock或RLock对象,默认是RLock;
cond = threading.Condition()
cond.acquire(*args),获取锁;
cond.release()
cond.wait(timeout=None),等待或超时;
cond.notify(n=1),唤醒至多指定数目个数的等待线程,默认1个,没有等待的线程就没有任何操作,源码中waiter;
cond.notify_all(),唤醒所有等待的线程,源码中waiters;
总结:
Condition用于生产者-消费者模型,解决生产者-消费者速度匹配问题;
采用了通知机制,非常有效率;
使用Condition,必须先acquire,用完要release,因为内部使用了锁,默认使用RLock,最好的方式是使用with上下文;
消费者wait等待通知,生产者生产好消息,对消费者发通知,可使用notify或notify_all;
可把Condition理解为一把高级的锁,它提供了Lock、RLock更高级的功能,允许我们能够控制复杂的线程同步问题;
threading.Condition内部维护了一个锁对象(默认是RLock),可在创建Condition对象时把锁对象作为参数传入;
threading.Condition也提供了acquire和release方法,含义与锁的一致,其实它只是简单调用内部锁对象的对应的方法而已;
threading.Condition还提供了wait、notify、notify_all方法:
wait([timeout]),释放内部所占用的锁,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供timeout),当线程被唤醒并重新占用锁时,程序才会继续执行下去;
notify(),唤醒一个挂起的线程(如果存在挂起的线程),notify()不会释放所占用的锁;
notify_all(),唤醒所有挂起的线程(如果存在挂起的线程),不会释放所占用的锁;
Lock与RLock:
RLock允许在同一线程中被多次acquire,而Lock不允许这种情况;
如果使用RLock,那么 acquire和release必须成对出现,即调用了n次acquire,必须调用n次release才能真正释放所占用的锁;
例:
class Dispatcher:
def __init__(self):
self.data = 0
self.event = threading.Event()
def produce(self):
for i in range(100):
data = random.randint(1,100)
self.data = data
self.event.wait(1)
def custom(self):
while True: #消费者浪费了大量cpu时间,主动来查看有没有数据
logging.info(self.data) #有重复消费问题
self.event.wait(1) #隔1秒生成1个
d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)
c.start() #消费者先启动
p.start()
输出:
……
2018-08-06-15:54:25 Thread info: 13052 Thread-1 13
2018-08-06-15:54:25 Thread info: 12052 Thread-2 13
2018-08-06-15:54:26 Thread info: 12052 Thread-2 13
……
例:
class Dispatcher:
def __init__(self):
self.data = 0
self.event = threading.Event()
self.cond = threading.Condition()
def produce(self):
for i in range(100):
data = random.randint(1,100)
# logging.info(data)
with self.cond:
self.data = data
self.cond.notify(2) #通知机制,有数据,通知消费者来消费;交给2个人做,一般是1(生产者)对多(消费者)
self.cond.notify_all() #通知所有消费者,1对多
self.event.wait(1)
def custom(self):
# while True:
while not self.event.is_set():
# logging.info(self.data)
with self.cond: #消费者被迫匹配生产者
self.cond.wait()
logging.info(self.data)
# self.event.wait(1)
d = Dispatcher()
p = threading.Thread(target=d.produce)
# c = threading.Thread(target=d.custom)
# c1 = threading.Thread(target=d.custom) #开启2个消费线程
# c.start()
# c1.start()
for i in range(5): #开启5个消费线程;如果produce中self.conf.notify(2),生产者通知2个线程处理,5个消费者中谁抢在前谁处理
threading.Thread(target=d.custom, name='c-{}'.format(i)).start()
p.start() #如果生产者先启动,已经生成的数据不会被消费者消费,除非在队列中
注:
以上有线程安全问题,解决:中间加MQ;
上例不是线程安全的,程序逻辑有很多瑕疵,但可很好的理解Condition的使用和生产者消费者模型;
一对多,其实就是广播模式;
threading.Barrier类:
屏障、栅栏,可以想象成路障、道闸,3.2引入;
Barrier(paties,action=None,timeout=None),构建Barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值;
n_waiting,当前在屏障中等待的线程数;
paties,参与方数目,需要多少个等待;
wait(timeout=None),等待通过屏障,返回0到线程数count-1的整数,count为等待的线程总数,每个线程返回不同;如果wait方法设置了超时,并超时发送,屏障将处于broken状态;wait方法超时发生,屏障处于broken状态,直至reset;
broken,如果屏障处于打破的状态,返回True;
abort(),将屏障置于broken状态,等待中的线程或调用等待方法的线程中都会抛BrokenBarrierError异常,直至reset方法来恢复屏障;
reset(),恢复屏障,重新开始拦截;
应用场景:
1、并发初始化;如,centos7中systemd,能并行启动就并行;
所有线程都必须初始化完成后,才能继续工作,如运行前加载数据、检查,如果这些工作没完成就开始运行,将不能正常工作;
10个线程做10种工作准备,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程;
如,启动一个程序,先加载磁盘文件、缓存预热、初始化连接池等,这些工作齐头并进,不过只有等满足了,程序才能继续后向执行,假设数据库连接失败,则初始化工作失效,就要abort,屏障broken,所有线程收到异常退出;
2、工作量,有10个计算任务,完成6个就算工作完成,如求样本数、求平均数;
例:
def worker(barrier:threading.Barrier):
logging.info('n_waiting={}'.format(barrier.n_waiting))
try:
bid = barrier.wait()
logging.info('after barrier {}'.format(bid))
except threading.BrokenBarrierError:
logging.info('broken barrier is {}'.format(threading.current_thread()))
barrier = threading.Barrier(3) #3个一拨3个一拨
for _ in range(3): #依次3,4,5,6
threading.Thread(target=worker,args=(barrier,)).start()
输出:
2018-08-07-08:27:53 Thread info: 11496 Thread-1 n_waiting=0
2018-08-07-08:27:53 Thread info: 12540 Thread-2 n_waiting=1
2018-08-07-08:27:53 Thread info: 4612 Thread-3 n_waiting=2
2018-08-07-08:27:53 Thread info: 4612 Thread-3 after barrier 2
2018-08-07-08:27:53 Thread info: 11496 Thread-1 after barrier 0
2018-08-07-08:27:53 Thread info: 12540 Thread-2 after barrier 1
例:
for i in range(6):
if i == 2: #屏障中等待2个,屏障被broken,wait的线程抛异常,新wait的线程也抛异常,直至屏障恢复,才继续按达到参与方的数目继续拦截
barrier.abort()
elif i == 3:
barrier.reset()
threading.Event().wait(1)
threading.Thread(target=worker,args=(barrier,)).start()
输出:
2018-08-07-09:21:49 Thread info: 12668 Thread-1 n_waiting=0
2018-08-07-09:21:50 Thread info: 12424 Thread-2 n_waiting=1
2018-08-07-09:21:50 Thread info: 12424 Thread-2 broken barrier is
2018-08-07-09:21:50 Thread info: 12668 Thread-1 broken barrier is
2018-08-07-09:21:51 Thread info: 11468 Thread-3 n_waiting=0
2018-08-07-09:21:51 Thread info: 11468 Thread-3 broken barrier is
2018-08-07-09:21:52 Thread info: 9788 Thread-4 n_waiting=0
2018-08-07-09:21:53 Thread info: 12680 Thread-5 n_waiting=1
2018-08-07-09:21:54 Thread info: 10948 Thread-6 n_waiting=2
2018-08-07-09:21:54 Thread info: 10948 Thread-6 after barrier 2
2018-08-07-09:21:54 Thread info: 9788 Thread-4 after barrier 0
2018-08-07-09:21:54 Thread info: 12680 Thread-5 after barrier 1
例:
wait方法超时发生,屏障处于broken状态,直至reset;
def worker(barrier:threading.Barrier, i:int):
logging.info('waiting for {} threads'.format(barrier.n_waiting))
try:
logging.info(barrier.broken)
if i < 3:
barrier_id = barrier.wait(1)
else:
if i == 6:
barrier.reset()
barrier_id = barrier.wait()
logging.info('after barrier {}'.format(barrier_id))
except threading.BrokenBarrierError:
logging.info('broken barrier. run.')
barrier = threading.Barrier(3)
for x in range(9):
threading.Event().wait(2)
threading.Thread(target=worker, args=(barrier,x), name='worker-{}'.format(x)).start()
输出:
2018-08-07-09:33:24 Thread info: 10556 worker-0 waiting for 0 threads
2018-08-07-09:33:24 Thread info: 10556 worker-0 False
2018-08-07-09:33:25 Thread info: 10556 worker-0 broken barrier. run.
2018-08-07-09:33:26 Thread info: 12752 worker-1 waiting for 0 threads
2018-08-07-09:33:26 Thread info: 12752 worker-1 True
2018-08-07-09:33:26 Thread info: 12752 worker-1 broken barrier. run.
2018-08-07-09:33:28 Thread info: 5324 worker-2 waiting for 0 threads
2018-08-07-09:33:28 Thread info: 5324 worker-2 True
2018-08-07-09:33:28 Thread info: 5324 worker-2 broken barrier. run.
2018-08-07-09:33:30 Thread info: 6716 worker-3 waiting for 0 threads
2018-08-07-09:33:30 Thread info: 6716 worker-3 True
2018-08-07-09:33:30 Thread info: 6716 worker-3 broken barrier. run.
2018-08-07-09:33:32 Thread info: 9180 worker-4 waiting for 0 threads
2018-08-07-09:33:32 Thread info: 9180 worker-4 True
2018-08-07-09:33:32 Thread info: 9180 worker-4 broken barrier. run.
2018-08-07-09:33:34 Thread info: 6788 worker-5 waiting for 0 threads
2018-08-07-09:33:34 Thread info: 6788 worker-5 True
2018-08-07-09:33:34 Thread info: 6788 worker-5 broken barrier. run.
2018-08-07-09:33:36 Thread info: 12044 worker-6 waiting for 0 threads
2018-08-07-09:33:36 Thread info: 12044 worker-6 True
2018-08-07-09:33:38 Thread info: 5020 worker-7 waiting for 1 threads
2018-08-07-09:33:38 Thread info: 5020 worker-7 False
2018-08-07-09:33:40 Thread info: 13052 worker-8 waiting for 2 threads
2018-08-07-09:33:40 Thread info: 13052 worker-8 False
2018-08-07-09:33:40 Thread info: 13052 worker-8 after barrier 2
2018-08-07-09:33:40 Thread info: 5020 worker-7 after barrier 1
2018-08-07-09:33:40 Thread info: 12044 worker-6 after barrier 0
另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
标题名称:41线程3_RLock_Condition_Barrier-创新互联
本文URL:http://scyanting.com/article/dgiedj.html