'''
存放配置文件
'''
import os
#获取项目根目录
BASE_PATH=os.path.dirname(os.path.dirname(__file__)) #获取用户目录
USER_DATE_PATH=os.path.join(BASE_PATH,'interface','user_date') """
logging配置
""" # 定义三种日志输出格式 开始
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' \
'[%(levelname)s][%(message)s]' # 其中name为getlogger指定的名字
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s' # 定义日志输出格式 结束
# ****************注意1: log文件的目录
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
logfile_dir = os.path.join(BASE_PATH, 'log')
# print(logfile_dir) # ****************注意2: log文件名
logfile_name = 'atm.log' # 如果不存在定义的日志目录就创建一个
if not os.path.isdir(logfile_dir):
os.mkdir(logfile_dir) # log文件的全路径
logfile_path = os.path.join(logfile_dir, logfile_name) LOGGING_DIC = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': standard_format
},
'simple': {
'format': simple_format
},
},
'filters': {},
'handlers': {
# 打印到终端的日志
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'simple'
},
# 打印到文件的日志,收集info及以上的日志
'default': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler', # 保存到文件
'formatter': 'standard',
'filename': logfile_path, # 日志文件
'maxBytes': 1024 * 1024 * 5, # 日志大小 5M
'backupCount': 5,
'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
},
},
'loggers': {
# logging.getLogger(__name__)拿到的logger配置
'': {
'handlers': ['default', 'console'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG',
'propagate': True, # 向上(更高level的logger)传递
},
},
}
settings
'''
管理员视图层
'''
from interface import admin_interface
# - 冻结账户
def lock_user():
while True:
lock_user=input('请输入需要冻结的用户:').strip()
flag,msg=admin_interface.lock_user_interface(lock_user)
if flag:
print(msg)
break
else:
print(msg)
# - 添加账户
def add_user():
from core import src
src.register()
# - 修改额度
def alter_balance():
while True:
to_user=input('请输入需要修改的用户').strip()
money=input('请输入修改后的额度:').strip()
if not money.isdigit():
print('请输入整数')
continue
money=int(money)
flag,msg=admin_interface.alter_balance(to_user,money)
if flag:
print(msg)
break
else:
print(msg) admin_dic={
'':lock_user,
'':add_user,
'':alter_balance,
} def run():
while True:
print('''
1 冻结账户
2 添加账户
3 修改额度
''')
choice=input('请输入功能编号:').strip()
if choice not in admin_dic:
print('没有该编号')
continue
admin_dic.get(choice)()
break
admin
'''
用户视图层
''' from interface import user_interface
from interface import bank_interface
from interface import shop_interface
from interface import admin_interface
from lib import common #记录管理状态
login_user=None
# 1、注册(5分)
def register():
while True:
username=input('请输入用户名:').strip()
pwd=input('请输入密码:').strip()
re_pwd=input('请确认密码:').strip()
if pwd == re_pwd:
flag,msg=user_interface.register_interface(username,pwd)
if flag:
print(msg)
break
else:
print(msg)
continue
else:
print('两次密码不一致,请重新输入!') # 2、登录(5分)
def login():
while True:
username=input('请输入用户名:').strip()
pwd=input('请输入密码:').strip()
flag,msg=user_interface.login_interface(username,pwd)
if flag:
global login_user
login_user=username
print(msg)
break
else:
print(msg) # 3、查看余额(2分)
@common.login_auth
def check_balance():
balance=bank_interface.check_balance(login_user)
print(f'当前用户:{login_user} 余额为:{balance}$') # 4、提现(5分)
@common.login_auth
def withdraw():
while True:
money=input('请输入提现金额:').strip()
if not money.isdigit():
print('请输入整数')
continue
money=int(money)
if money > 0:
flag,msg=bank_interface.withdraw_interface(login_user,money)
if flag:
print(msg)
break
else:
print(msg)
else:
print('请输入大于0的整数')
# 5、还款(5分)
@common.login_auth
def re_pay():
while True:
money=input('请输入还款金额:').strip()
if not money.isdigit():
print('请输入整数')
continue
money=int(money)
if money > 0:
flag,msg=bank_interface.re_pay_interface(login_user,money)
print(msg)
break
# 6、转账(10分)
@common.login_auth
def transfer():
while True:
to_user=input('请输入收款用户:').strip()
money=input('请输入转账金额:').strip()
if not money.isdigit():
print('请输入整数')
continue
money=int(money)
if money > 0:
flag,msg=bank_interface.transfer_interface(login_user,to_user,money)
if flag:
print(msg)
break
else:
print(msg) # 7、查看流水(2分)
@common.login_auth
def check_flow():
flag,msg=bank_interface.check_flow_interface(login_user)
if flag:
for flow in msg:
print(flow)
else:
print(msg)
# 8、购物功能(15分)
@common.login_auth
def shopping():
shop_car = {}
shop_list = [
['macbook', 10000],
['iphone', 6999],
['雪梨PHONE', 999],
] while True:
for index,shop in enumerate(shop_list):
shop_name,shop_price=shop
print(f'商品编号:{index},商品名称:{shop_name},单价:{shop_price}$')
choice=input('请输入商品编号(y结账n添加购物车):').strip()
if choice == 'y':
flag,msg=shop_interface.pay_interface(login_user,shop_car)
if flag:
print(msg)
break
else:
print(msg)
continue
elif choice == 'n':
if shop_car:
flag,msg=shop_interface.add_shop_car_interface(login_user,shop_car)
print(msg)
break
else:
print('当前购物车为空')
continue
if not choice.isdigit():
print('没有这个编号')
continue
choice=int(choice)
if choice not in range(len(shop_list)):
print('没有这个编号')
continue
#获取商品名称,单价
shop_name,price=shop_list[choice]
if shop_name in shop_car:
shop_car[shop_name][1] += 1
else:
shop_car[shop_name]=[price,1]
print(shop_car) # 9、查看购物车功能(2分)
@common.login_auth
def check_shop_car():
while True:
shop_car = shop_interface.check_shop_car_interface(login_user)
print(shop_car)
choice=input('输入y结算购物车,输入q清空购物车:').strip()
if choice == 'y':
flag,msg=shop_interface.pay_interface(login_user,shop_car)
if flag:
print(msg)
break
else:
print(msg)
elif choice == 'q':
msg=shop_interface.qingkong_shop_car_interface(login_user)
print(msg)
break
else:
print('没有该编号') # 10、管理员功能
@common.login_auth
def admin():
from core import admin
admin.run() func_dic={
'':register,
'':login,
'':check_balance,
'':withdraw,
'':re_pay,
'':transfer,
'':check_flow,
'':shopping,
'':check_shop_car,
'':admin,
} def run():
while True:
print('''
=======欢迎来到ATM +购物车 APP=======
1、注册功能
2、登录功能
3、查看余额功能
4、提现功能
5、还款功能
6、转账功能
7、查看流水功能
8、购物功能
9、查看购物车功能
10、管理员功能
============ END ============
''')
choice=input('请输入功能编号:').strip()
if choice not in func_dic:
print('没有该编号')
continue
func_dic[choice]()
src
'''
数据处理层
'''
from conf import settings
import os
import json
#查看数据
def select(username):
user_path=os.path.join(settings.USER_DATE_PATH,f'{username}.json')
if os.path.exists(user_path):
with open(user_path,'r',encoding='utf-8') as f:
user_dic=json.load(f)
return user_dic #保存数据
def save(user_dic):
username=user_dic['username']
user_path=os.path.join(settings.USER_DATE_PATH,f'{username}.json')
with open(user_path,'w',encoding='utf-8') as f:
json.dump(user_dic,f,ensure_ascii=False)
return True
db_handler
{"username": "lqb", "password": "b51c4142d51ce2f5651fbf5baa8861bb", "balance": 960007, "flow": ["用户:lqb,还款:1000000$成功"], "shop_car": {}, "locked": false}
lqb.json
'''
管理员接口
''' from db import db_handler
from lib import common
admin_logger=common.get_logger('admin')
#冻结用户
def lock_user_interface(lock_user):
user_dic=db_handler.select(lock_user)
if user_dic:
user_dic['locked']=True
db_handler.save(user_dic)
msg=f'冻结用户{lock_user}成功'
admin_logger.info(msg)
return True,msg
else:
return False,f'用户{lock_user}不存在,无法冻结' #修改用户额度
def alter_balance(to_user,money):
to_user_dic=db_handler.select(to_user)
#用户存在则修改额度
if to_user_dic:
to_user_dic['balance']=money
db_handler.save(to_user_dic)
msg=f'用户{to_user}额度修改为{money}$成功'
admin_logger.info(msg)
return True,msg
else:
return False,f'被修改用户不存在,重新输入'
admin_interface
'''
银行逻辑接口
''' from db import db_handler
from lib import common
bank_logger=common.get_logger('bank')
#查看余额
def check_balance(login_user):
user_dic=db_handler.select(login_user)
return user_dic['balance'] #提现接口
def withdraw_interface(login_user,money):
user_dic=db_handler.select(login_user)
money2=money*1.05
if user_dic['balance'] >= money2:
user_dic['balance'] -= money2
# 记录流水
flow = f'用户:{login_user} 提现金额:{money} $成功,手续费:{money2-money}$'
bank_logger.info(flow)
user_dic['flow'].append(flow)
#保存数据
db_handler.save(user_dic)
return True,flow
else:
msg=f'用户:{login_user} 金额不足,无法提现'
bank_logger.warn(msg)
return False,msg #还款接口
def re_pay_interface(login_user,money):
user_dic=db_handler.select(login_user)
user_dic['balance'] += money
# 记录流水
flow=f'用户:{login_user},还款:{money}$成功'
bank_logger.info(flow)
user_dic['flow'].append(flow)
db_handler.save(user_dic)
return True,flow #转账接口
def transfer_interface(login_user,to_user,money):
login_user_dic=db_handler.select(login_user)
to_user_dic = db_handler.select(to_user)
if login_user_dic['balance'] >= money:
#判断收款用户是否存在
if to_user_dic:
#存在则当前用户减钱,收款用户加钱
login_user_dic['balance'] -= money
to_user_dic['balance'] += money
#记录流水
login_user_flow = f'用户:{login_user} 向用户:{to_user}转账{money}$成功'
login_user_dic['flow'].append(login_user_flow)
to_user_flow=f'用户:{to_user} 收到用户:{login_user}转账{money}$'
to_user_dic['flow'].append(to_user_flow)
# 保存数据
db_handler.save(login_user_dic)
db_handler.save(to_user_dic)
return True,login_user_flow
return False,f'用户:{to_user}不存在' return False,f'当前用户:{login_user} 金额不足无法转账,请重新输入金额' #查看流水
def check_flow_interface(login_user):
user_dic=db_handler.select(login_user)
flow=user_dic['flow']
if flow:
return True,flow
else:
return False,f'用户{login_user}还没有流水'
bank_interface
'''
商场逻辑接口
'''
from db import db_handler
from lib import common
shop_logger=common.get_logger('shop')
#查看购物车
def check_shop_car_interface(login_user):
user_dic=db_handler.select(login_user)
shop_car=user_dic['shop_car']
if shop_car:
return shop_car
else:
return f'用户{login_user}购物车没有商品' #商品结账接口
def pay_interface(login_user,shop_car):
cost=0
if shop_car:
for price_number in shop_car.values():
price,number=price_number
cost+=(price*number)
login_user_dic=db_handler.select(login_user)
if login_user_dic['balance'] >= cost:
#扣钱
login_user_dic['balance'] -= cost
#扣钱之后清空购物车
login_user_dic['shop_car']={}
#保存数据
db_handler.save(login_user_dic)
msg = f'用户{login_user}结账成功,消费{cost}$'
shop_logger.info(msg)
return True,msg
else:
msg = f'用户{login_user}余额不足,无法结账'
shop_logger.warn(msg)
return False,msg
else:
msg = f'用户{login_user}购物车为空,不需要支付'
shop_logger.warn(msg)
return False, msg #添加购物车接口
def add_shop_car_interface(login_user,shopping_car):
user_dic = db_handler.select(login_user)
shop_car = user_dic['shop_car']
for shop_name, price_number in shopping_car.items():
number = price_number[1]
if shop_name in shop_car:
user_dic['shop_car'][shop_name][1] += 1
else:
user_dic['shop_car'].update(
{shop_name: price_number}
)
db_handler.save(user_dic)
msg = '购物车添加成功'
shop_logger.info(msg)
return True, msg #清空购物车接口
def qingkong_shop_car_interface(login_user):
user_dic=db_handler.select(login_user)
user_dic['shop_car']=[]
db_handler.save(user_dic)
msg = '清空购物车成功'
shop_logger.info(msg)
return msg
shop_interface
'''
用户逻辑接口
'''
from db import db_handler
from lib import common
user_logger=common.get_logger('user')
#注册接口
def register_interface(username,pwd):
# 判断用户是否存在
user_dic=db_handler.select(username)
if user_dic:
msg=f'用户名{username}已存在,请重新输入!'
user_logger.warn(msg)
return False,msg
pwd=common.get_pwd_md5(pwd)
user_dic = {
'username': username,
'password': pwd,
'balance': 15000,
'flow': [],
'shop_car': {},
'locked': False,
}
msg = f'用户{username}注册成功!'
user_logger.info(msg)
db_handler.save(user_dic)
return True,msg #登录接口
def login_interface(username,pwd):
user_dic=db_handler.select(username)
if user_dic:
if user_dic['locked']:
msg = f'用户{username}已被锁定,无法登录'
user_logger.warn(msg)
return False,msg
pwd = common.get_pwd_md5(pwd)
if pwd == user_dic['password']:
msg = f'用户{username}登录成功'
user_logger.info(msg)
return True,msg
else:
msg = f'用户{username}登录失败,密码错误'
user_logger.warn(msg)
return False,msg
else:
msg = f'用户{username}不存在,请重新输入'
user_logger.warn(msg)
return False,msg
user_interface
'''
存放公共方法
'''
import hashlib
import logging.config
from conf import settings
# 10、记录日志(5分)
# def ():
# pass
# 11、登录认证装饰器(2分)
def login_auth(func):
from core import src
def inner(*args,**kwargs):
if src.login_user:
res=func(*args,**kwargs)
return res
else:
print('未登录不能操作,请登录')
src.login()
func()
return inner # 12、密码加密(2分)
def get_pwd_md5(pwd):
m=hashlib.md5()
salt='这是yan'
m.update(salt.encode('utf-8'))
m.update(pwd.encode('utf-8'))
m.update(salt.encode('utf-8'))
return m.hexdigest() #添加日志功能
def get_logger(log_type):
logging.config.dictConfig(settings.LOGGING_DIC)
logger=logging.getLogger(log_type)
return logger
common
3机试题(60分)
- ATM + 购物车
1、注册(5分)
2、登录(5分)
3、查看余额(2分)
4、提现(5分)
5、还款(5分)
6、转账(10分)
7、查看流水(2分)
8、购物功能(15分)
9、查看购物车功能(2分)
10、记录日志(5分)
11、登录认证装饰器(2分)
12、密码加密(2分) 4拔高题(5分) - 管理员功能
- 冻结账户
- 添加账户
- 修改额度
readme.md
#项目的入口 #添加解释器环境变量
import os
import sys
sys.path.append(os.path.dirname(__file__)) from core import src if __name__ == '__main__':
src.run()
start.py