前言

我们知道,经典设计模式总共有 23 种,但其中只有少数几种被广泛采用。根据我的工作经验,实际常用的可能不超过其中的一半。如果随机找一位程序员,并要求他列举出自己最熟悉的三种设计模式,那么单例模式肯定会是其中之一,这也是今天我们要讨论的。

为什么要单例模式?

单例设计模式(Singleton Design Pattern): 一个类只允许创建一个对象(或者实例),那这个类就是一个单例类,这种设计模式就叫作单例设计模式,简称单例模式。

当一个类的功能比较单一,只需要一个实例对象就可以完成需求时,就可以使用单例模式来节省内存资源。

处理资源冲突

class Logger:
    def __init__(self, file_path):
        self.file_path = file_path

    def log(self, message):
        with open(self.file_path, "a") as file:
            file.write(message + "\n")

class UserController:
    def __init__(self):
        self.logger = Logger("/Users/haige/log.txt")

    def login(self, username, password):
        # ...省略业务逻辑代码...
        self.logger.log(username + " logged in!")

class OrderController:
    def __init__(self):
        self.logger = Logger("/Users/haige/log.txt")

    def create(self, order):
        # ...省略业务逻辑代码...
        self.logger.log("Created an order: " + str(order))

# Logger类的应用示例:
user_controller = UserController()
user_controller.login("Alice", "123456")

order_controller = OrderController()
order_controller.create({"id": 1, "product": "Widget", "quantity": 5})

思考一下,以上代码是否存在什么问题呢?

答案是存在的

解决的办法有很多,如:使用类级别锁,分布式锁等。

相对于这两种解决方案,单例模式的解决思路就简单一些了。单例模式相对于之前类级别锁的好处是,不用创建那么多 Logger 对象,一方面节省内存空间,另一方面节省系统文件句柄(对于操作系统来说,文件句柄也是一种资源,不能随便浪费)。

import logging

class Logger:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(Logger, cls).__new__(cls)
            cls._instance.init_logger()
        return cls._instance

    def init_logger(self):
        self.logger = logging.getLogger("MyLogger")
        file_handler = logging.FileHandler("/Users/wangzheng/log.txt")
        formatter = logging.Formatter("%(asctime)s - %(message)s")
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
        self.logger.setLevel(logging.INFO)

    def log(self, message):
        self.logger.info(message)


class UserController:
    def login(self, username, password):
        # ...省略业务逻辑代码...
        Logger().log(username + " logged in!")

class OrderController:
    def create(self, order):
        # ...省略业务逻辑代码...
        Logger().log("Created an order: " + str(order))

# Logger类的应用示例:
user_controller = UserController()
user_controller.login("Alice", "123456")

order_controller = OrderController()
order_controller.create({"id": 1, "product": "Widget", "quantity": 5})

表示全局唯一类

从业务概念上,如果有些数据在系统中只应保存一份,那就比较适合设计为单例类。

  • 比如,配置信息类。在系统中,我们只有一个配置文件,当配置文件被加载到内存之后,以对象的形式存在,也理所应当只有一份。
  • 再比如,唯一递增 ID 号码生成器,如果程序中有两个对象,那就会存在生成重复 ID 的情况,所以,我们应该将 ID 生成器类设计为单例。
import threading


class IdGenerator:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(IdGenerator, cls).__new__(cls)
                    cls._instance.init_generator()
        return cls._instance

    def init_generator(self):
        self.id = 0

    def get_id(self):
        with self._lock:
            self.id += 1
            return self.id


# IdGenerator使用举例
id_generator1 = IdGenerator()
my_id1 = id_generator1.get_id()

id_generator2 = IdGenerator()
my_id2 = id_generator2.get_id()

print(id_generator1 is id_generator2)   # True
print(my_id1, my_id2)  # 1 2

ok,至此我们已经初步入门单例模式了。不过设计的并不十分优雅,而且还存在不少的问题。至于有什么问题以及如何改造,且听我慢慢道来。

如何实现一个单例?

