当前位置:   article > 正文

【python第三方库】python多线程编程---threading库

threading库

一、python多线程

前言:为什么有人说 Python 的多线程是鸡肋,不是真正意义上的多线程?

关于多线程,多进程的详细介绍 可阅读此篇blog

这里自己用白话简单概括一下(非准确):

  • 一个要执行的程序(应用)可以看作一个进程
  • 改程序里的一串代码指令(或一个函数)看作是进程里的一个线程。(main函数就是主线程)

1. GIL

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。

其他语言,CPU是多核时 是支持多个线程同时执行。但在Python中,无论是单核还是多核,同时只能由一个线程在执行。

其根源是GIL的存在。GIL的全称是Global Interpreter Lock(全局解释器锁),来源是Python设计之初的考虑,为了数据安全所做的决定。

某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个Python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。

而目前Python的解释器有多种,例如:

  • CPython: CPython是用C语言实现的Python解释器。 作为官方实现,它是最广泛使用的Python解释器。

  • PyPy: PyPy是用RPython实现的解释器。RPython是Python的子集, 具有静态类型。这个解释器的特点是即时编译,支持多重后端(C, CLI, JVM)。PyPy旨在提高性能,同时保持最大兼容性(参考CPython的实现)。

  • Jython: Jython是一个将Python代码编译成Java字节码的实现,运行在JVM (Java Virtual Machine) 上。另外,它可以像是用Python模块一样,导入并使用任何Java类。

  • IronPython: IronPython是一个针对 .NET 框架的Python实现。它可以用Python和 .NET framework的库,也能将Python代码暴露给 .NET框架中的其他语言。

GIL只在CPython中才有,而在PyPy和Jython中是没有GIL的

每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。这就导致打印线程执行时长,会发现耗时更长的原因。

并且由于GIL锁存在,Python里一个进程永远只能同时执行一个线程(拿到GIL的线程才能执行),这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因。

二、threading库使用介绍

multiprocess模块(python多进程库)完全模仿了threading模块的接口,二者在使用层面,有很大的相似性。

1. 创建多线程

Python提供两个模块进行多线程的操作,分别是thread和threading,前者是比较低级的模块,用于更底层的操作,一般应用级别的开发不常用。

  • 方法1:直接使用threading.Thread()
import threading
 
# 这个函数名可随便定义
def run(n):
    print("current task:", n)
 
if __name__ == "__main__":
    t1 = threading.Thread(target=run, args=("thread 1",))
    t2 = threading.Thread(target=run, args=("thread 2",))
    t1.start()
    t2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主线程')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 方法2:继承threading.Thread来自定义线程类,重写run方法
import threading
 
class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重构run函数必须要写
        self.n = n
 
    def run(self):
        print("current task:", n)
 
if __name__ == "__main__":
    t1 = MyThread("thread 1")
    t2 = MyThread("thread 2")
 
    t1.start()
    t2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主线程')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2. 线程合并

join函数执行顺序是逐个执行每个线程,执行完毕后继续往下执行。主线程结束后,子线程还在运行,join函数使得主线程等到子线程结束时才退出

import threading
 
def count(n):
    while n > 0:
    	print(n)
        n -= 1
 
