Python使用thread模块实现多线程-LMLPHP

介绍:

        线程(Threads)是操作系统提供的一种轻量级的执行单元,可以在一个进程内并发执行多个任务。每个线程都有自己的执行上下文,包括栈、寄存器和程序计数器。

        在Python中,可以使用threading模块创建和管理线程。线程可以同时执行多个任务,可以在一个线程中执行耗时操作,而不会阻塞其他线程的执行。线程之间共享进程的资源,如内存空间,因此需要注意线程安全的问题。

        然而,Python的线程在特定情况下可能会受到全局解释器锁(Global Interpreter Lock,GIL)的限制。GIL是一种机制,它确保同一时刻只有一个线程可以执行Python字节码。这意味着在多线程场景下,即使有多个线程,也无法真正实现并行执行。因此,在CPU密集型的任务中,Python的线程并不能充分利用多核处理器的能力。

1、导入threading模块

在使用Python线程之前,首先需要导入 threading 模块。可以使用以下语句导入该模块:

import threading

2、创建线程

        使用 threading.Thread 类创建线程对象。可以通过传递一个可调用的目标函数和其他参数来实例化线程对象。目标函数是线程实际执行的任务。

# 定义一个目标函数作为线程的执行任务
def my_task(arg1, arg2):
    # 执行任务的代码

# 创建线程对象
my_thread = threading.Thread(target=my_task, args=(arg1, arg2))

3、启动线程

        通过调用线程对象的 start() 方法来启动线程。启动线程后,它将在后台运行,执行目标函数中的代码。

my_thread.start()

4、等待线程完成

        可以使用 join() 方法等待线程执行完毕。调用 join() 方法会阻塞当前线程,直到目标线程执行完成。

my_thread.join()

5、线程同步

        在多线程编程中,线程之间的同步是一项重要的任务,旨在确保线程按照预期的顺序执行,并避免竞态条件和数据不一致的问题。Python提供了几种同步原语,常用的包括锁(Lock)、信号量(Semaphore)、事件(Event)和条件变量(Condition)。下面详细介绍这些同步原语的特点和使用方法:

锁(Lock)

        锁是一种最基本的同步原语,在Python中由 threading.Lock 类实现。它提供了两个主要方法:acquire() 和 release()。一个线程可以通过调用 acquire() 来获取锁,如果锁当前没有被其他线程持有,则该线程将获得锁并继续执行,否则将被阻塞直到锁被释放。当线程完成对临界区的访问后,应该调用 release() 来释放锁,以便其他线程可以获取它。

import threading

# 创建锁对象
lock = threading.Lock()

# 线程函数
def thread_function():
    lock.acquire()
    # 临界区代码
    lock.release()

锁还支持上下文管理器的使用方式,可以使用 with 语句来自动获取和释放锁:

import threading

# 创建锁对象
lock = threading.Lock()

# 线程函数
def thread_function():
    with lock:
        # 临界区代码

信号量(Semaphore)

        信号量是一种更高级的同步原语,用于控制对共享资源的并发访问。Python中的信号量由 threading.Semaphore 类实现。信号量维护一个内部计数器,线程可以通过调用 acquire() 来减少计数器的值,如果计数器为零,则线程将被阻塞。线程在完成对共享资源的访问后,应该调用 release() 来增加计数器的值,以便其他线程可以获取信号量。

import threading

# 创建信号量对象
semaphore = threading.Semaphore(value=3)  # 设置初始计数器值为3

# 线程函数
def thread_function():
    semaphore.acquire()
    # 访问共享资源
    semaphore.release()

信号量的计数器可以控制同时访问共享资源的线程数量。

事件(Event)

        事件是一种用于线程间通信的同步原语,由 threading.Event 类实现。事件有两种状态:已设置和未设置。线程可以通过调用 set() 来设置事件,将其状态设置为已设置;通过调用 clear() 可以将事件状态设置为未设置。线程可以通过调用 wait() 来等待事件的设置,如果事件已设置,则线程可以继续执行,否则将被阻塞。

import threading

# 创建事件对象
event = threading.Event()

# 线程函数
def thread_function():
    event.wait()  # 等待事件设置
    # 执行操作

# 主线程设置事件
event.set()

事件还可以使用 is_set() 方法来检查事件的状态。

条件变量(Condition)

        条件变量是一种复杂的同步原语,由 threading.Condition 类实现。它提供了一个条件队列,允许线程等待某个条件的发生。条件变量结合锁一起使用,可以实现更复杂的线程间同步。

import threading

# 创建条件变量对象
condition = threading.Condition()

# 线程函数 A
def thread_function_a():
    with condition:
        while not condition_predicate():
            condition.wait()
        # 执行操作