要实现一个单例,我们需要知道要重点关注的点是哪些?

  • 考虑对象创建时的线程安全问题
  • 考虑是否支持延迟加载
  • 考虑获取实例的性能是否高(是否加锁)

在python中,我们可以使用多种方法来实现单例模式:

  • 使用模块
  • 使用装饰器
  • 使用类(方法)
  • 基于__new__方法实现
  • 基于元类metaclass实现

饿汉式(着急吃)

饿汉式的实现方式比较简单。在类加载的时候,instance 静态实例就已经创建并初始化好了,所以,instance 实例的创建过程是线程安全的。不过,这样的实现方式不支持延迟加载(在真正用到 IdGenerator 的时候,再创建实例)。

一种常见的方式是利用python模块化实现单例。

创建一个名为 id_generator_module.py 的模块文件,将 IdGenerator 类定义放入其中:

# id_generator_module.py

import threading


class IdGenerator:
    def __init__(self):
        self._lock = threading.Lock()
        self.id = 0

    def get_id(self):
        with self._lock:
            self.id += 1
            return self.id


# 创建单例实例
id_generator_instance = IdGenerator()

在另一个文件(例如 main.py)中导入模块并使用单例模式:

# main.py

import id_generator_module

if __name__ == '__main__':
    singleton1 = id_generator_module.id_generator_instance
    singleton2 = id_generator_module.id_generator_instance

    print(singleton1 is singleton2)  # 输出 True,表示是同一个实例

懒汉式(不着急吃)

有饿汉式,对应的,就有懒汉式。懒汉式相对于饿汉式的优势是支持延迟加载。

通过__new__方法实现单例

在Python中,对象的实例化过程通常遵循以下步骤:首先,执行类的__new__方法,如果未定义此方法,将默认调用父类的__new__方法来创建一个实例化对象。接着,再执行__init__方法来对这个新创建的对象进行初始化。

我们可以充分利用这个实例化过程来实现单例模式。具体做法是在类的__new__方法中判断是否已经存在实例,如果存在,则直接返回现有的实例,否则创建一个新的实例。这样就能够确保只有一个实例存在,从而实现了单例模式的效果。

#! -*-conding=: UTF-8 -*-
# 2023/9/13 18:21


import threading


class LazySingleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(LazySingleton, cls).__new__(cls, *args, **kwargs)
                cls._instance.init_generator()
        return cls._instance

    def init_generator(self):
        self.id = 0

    def get_id(self):
        with self._lock:
            self.id += 1
            return self.id


# 使用示例
singleton1 = LazySingleton()
singleton2 = LazySingleton()

print(singleton1 is singleton2)  # 输出 True,表示是同一个实例

双重检测

饿汉式不支持延迟加载,懒汉式有性能问题,不支持高并发。那我们再来看一种既支持延迟加载、又支持高并发的单例实现方式,也就是双重检测实现方式。

使用__new__方法实现

#! -*-conding=: UTF-8 -*-
# 2023/9/13 18:21


import threading


class IdGenerator:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(IdGenerator, cls).__new__(cls)
                    cls._instance.init_generator()
        return cls._instance

    def init_generator(self):
        self.id = 0

    def get_id(self):
        with self._lock:
            self.id += 1
            return self.id


# 使用示例
singleton1 = IdGenerator()
singleton2 = IdGenerator()

print(singleton1 is singleton2)  # 输出 True,表示是同一个实例


使用装饰器函数实现单例模式

import threading


def singleton(cls):
    instances = {}
    lock = threading.Lock()

    def get_instance(*args, **kwargs):
        if cls not in instances:
            with lock:
                if cls not in instances:
                    instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class IdGenerator:
    def __init__(self):
        self.id = 0

    def get_id(self):
        with threading.Lock():
            self.id += 1
            return self.id


if __name__ == '__main__':
    singleton1 = IdGenerator()
    singleton2 = IdGenerator()

    print(singleton1 is singleton2)  # 输出 True,表示是同一个实例

    # 使用 __call__ 方法获取单例
    singleton3 = IdGenerator()
    print(singleton1 is singleton3)  # 输出 True,表示是同一个实例

