路漫漫其修远兮
吾将上下而求索

python:多线程并发和锁

单线程执行
python的内置模块提供了两个内置模块:thread和threading,thread是源生模块,threading是扩展模块,在thread的基础上进行了封装及改进。
所以只需要使用threading这个模块就能完成并发的测试

实例

创建并启动一个单线程

import threading


def myTestFunc():
    print("我是一个函数")

t = threading.Thread(target=myTestFunc)  # 创建一个线程
t.start()  # 启动线程
执行结果

C:\Python36\python.exe D:/MyThreading/myThread.py
我是一个线程函数

Process finished with exit code 0
其实单线程的执行结果和单独执行某一个或者某一组函数结果是一样的,区别只在于用线程的方式执行函数,而线程是可以同时执行多个的,函数是不可以同时执行的。

多线程执行
上面介绍了单线程如何使用,多线程只需要通过循环创建多个线程,并循环启动线程执行就可以了

实例

import threading
from datetime import datetime


def thread_func():  # 线程函数
    print('我是一个线程函数', datetime.now())


def many_thread():
    threads = []
    for _ in range(10):  # 循环创建10个线程
        t = threading.Thread(target=thread_func)
        threads.append(t)
    for t in threads:  # 循环启动10个线程
        t.start()


if __name__ == '__main__':
    many_thread()
执行结果

C:\Python36\python.exe D:/MyThreading/manythread.py
我是一个线程函数 2019-06-23 16:54:58.205146
我是一个线程函数 2019-06-23 16:54:58.205146
我是一个线程函数 2019-06-23 16:54:58.206159
我是一个线程函数 2019-06-23 16:54:58.206159
我是一个线程函数 2019-06-23 16:54:58.206159
我是一个线程函数 2019-06-23 16:54:58.207139
我是一个线程函数 2019-06-23 16:54:58.207139
我是一个线程函数 2019-06-23 16:54:58.207139
我是一个线程函数 2019-06-23 16:54:58.208150
我是一个线程函数 2019-06-23 16:54:58.208150

Process finished with exit code 0
通过循环创建10个线程,并且执行了10次线程函数,但需要注意的是python的并发并非绝对意义上的同时处理,因为启动线程是通过循环启动的,
还是有先后顺序的,通过执行结果的时间可以看出还是有细微的差异,但可以忽略不记。当然如果线程过多就会扩大这种差异。我们启动500个线程看下程序执行时间

实例

import threading
from datetime import datetime


def thread_func():  # 线程函数
    print('我是一个线程函数', datetime.now())


def many_thread():
    threads = []
    for _ in range(500):  # 循环创建500个线程
        t = threading.Thread(target=thread_func)
        threads.append(t)
    for t in threads:  # 循环启动500个线程
        t.start()


if __name__ == '__main__':
    start = datetime.today().now()
    many_thread()
    duration = datetime.today().now() - start
    print(duration)
执行结果

0:00:00.111657

Process finished with exit code 0
500个线程共执行了大约0.11秒

那么针对这种问题我们该如何优化呢?我们可以创建25个线程,每个线程执行20次线程函数,这样在启动下一个线程的时候,上一个线程已经在循环执行了,这样就
大大减少了并发的时间差异

优化

import threading
from datetime import datetime


def thread_func():  # 线程函数
    print('我是一个线程函数', datetime.now())


def execute_func():
    for _ in range(20):
        thread_func()


def many_thread():
    start = datetime.now()
    threads = []
    for _ in range(25):  # 循环创建500个线程
        t = threading.Thread(target=execute_func)
        threads.append(t)
    for t in threads:  # 循环启动500个线程
        t.start()
    duration = datetime.now() - start
    print(duration)

if __name__ == '__main__':
    many_thread()
输出结果(仅看程序执行间隔)

0:00:00.014959

Process finished with exit code 0
后面的优化执行500次并发一共花了0.014秒。比未优化前的500个并发快了几倍,如果线程函数的执行时间比较长的话,那么这个差异会更加显著,
所以大量的并发测试建议使用后者,后者比较接近同时“并发”

守护线程
多线程还有一个重要概念就是守护线程。那么在这之前我们需要知道主线程和子线程的区别,之前创建的线程其实都是main()线程的子线程,即先启动主线程main(),
然后执行线程函数子线程。

那么什么是守护线程?即当主线程执行完毕之后,所有的子线程也被关闭(无论子线程是否执行完成)。默认不设置的情况下是没有守护线程的,
主线程执行完毕后,会等待子线程全部执行完毕,才会关闭结束程序。

但是这样会有一个弊端,当子线程死循环了或者一直处于等待之中,则程序将不会被关闭,被被无限挂起,我们把上述的线程函数改成循环10次, 并睡眠2秒,这样效果会更明显

import threading
from datetime import datetime
import time


def thread_func():  # 线程函数
   time.sleep(2)
    i = 0
    while(i < 11):
        print(datetime.now())
        i += 1

def many_thread():
    threads = []
    for _ in range(10):  # 循环创建500个线程
        t = threading.Thread(target=thread_func)
        threads.append(t)
    for t in threads:  # 循环启动500个线程
        t.start()


if __name__ == '__main__':
    many_thread()
    print("thread end")
执行结果

C:\Python36\python.exe D:/MyThreading/manythread.py
thread end
2019-06-23 19:08:00.468612
2019-06-23 19:08:00.468612

2019-06-23 19:08:00.477546
2019-06-23 19:08:00.477546
2019-06-23 19:08:00.477546
2019-06-23 19:08:00.477546
2019-06-23 19:08:00.477546
2019-06-23 19:08:00.477546
2019-06-23 19:08:00.477546