if __name__ == "__main__":
    t1 = threading.Thread(target=count, args=(10",))
    t2 = threading.Thread(target=count, args=(10,))
    t1.start()
    t2.start()
    # 将 t1 和 t2 加入到主线程中
    t1.join()
    t2.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

3. 线程同步与互斥锁Lock

线程之间数据共享的。当多个线程对某一个共享数据进行操作时,就需要考虑到线程安全问题。threading模块中定义了Lock 类,提供了互斥锁的功能来保证多线程情况下数据的正确性。

用法的基本步骤:

#创建锁
mutex = threading.Lock()
#锁定
mutex.acquire([timeout])
#释放
mutex.release()

------模板:--------
R=threading.Lock()
R.acquire()
'''
对公共数据的操作
'''
R.release()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

解释:

一把锁有两个状态:locked 和 unlocked 状态。锁刚被创建的时候是处于 unlocked 状态。

  • lock.acquire() 会让锁从 unlocked -> locked。

  • lock.release() 会让锁从 locked -> unlocked。

如果锁已经处于 locked 状态,对它使用 acquire() 的线程会被阻塞,直到另一个线程调用了 release() 使该锁解锁。

release()方法只能在锁locked时调用,并释放锁。否则会抛出RuntimeError错误。

其中,锁定方法acquire可以有一个超时时间的可选参数timeout。如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁,从而可以进行一些其他的处理。具体用法见示例代码:

import threading
import time
 
num = 0
mutex = threading.Lock()
 
class MyThread(threading.Thread):
    def run(self):
        global num 
        time.sleep(1)
 
        if mutex.acquire(1):  
            num = num + 1
            msg = self.name + ': num value is ' + str(num)
            print(msg)
            mutex.release()
 
if __name__ == '__main__':
    for i in range(5):
        t = MyThread()
        t.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

另一个示例:

import threading
import time
from queue import Queue

def a():
    global A,lock
    lock.acquire()
    for i in range(10):
        A+=1
        print("a",A)
    lock.release()
    
def b():
    global A,lock
    lock.acquire()
    for i in range(10):
        A+=10
        print("b",A)
    lock.release()

if __name__ == '__main__':
    lock = threading.Lock()
    A=0
    t1=threading.Thread(target=a,)
    t2=threading.Thread(target=b,)
    t1.start()
    t2.start()
    
输出结果:
a 1
a 2
a 3
a 4
a 5
a 6
a 7
a 8
a 9
a 10
b 20
b 30
b 40
b 50
b 60
b 70
b 80
b 90
b 100
b 110
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

4. 死锁与可重入锁(递归锁)RLock

进程也有死锁与可重入锁,使用方法都是一样的,所以放到这里一起说:

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁。
示例1.

from threading import Lock as Lock
import time

mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

示例2. 科学家吃面

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 抢到了面条'%name)
    fork_lock.acquire()
    print('%s 抢到了叉子'%name)
    print('%s 吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 抢到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 抢到了面条' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

解决方法:使用可重入锁

为了满足在同一线程中多次请求同一资源的需求,Python提供了可重入锁(RLock)。

RLock内部维护着一个Lock和一个counter变量,counter记录了acquire
的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

具体用法如下:

#创建 RLock
mutex = threading.RLock()
 
class MyThread(threading.Thread):
    def run(self):
        if mutex.acquire(1):
            print("thread " + self.name + " get mutex")
            time.sleep(1)
            mutex.acquire()
            mutex.release()
            mutex.release()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

对应上面两个死锁问题,可以按照如下写法解决:
示例1.

from threading import RLock
import time
mutexA=RLock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

示例2. 科学家吃面

import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 抢到了面条'%name)
    fork_lock.acquire()
    print('%s 抢到了叉子'%name)
    print('%s 吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 抢到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 抢到了面条' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

5. 守护线程

无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。 需要强调的是:运行完毕并非终止运行

  • #1.对主进程来说,运行完毕指的是主进程代码运行完毕
  • #2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

如果希望主线程执行完毕之后,不管子线程是否执行完毕都随着主线程一起结束。我们可以使用setDaemon(bool)函数,它跟join函数是相反的。

它的作用是设置子线程是否随主线程一起结束,必须在start() 之前调用,默认为False。

看下示例

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必须在t.start()之前设置
    t.start()

    print('主线程')
    print(t.is_alive())
    '''
    主线程
    True
    '''
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

6. 定时器

如果需要规定函数在多少秒后执行某个操作,需要用到Timer类。具体用法如下:

from threading import Timer
 
def show():
    print("Pyhton")
 
# 指定一秒钟之后执行 show 函数
t = Timer(1, hello)
t.start()  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7. Thread类的其他方法

Thread实例对象的方法
  # isAlive(): 返回线程是否活动的。
  # getName(): 返回线程名。
  # setName(): 设置线程名。

threading模块提供的一些方法:
  # threading.currentThread(): 返回当前的线程变量。
  # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主线程
    print(threading.enumerate()) #连同主线程在内有两个运行的线程
    print(threading.active_count())
    print('主线程/主进程')

    '''
    打印结果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主线程/主进程
    Thread-1
    '''
 代码示例
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

三、常见问题

1. 我们有了GIL锁为什么还要自己在代码中加锁呢?

GIL保护的是Python解释器级别的数据资源,自己代码中的数据资源就需要自己加锁防止竞争。如下图:
在这里插入图片描述

2. python因为CPython解释器中GIL的原因,多线程还能用吗?

有了GIL的存在,同一时刻同一进程中只有一个线程被执行,进程可以利用多核,但是开销大,而Python的多线程开销小,但却无法利用多核优势,也就是说Python这语言难堪大用。

其实编程所解决的现实问题大致分为IO密集型和计算密集型。

对于IO密集型的场景,Python的多线程编程完全OK
而对于计算密集型的场景,Python中有很多成熟的模块或框架如Pandas等能够提高计算效率。

3. 多线程中加锁和使用join的区别

#不加锁:并发执行,速度快,数据不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加锁的代码并发运行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加锁的代码串行运行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同学可能有疑问:既然加锁会让运行变成串行,那么我在start之后立即使用join,就不用加锁了啊,也是串行的效果啊

#没错:在start之后立刻使用jion,肯定会将100个任务的执行变成串行,毫无疑问,最终n的结果也肯定是0,是安全的,但问题是

#start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的
#单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗时是多么的恐怖
'''
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109

参考链接:
https://blog.csdn.net/zong596568821xp/article/details/99678390
https://www.cnblogs.com/haitaoli/articles/10302508.html

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/768977
推荐阅读
相关标签
  

闽ICP备14008679号