使用了类装饰器 @singleton 来标记 IdGenerator 类,并且确保只有一个实例被创建。

使用装饰器类实现单例模式

通过 __call__方法来获取单例,并使用一个装饰器函数来包装类,确保只有一个实例被创建。

import threading


class SingletonDecorator:
    def __init__(self, cls):
        self._cls = cls
        self._instances = {}
        self._lock = threading.Lock()

    def __call__(self, *args, **kwargs):
        if self._cls not in self._instances:
            with self._lock:
                if self._cls not in self._instances:
                    instance = self._cls(*args, **kwargs)
                    self._instances[self._cls] = instance
        return self._instances[self._cls]


@SingletonDecorator
class IdGenerator:
    def init_generator(self):
        self.id = 0

    def get_id(self):
        with threading.Lock():
            self.id += 1
            return self.id


if __name__ == '__main__':
    singleton1 = IdGenerator()
    singleton2 = IdGenerator()

    print(singleton1 is singleton2)  # 输出 True,表示是同一个实例

    # 使用装饰器获取单例
    singleton3 = IdGenerator()
    print(singleton1 is singleton3)  # 输出 True,表示是同一个实例

使用类方法实现单例

import threading


class IdGenerator:
    _instance = None
    _lock = threading.Lock()

    def __init__(self):
        self.init_generator()

    def init_generator(self):
        self.id = 0

    def get_id(self):
        with self._lock:
            self.id += 1
            return self.id

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance


if __name__ == '__main__':
    singleton1 = IdGenerator.get_instance()
    singleton2 = IdGenerator.get_instance()

    print(singleton1 is singleton2)  # 输出 True,表示是同一个实例

在这个示例中,我们将__new__方法去掉,并添加了一个类方法 get_instance(),该方法负责创建和返回单例实例。通过调用 IdGenerator.get_instance(),你可以获得同一个单例实例,输出应该是 True,表示是同一个实例。

使用元类metaclass实现单例模式

import threading


class SingletonMeta(type):
    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            with cls._lock:
                if cls not in cls._instances:
                    instance = super(SingletonMeta, cls).__call__(*args, **kwargs)
                    cls._instances[cls] = instance
        return cls._instances[cls]


class IdGenerator(metaclass=SingletonMeta):
    def init_generator(self):
        self.id = 0

    def get_id(self):
        with threading.Lock():
            self.id += 1
            return self.id


if __name__ == '__main__':
    singleton1 = IdGenerator()
    singleton2 = IdGenerator()

    print(singleton1 is singleton2)  # 输出 True,表示是同一个实例

在这个示例中,我们定义了一个名为 SingletonMeta 的元类,它负责控制单例的创建。我们将 IdGenerator 类的元类设置为 SingletonMeta,这样就可以确保只有一个实例被创建。输出结果应该是 True,表示是同一个实例。这种方式使用元类更加符合面向对象的思想,使得单例模式的实现更加清晰。

如何实现集群环境下的单例?

具体来说,我们需要把这个单例对象序列化并存储到外部共享存储区(比如文件)。进程在使用这个单例对象的时候,需要先从外部共享存储区中将它读取到内存,并反序列化成对象,然后再使用,使用完成之后还需要再存储回外部共享存储区。

为了保证任何时刻,在进程间都只有一份对象存在,一个进程在获取到对象之后,需要对对象加锁,避免其他进程再将其获取。在进程使用完这个对象之后,还需要显式地将对象从内存中删除,并且释放对对象的加锁。

使用场景示例

场景1

某个项目的配置信息存放在一个配置文件中,通过一个 Config 的类来读取配置文件的信息。如果在程序运行期间,有很多地方都需要使用配置文件的内容,也就是说,很多地方都需要创建 Config 对象的实例,这就导致系统中存在多个 Config 的实例对象,而这样会严重浪费内存资源,尤其是在配置文件内容很多的情况下。事实上,类似 Config 这样的类,我们希望在程序运行期间只存在一个实例对象。

import configparser
import threading