# 线程函数 B
def thread_function_b():
    with condition:
        # 修改条件
        condition.notify()  # 通知等待的线程

# 主线程
with condition:
    # 修改条件
    condition.notify()  # 通知等待的线程

        在线程函数 A 中,线程会等待条件谓词成立的情况下才继续执行,否则会调用 wait() 方法将线程挂起。线程函数 B 可以在某个条件发生变化时调用 notify() 方法来通知等待的线程。

6、共享数据

        共享数据是指多个线程同时访问和修改的数据。当多个线程同时读写共享数据时,可能会发生竞态条件(Race Condition)和数据损坏的问题。为了确保线程安全性,需要采取适当的同步措施来保护共享数据。以下是一些常用的同步机制和技术:

锁(Lock)

        锁是一种最常见的同步原语,用于保护共享数据的互斥访问。在多线程环境中,一个线程可以通过获取锁来独占地访问共享数据,其他线程必须等待锁释放后才能访问。锁可以使用 threading.Lock 类来实现,通过调用 acquire() 和 release() 方法来获取和释放锁。

import threading

# 创建锁对象
lock = threading.Lock()

# 共享数据
shared_data = 0

# 线程函数
def thread_function():
    global shared_data
    lock.acquire()
    # 访问和修改共享数据
    shared_data += 1
    lock.release()

在访问共享数据之前获取锁,确保同一时间只有一个线程可以修改数据,从而避免竞态条件。

信号量(Semaphore)

        信号量也可以用于保护共享数据的访问,在多线程环境中控制并发访问的数量。信号量维护一个内部计数器,线程在访问共享数据之前通过获取信号量来减少计数器的值,如果计数器为零,则线程将被阻塞。线程在完成对共享数据的访问后,通过释放信号量来增加计数器的值,从而允许其他线程继续访问。

import threading

# 创建信号量对象
semaphore = threading.Semaphore()

# 共享数据
shared_data = 0

# 线程函数
def thread_function():
    global shared_data
    semaphore.acquire()
    # 访问和修改共享数据
    shared_data += 1
    semaphore.release()

通过适当设置信号量的初始值,可以控制同时访问共享数据的线程数量。

其他同步原语

        Python还提供了其他一些同步原语,如条件变量(Condition)和事件(Event)。它们可以用于更复杂的同步需求,如线程之间的通信和等待特定条件的发生。

import threading

# 创建条件变量对象
condition = threading.Condition()

# 共享数据
shared_data = []

# 线程函数 A
def thread_function_a():
    with condition:
        while not condition_predicate():
            condition.wait()
        # 访问和修改共享数据

# 线程函数 B
def thread_function_b():
    with condition:
        # 修改条件
        condition.notify()  # 通知等待的线程

        在上述示例中,线程函数 A等待条件谓词成立的情况下才能访问共享数据,线程函数 B在条件发生变化时通过 notify() 方法通知等待的线程。

7、线程状态

        线程状态是指线程在不同的时间点上所处的状态,它反映了线程的执行情况和可用性。在多线程编程中,线程可以处于以下几种不同的状态:

新建(New)状态

        当创建线程对象但尚未启动线程时,线程处于新建状态。此时线程对象已经被创建,但尚未分配系统资源和执行代码。可以通过实例化线程类或者从线程池中获取线程来创建新线程。

import threading

# 创建新线程对象
thread = threading.Thread(target=thread_function)

就绪(Runnable)状态

        当线程准备好执行,但由于系统调度的原因还未开始执行时,线程处于就绪状态。线程已经分配了系统资源,并且等待调度器将其放入运行队列中。多个就绪状态的线程可能会竞争CPU资源,调度器会根据调度算法决定哪个线程被选中执行。

运行(Running)状态

        当线程获得CPU资源并开始执行线程函数时,线程处于运行状态。此时线程的代码正在被执行,它可能会与其他线程并发执行或通过时间片轮转进行切换。只有一个线程可以处于运行状态。

阻塞(Blocked)状态

        当线程被暂停执行,等待某个条件的发生时,线程处于阻塞状态。在阻塞状态下,线程不会占用CPU资源,直到满足特定条件后才能继续执行。常见的阻塞原因包括等待I/O操作、获取锁失败、等待其他线程的通知等。

终止(Terminated)状态

        当线程完成了它的执行任务或被显式终止时,线程处于终止状态。线程函数执行完毕或者出现异常时,线程将自动终止。也可以通过调用线程对象的 join() 方法来等待线程执行完毕。

