No1 观察者模式
定义了对象之间的一对多依赖。当被观察者对象改变状态时,它的所有依赖者(观察者)都会收到通知并更新状态。
#可以继承一个抽象类 class Observer(): """观察者基类""" # @abstractmethod def update(self, observable, object): pass class Observable: """被观察者的基类""" def __init__(self): self.__observers = [] def addObserver(self, observer): self.__observers.append(observer) def removeObserver(self, observer): self.__observers.remove(observer) def notifyObservers(self, object): for obj in self.__observers: obj.update(self, object) ''' 举例:异地登录提醒 ''' import time class Account(Observable): def __init__(self): super().__init__() self.__lastIp = {} self.__lastRegion = {} def __getRegion(self, ip): #由IP地址获取地区信息。这里只是模拟,真实项目中应该调用IP地址解析服务。 ipRegions = { "101.47.18.9":"浙江杭州", "67.218.147.69":"美国洛杉矶", } # region = ipRegions[ip] region = ipRegions.get(ip) return "" if region is None else region def __isLongDistance(self, name, region): # 计算本次登录与最近几次登录的距离 # 真实项目中调用地理信息相关服务 lastRegion = self.__lastRegion.get(name) return lastRegion is not None and lastRegion != region def login(self, name, ip, time): region = self.__getRegion(ip) if self.__isLongDistance(name, region): self.notifyObservers({"name":name,"ip":ip,"region":region,"time":time}) self.__lastRegion[name] = region self.__lastIp[name] = ip class SmsSender(Observer): def update(self, observable, object): print("[短信发送] " + object["name"] + "您好!检测到您的账户可能登录异常。最近一次登录信息:\n" + "登录地区:" + object["region"] + " 登录ip:" + object["ip"] + " 登录时间:" + time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(object["time"]))) class MailSender(Observer): """邮件发送器""" def update(self, observable, object): print("[邮件发送] " + object["name"] + "您好!检测到您的账户可能登录异常。最近一次登录信息:\n" + "登录地区:" + object["region"] + " 登录ip:" + object["ip"] + " 登录时间:" + time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(object["time"]))) if "__main__" == __name__: accout = Account() accout.addObserver(SmsSender()) accout.addObserver(MailSender()) accout.login("Tony", "101.47.18.9", time.time()) accout.login("Tony", "67.218.147.69", time.time())