class Config:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(Config, cls).__new__(cls)
                    cls._instance.load_config()
        return cls._instance

    def load_config(self):
        self.config = configparser.ConfigParser()
        if not self.config.read('config.ini'):
            # 如果配置文件不存在,设置默认配置
            self.set_default_config()

    def set_default_config(self):
        # 设置默认配置
        self.config['Section1'] = {'key1': 'default_value1', 'key2': 'default_value2'}

    def get_config(self, section, key):
        return self.config.get(section, key)

    def update_config(self, section, key, value):
        self.config.set(section, key, value)
        with open('config.ini', 'w') as config_file:
            self.config.write(config_file)


if __name__ == "__main__":
    config = Config()

    # 获取配置信息
    value1 = config.get_config("Section1", "key1")
    print(value1)

    # 更新配置信息
    config.update_config("Section1", "key1", "new_value")

    # 获取更新后的配置信息
    updated_value1 = config.get_config("Section1", "key1")
    print(updated_value1)

场景2
分布式ID生成是一个常见的需求,以下是一个使用雪花算法实现分布式ID生成的Python代码示例,并将雪花算法的生成ID功能与单例模式结合使用,创建了一个单例类,该类包含了雪花算法的实例,并确保只有一个该类的实例存在:


import threading
import time


class SnowflakeIDGenerator:
    def __init__(self, worker_id, datacenter_id):
        # 41位时间戳位
        self.timestamp_bits = 41
        # 10位工作机器ID位
        self.worker_id_bits = 10
        # 12位序列号位
        self.sequence_bits = 12

        # 最大工作机器ID和最大序列号
        self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)
        self.max_sequence = -1 ^ (-1 << self.sequence_bits)

        # 时间戳左移的位数
        self.timestamp_shift = self.worker_id_bits + self.sequence_bits
        # 工作机器ID左移的位数
        self.worker_id_shift = self.sequence_bits

        # 配置工作机器ID和数据中心ID
        self.worker_id = worker_id
        self.datacenter_id = datacenter_id

        # 初始化序列号
        self.sequence = 0
        # 上次生成ID的时间戳
        self.last_timestamp = -1

        # 线程锁,用于保护并发生成ID的安全性
        self.lock = threading.Lock()

        # 校验工作机器ID和数据中心ID是否合法
        if self.worker_id < 0 or self.worker_id > self.max_worker_id:
            raise ValueError(f"Worker ID must be between 0 and {self.max_worker_id}")
        if self.datacenter_id < 0 or self.datacenter_id > self.max_worker_id:
            raise ValueError(f"Datacenter ID must be between 0 and {self.max_worker_id}")

    def _current_timestamp(self):
        return int(time.time() * 1000)

    def _wait_for_next_timestamp(self, last_timestamp):
        timestamp = self._current_timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._current_timestamp()
        return timestamp

    def generate_id(self):
        with self.lock:
            current_timestamp = self._current_timestamp()
            if current_timestamp < self.last_timestamp:
                raise ValueError("Clock moved backwards. Refusing to generate ID.")

            if current_timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & self.max_sequence
                if self.sequence == 0:
                    current_timestamp = self._wait_for_next_timestamp(self.last_timestamp)
            else:
                self.sequence = 0

            self.last_timestamp = current_timestamp

            # 构造ID
            timestamp = current_timestamp << self.timestamp_shift
            worker_id = self.worker_id << self.worker_id_shift
            id = timestamp | worker_id | self.sequence
            return id


class SingletonSnowflakeGenerator:
    _instance_lock = threading.Lock()
    _instance = None

    def __new__(cls, worker_id, datacenter_id):
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = SnowflakeIDGenerator(worker_id, datacenter_id)
        return cls._instance


if __name__ == "__main__":
    generator1 = SingletonSnowflakeGenerator(worker_id=1, datacenter_id=1)
    generator2 = SingletonSnowflakeGenerator(worker_id=2, datacenter_id=2)

    print(generator1 is generator2)  # 输出 True,表示是同一个实例

    id1 = generator1.generate_id()  
    id2 = generator2.generate_id()  

    print(id1)  # 7108005303425175552
    print(id2)  # 7108005303425175553