# 等待线程执行完毕
thread.join()

        线程状态之间可以相互转换,线程的状态转换通常由操作系统的调度器和线程的执行情况决定。例如,当线程处于就绪状态并获得CPU资源时,它将进入运行状态;当线程在执行期间发生阻塞,它将进入阻塞状态;当线程执行完毕或被终止时,它将进入终止状态。

8、线程属性和方法

   threading.Thread 类提供了一些属性和方法来管理和操作线程。其中一些常用的属性和方法包括:

  • name:获取或设置线程的名称。
  • ident:获取线程的标识符。
  • is_alive():检查线程是否处于活动状态。
  • setDaemon(daemonic):将线程设置为守护线程,当主线程退出时,守护线程也会被终止。
  • start():启动线程。
  • join(timeout):等待线程执行完成,可选地设置超时时间。
  • run():线程的执行入口点,在线程启动时被调用。
  • sleep(secs):线程休眠指定的秒数。

9、线程间通信

        线程间通信是指在多线程编程中,多个线程之间进行数据传递和共享的过程。线程间通信的目的是实现线程之间的协作和数据交换,以完成复杂的任务。在Python中,可以使用 queue 模块提供的队列来实现线程安全的数据传递。

queue 模块提供了几种队列类型,常用的有以下三种:

Queue(先进先出队列)

   Queue 是最常用的线程安全队列,它使用先进先出(FIFO)的方式存储和获取数据。多个线程可以安全地将数据放入队列中,并从队列中获取数据。Queue 类提供了以下常用方法:

  • put(item[, block[, timeout]]):将数据放入队列,可指定是否阻塞和超时时间。
  • get([block[, timeout]]):从队列中获取数据,可指定是否阻塞和超时时间。
  • empty():判断队列是否为空。
  • full():判断队列是否已满。
  • qsize():返回队列中的元素数量。
import queue

# 创建队列对象
q = queue.Queue()

# 线程函数 A
def thread_function_a():
    while True:
        item = q.get()
        # 处理数据

# 线程函数 B
def thread_function_b():
    while True:
        # 产生数据
        q.put(item)

        在上述示例中,线程函数 A从队列中获取数据并进行处理,线程函数 B产生数据并放入队列中,两个线程通过队列进行数据交换。

LifoQueue(后进先出队列)

        LifoQueue 是一种后进先出(LIFO)的队列类型,与 Queue 不同的是,它的获取顺序与放入顺序相反。其他方法与 Queue 类相同。

import queue

# 创建后进先出队列对象
q = queue.LifoQueue()

后进先出队列适用于某些特定的场景,例如需要按照相反的顺序处理数据。

PriorityQueue(优先级队列)

    PriorityQueue 是一种根据优先级排序的队列类型,可以为队列中的每个元素指定一个优先级。优先级高的元素先被获取。元素的优先级可以是数字、元组或自定义对象。其他方法与 Queue 类相同。

import queue

# 创建优先级队列对象
q = queue.PriorityQueue()

优先级队列适用于需要根据优先级顺序处理数据的场景。

10、线程池

        线程池是一种用于管理和复用线程的机制,可以有效地管理大量线程的生命周期,并提供简化的接口来提交和管理任务。在Python中,可以使用 concurrent.futures 模块中的 ThreadPoolExecutor 类来创建线程池。

线程池的特点

  • 线程复用:线程池中的线程可以被重复使用,避免了线程频繁创建和销毁的开销。
  • 线程管理:线程池负责管理线程的生命周期,包括线程的创建、销毁和回收。
  • 并发控制:线程池可以限制并发执行的线程数量,防止系统资源被过度占用。
  • 异步提交:线程池提供了异步提交任务的方法,可以在后台执行任务并返回结果。

创建线程池

        可以使用 ThreadPoolExecutor 类来创建线程池。可以指定线程池的大小(可同时执行的线程数量)和其他相关参数。

from concurrent.futures import ThreadPoolExecutor

# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)

在上述示例中,创建了一个最大同时执行 5 个线程的线程池。

提交任务

        可以使用线程池的 submit() 方法提交任务,该方法会返回一个 Future 对象,用于获取任务的执行结果。

# 定义任务函数
def task_function():
    # 任务逻辑

# 提交任务到线程池
future = pool.submit(task_function)

在上述示例中,将任务函数 task_function 提交到线程池,并获得了一个 Future 对象。

获取任务结果

        可以使用 Future 对象的 result() 方法来获取任务的执行结果。如果任务尚未完成,result() 方法将会阻塞直到任务完成并返回结果。

# 获取任务结果
result = future.result()

关闭线程池

        在使用完线程池后,应该调用 shutdown() 方法来关闭线程池。关闭线程池后,将不再接受新的任务提交,并且会等待所有已提交的任务执行完毕后再退出。

# 关闭线程池
pool.shutdown()
05-24 19:33