Process finished with exit code 0
根据上述结果可以看到主线程打印了“thread end”之后(主线程结束),子线程还在继续执行,并未随着主线程的结束而结束

下面我们通过 setDaemon方法给子线程添加守护线程,我们把循环改为死循环,再来看看输出结果(注意守护线程要加在start之前)

import threading
from datetime import datetime
def thread_func():  # 线程函数
    i = 0
    while(1):
        print(datetime.now())
        i += 1

def many_thread():
    threads = []
    for _ in range(10):  # 循环创建500个线程
        t = threading.Thread(target=thread_func)
        threads.append(t)
        t.setDaemon(True)  # 给每个子线程添加守护线程
    for t in threads:  # 循环启动500个线程
        t.start()


if __name__ == '__main__':
    many_thread()
    print("thread end")
输出结果

2019-06-23 19:12:35.564539
2019-06-23 19:12:35.564539
2019-06-23 19:12:35.564539
2019-06-23 19:12:35.564539
2019-06-23 19:12:35.564539
2019-06-23 19:12:35.564539
2019-06-23 19:12:35.565529
2019-06-23 19:12:35.565529
2019-06-23 19:12:35.565529
thread end

Process finished with exit code 0
通过结果我们可以发现,主线程关闭之后子线程也会随着关闭,并没有无限的循环下去,这就像程序执行到一半强制关闭执行一样,看似暴力却很有用,
如果子线程发送一个请求未收到请求结果,那不可能永远等下去,这时候就需要强制关闭。所以守护线程解决了主线程和子线程关闭的问题。

阻塞线程
上面说了守护线程的作用,那么有没有别的方法来解决上述问题呢? 其实是有的,那就是阻塞线程,这种方式更加合理,使用join()方法阻塞线程,让主
线程等待子线程执行完成之后再往下执行,再关闭所有子线程,而不是只要主线程结束,不管子线程是否执行完成都终止子线程执行。下面我们给子线程添
加上join()(主要join要加到start之后)

import threading
from datetime import datetime
import time


def thread_func():  # 线程函数
    time.sleep(1)
    i = 0
    while(i < 11):
        print(datetime.now())
        i += 1

def many_thread():
    threads = []
    for _ in range(10):  # 循环创建500个线程
        t = threading.Thread(target=thread_func)
        threads.append(t)
        t.setDaemon(True)  # 给每个子线程添加守护线程
    for t in threads:  # 循环启动500个线程
        t.start()
    for t in threads:
        t.join()  # 阻塞线程

if __name__ == '__main__':
    many_thread()
    print("thread end")
执行结果

程序会一直执行,但是不会打印“thread end”语句,因为子线程并未结束,那么主线程就会一直等待。

疑问:有人会觉得这和什么都不设置是一样的,其实会有一点区别的,从守护线程和线程阻塞的定义就可以看出来,如果什么都没设置,那么主线程会先执
行完毕打印后面的“thread end”,而等待子线程执行完毕。两个都设置了,那么主线程会等待子线程执行结束再继续执行。

而对于死循环或者一直等待的情况,我们可以给join设置超时等待,我们设置join的参数为2,那么子线程会告诉主线程让其等待2秒,如果2秒内子线程执行
结束主线程就继续往下执行,如果2秒内子线程未结束,主线程也会继续往下执行,执行完成后关闭子线程

import threading
from datetime import datetime
import time


def thread_func():  # 线程函数
    time.sleep(1)
    i = 0
    while(1):
        print(datetime.now())
        i += 1

def many_thread():
    threads = []
    for _ in range(10):  # 循环创建500个线程
        t = threading.Thread(target=thread_func)
        threads.append(t)
        t.setDaemon(True)  # 给每个子线程添加守护线程
    for t in threads:  # 循环启动500个线程
        t.start()
    for t in threads:
        t.join(2)  # 设置子线程超时2秒

if __name__ == '__main__':
    many_thread()
    print("thread end")
 

输出结果

你运行程序后会发现,运行了大概2秒的时候,程序会数据“thread end” 然后结束程序执行, 这就是阻塞线程的意义,控制子线程和主线程的执行顺序

总结
最好呢,再次说一下守护线程和阻塞线程的定义

守护线程:子线程会随着主线程的结束而结束,无论子线程是否执行完毕

阻塞线程:主线程会等待子线程的执行结束,才继续执行

线程锁

Lock
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,
所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于
多个线程同时改一个变量,把内容给改乱了。

两个线程同时一存一取,就可能导致余额不对,你肯定不希望你的银行存款莫名其妙地变成了负数,所以,我们必须确保
一个线程在修改balance的时候,别的线程一定不能改。

如果我们要确保balance计算正确,就要给change_it()上一把锁,当某个线程开始执行change_it()时,我们说,该线程因
为获得了锁,因此其他线程不能同时执行change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,
无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()来实现:

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要获取锁:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要释放锁:
            lock.release()
            
当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。
获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally来确保锁一定会被释放。

锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段
代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有
的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。

GIL锁

多核CPU
如果你不幸拥有一个多核CPU,你肯定在想,多核应该可以同时执行多个线程。
如果写一个死循环的话,会出现什么情况呢?
打开Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以监控某个进程的CPU使用率。
我们可以监控到一个死循环线程会100%占用一个CPU。
如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。
要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。
试试用Python写个死循环:

import threading, multiprocessing

def loop():
    x = 0
    while True:
        x = x ^ 1

for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()
    
启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。
但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?
因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,
必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有
线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。

不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

参考:https://www.cnblogs.com/linuxchao/p/linuxchao-thread.html

未经允许不得转载:江哥架构师笔记 » python:多线程并发和锁

分享到:更多 ()

评论 抢沙发

评论前必须登录!