场景3
数据库连接池:确保在应用程序中只存在一个数据库连接池的实例,以提高性能和资源利用率。


import threading
import pymysql
from dbutils.pooled_db import PooledDB


class DatabaseConnectionPoolProxy:
    _instance_lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        if not hasattr(DatabaseConnectionPoolProxy, "_instance"):
            with DatabaseConnectionPoolProxy._instance_lock:
                if not hasattr(DatabaseConnectionPoolProxy, "_instance"):
                    DatabaseConnectionPoolProxy._instance = object.__new__(cls)
                    cls._instance.initialize_pool()
        return DatabaseConnectionPoolProxy._instance

    def initialize_pool(self):
        self.pool = PooledDB(
            creator=pymysql,
            maxconnections=6,
            mincached=2,
            maxcached=5,
            maxshared=3,
            blocking=True,
            maxusage=None,
            # 配置其他数据库连接池参数
            host='192.168.91.1',
            port=3306,
            user='root',
            password='root',
            database='inventory',
            charset='utf8'
        )

    def get_connection(self):
        if self.pool:
            return self.pool.connection()

    def execute_query(self, query, params=None):
        conn = self.get_connection()
        if conn:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params)
                result = cursor.fetchall()
                return result
            finally:
                cursor.close()
                conn.close()


if __name__ == "__main__":
    db_proxy = DatabaseConnectionPoolProxy()
    result = db_proxy.execute_query("SELECT * FROM inventory WHERE id=%s", [5])

    print(result)

    db_proxy1 = DatabaseConnectionPoolProxy()
    db_proxy2 = DatabaseConnectionPoolProxy()

    print(db_proxy1 is db_proxy2)  # True

场景4
缓存管理器:管理应用程序中的缓存数据,确保只有一个缓存管理器实例来避免数据一致性问题。

import threading


class CacheManager:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(CacheManager, cls).__new__(cls)
                    cls._instance.initialize_cache()
        return cls._instance

    def initialize_cache(self):
        self.cache_data = {}  # 实际缓存数据的数据结构

    def get_data(self, key):
        return self.cache_data.get(key)

    def set_data(self, key, value):
        with self._lock:
            self.cache_data[key] = value


if __name__ == "__main__":
    cache_manager1 = CacheManager()
    cache_manager1.set_data("key1", "value1")

    cache_manager2 = CacheManager()
    value = cache_manager2.get_data("key1")
    print(value)  # 输出 "value1"

    print(cache_manager1 is cache_manager2)  # 如果为 True,则是单例模式

场景5
线程池:确保在应用程序中只存在一个线程池的实例,以管理并发任务的执行。

import threading
from concurrent.futures import ThreadPoolExecutor


class ThreadPoolManager:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(ThreadPoolManager, cls).__new__(cls)
                    cls._instance.initialize_thread_pool()
        return cls._instance

    def initialize_thread_pool(self):
        self.thread_pool = ThreadPoolExecutor(max_workers=4)  # 最大工作线程数

    def submit_task(self, task_function, *args, **kwargs):
        return self.thread_pool.submit(task_function, *args, **kwargs)


if __name__ == "__main__":
    thread_pool_manager1 = ThreadPoolManager()


    def sample_task(x):
        return x * 2


    future = thread_pool_manager1.submit_task(sample_task, 5)
    result = future.result()
    print(result)  # 输出 10

    thread_pool_manager2 = ThreadPoolManager()

    print(thread_pool_manager1 is thread_pool_manager2)  # 如果为 True,则是单例模式

小结

懒汉式还是饿汉式更好我觉得需要看具体的场景。

  • 对于那些短生命周期的应用,如客户端应用来说,启动是频繁发生的,如果启动时导致了一堆饿汉初始化,会给用户带来不好的体验,如果把初始化往后延,将初始化分散在未来的各个时间点,即使某个懒汉初始化时间较长,用户也几乎无感知。

  • 而对于生命周期较长的应用,长痛不如短痛,启动时耗点时,保证后面的使用流畅也是可取的。

另外,需要关注多线程等并发带来的并发安全问题及引入锁后的性能问题等。

参考

01-14 15:13