项目成品
用到的技术栈:Python+MQTT5.0+PHP
当顾客扫了三合一二维码且支付完成时,监控收到新的订单,将数据发送给订阅了YF的客户端,客户端通过收到的金额进行记录,并通过接口计算出与该金额最相符的香烟价格,并返回香烟条码,客户端拿到条码完成键盘输入,点击挂单完成支付
顾客扫码支付后回调自动操作
目录
开发背景
朋友家的烟店,为了能够更快升级档位,公司规定:需要定期 盘点、会员扫码、挂单下单等操作,当然这只是所有企业的理想状态下,现实生活中需要考虑到很多因素,例如:终端无人操作、顾客不愿意提供个人信息积分、盘点费时等等,于是从开发到落地使用,经历了6个月多,至今才打算发文记录下,最开始没有考虑到监听收款音箱这个方案,我们商店使用的收款音箱是中国农业银行的,认为他不支持第三方接口,后面咨询了下确实不支持
原先使用的方案是:通过一个机器(单片机/旧手机)监听语音:农行收款**元,后面发现效果不理想,识别到的语音不完整,第三方语音识别接口成本太高,打算放弃了,搁置了1个月左右
后面觉得都弄了一半了,就差个监控回调,实在划不来,就因为这个导致项目半自动化,实在可惜,打算抓包看下能不能实现token保活,每经过一段时间触发一次,期望不是很高,因为这是银行的产品,我认为安全系数应该要高,结果实现了,亲测保活可以使用半年(简直太离谱了)
配置后端服务及接口
搭建MQTT服务
MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。监控和操作程序要能通信需要保持双向连接,且MQTT具有低延迟、低功耗的特点,emqx免费版提供的MQTT服务已经足够使用了,所以本次使用Emqx的服务。
如果你不会搭建MQTT,推荐看我的文章《Esp8266-01s、51单片机实现连接MQTT踩坑:附加烧录安信可固件+宝塔搭建MQTT服务器 全套攻略》
Python基本连接实例
————Python版本:3.7+ ————
导入 Paho MQTT客户端
from paho.mqtt import client as mqtt_client
设置 MQTT Broker 连接参数
设置 MQTT Broker 连接地址,端口以及 topic,同时我们调用 Python random.randint
函数随机生成 MQTT 客户端 id。
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
编写 MQTT 连接函数
编写连接回调函数 on_connect
,该函数将在客户端连接后被调用,在该函数中可以依据 rc
来判断客户端是否连接成功。通常同时我们将创建一个 MQTT 客户端,该客户端将连接到 broker.emqx.io
。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
发布消息
首先定义一个 while 循环语句,在循环中我们将设置每秒调用 MQTT 客户端 publish
函数向 /python/mqtt
主题发送消息。
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
订阅消息
编写消息回调函数 on_message
,该函数将在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
完整代码
消息发布代码
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
消息订阅代码
# python3.6
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
监控回调
由于涉及到账户资金安全,本次代码仅作参考:
# python 3.6
import random
import time
from datetime import datetime
import json5
from paho.mqtt import client as mqtt_client
import requests
def get_order(token, merchantid, client):
headers = {
'Host': 'mscs.abchina.com',
'Connection': 'keep-alive',
****
}
params = {
'merchantId': str(merchantid),
'datatime': '2',
'employeeid': 'all',
'tradechannel': '',
'pageno': '1',
'pageview': '13',
'tradestatus': 'SUCCESS',
'qrFlag': '0',
}
response = requests.get('https://****.com/app***oot/transd***ail/t***ail/add***ket', params=params,headers=headers)
data = response.json()
# print(response.text)
if 'transDetail' in data and data['transDetail']:
day_trans_num = data['day_trans_num']
merchant_name = data['merchant_name']
day_trans_amount = data['day_trans_amount']
print(f"======{merchant_name}=======")
print(f"今日收款流水:{day_trans_amount}")
print(f"今日收钱笔数:{day_trans_num}")
for transaction in data['transDetail']:
print(f"商户号: {transaction['employee_name']}-{transaction['trade_date']}")
print(f"收款金额: {transaction['cash_amount']}")
print(f"支付渠道: {transaction['trade_channel']}")
print(f"交易状态: {transaction['trade_status']}")
print("---")
if day_trans_num:
url = "http://*********/select.php"
response = requests.get(url)
if response.status_code == 200:
print(response.text)
new_data = response.text
else:
print(f"请求失败,状态码: {response.status_code}")
new_data = ''
if int(day_trans_num) > int(new_data):
order_data = {
'user_id': data['transDetail'][0]['user_id'],
'trade_date': data['transDetail'][0]['trade_date'],
'trade_channel': data['transDetail'][0]['trade_channel'],
'cash_amount': data['transDetail'][0]['cash_amount'],
'trade_status': data['transDetail'][0]['trade_status'],
'merchant': data['transDetail'][0]['employee_name'],
'total_price': day_trans_amount,
'total_num': day_trans_num
}
json_payload = json5.dumps(order_data)
print(json_payload)
# result = client.publish(topic, data['transDetail'][0]['cash_amount'])
result = client.publish(topic,json_payload)
url = "http://***/updata.php?newDayTransNum=" + day_trans_num
response = requests.get(url)
else:
url = "http://***/updata.php?newDayTransNum=" + day_trans_num
response = requests.get(url)
else:
print("No transaction details found.")
print("---300s后继续查询保持token存活----")
broker = 'mqtt.***.club'
port = 1883
topic = "YF"
client_id = f'listen-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
token = 'e******9k'
merchantid = '924***40'
while True:
now = datetime.now()
current_time = now.strftime("%H:%M")
if "08:00" <= current_time <= "23:00":
sleep_duration = 18
else:
sleep_duration = 600
time.sleep(1)
get_order(token, merchantid, client)
print(f"Sleeping for {sleep_duration} seconds...")
time.sleep(sleep_duration)
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
通过进程守护启动
数据库设计
新建listen监控当天的订单笔数和收款余额,用来判断通知是否是新的一笔订单
新建shop数据表,用于存放香烟的数据
Api接口
数据操作接口
select.php
<?php
// 引入数据库连接配置
require_once('db_config.php');
// 查询数据
$sql = "SELECT day_trans_num FROM listen WHERE id = 1";
$result = $conn->query($sql);
if ($result->num_rows > 0) {
// 输出数据
$row = $result->fetch_assoc();
echo $row["day_trans_num"];
} else {
echo '';
}
// 关闭数据库连接
$conn->close();
?>
Update.php
<?php
// 引入数据库连接配置
require_once('db_config.php');
// 获取POST请求的数据
$newDayTransNum = $_GET['newDayTransNum'];
// 更新数据
$sql = "UPDATE listen SET day_trans_num = $newDayTransNum WHERE id = '1'";
if ($conn->query($sql) === TRUE) {
echo "记录更新成功";
} else {
echo "Error: " . $sql . "<br>" . $conn->error;
}
// 关闭数据库连接
$conn->close();
?>
get_shopcode.php
<?php
// 连接到数据库
$servername = "localhost";
$username = "YF";
$password = "j7j*****tYr";
$dbname = "yf";
$conn = new mysqli($servername, $username, $password, $dbname);
// 检查连接
if ($conn->connect_error) {
die("连接失败: " . $conn->connect_error);
}
$price=$_GET['price'];
// 从外部传递进来的金额
$amount = $price; // 例如:30.5
// SQL 查询,找出价格小于等于传入金额的记录,并按价格降序排列
$sql = "SELECT smonke,price,name FROM shop WHERE num > 0 AND price <= $amount ORDER BY price DESC LIMIT 1";
$result = $conn->query($sql);
// 检查是否有结果
if ($result->num_rows > 0) {
// 输出商品编码
while($row = $result->fetch_assoc()) {
die(
json_encode(
array(
'code' => 200,
'msg' => '查询成功',
'name' => $row["name"],
'price' => $row["price"],
'smonkecode' => $row["smonke"]
),480)
);
}
} else {
echo "";
die(
json_encode(
array(
'code' => 100,
'msg' => '没有找到符合条件的商品',
'name' => $row["name"],
'price' => $row["price"],
'smonkecode' => $row["smonke"]
),480)
);
}
$conn->close();
?>
查询num不为0且用户付款价格大于烟价
开发自动化操作程序
连接服务
将emqx的实例代码稍做整改,完成基本的通讯能力后,我们需要进行自定义,规定room为YF,以及格式化 json
对应的接收端只需要订阅到YF频道即可
def mqtt_init(self):
print(self)
# client = mqtt.Client()
client = mqtt.Client(userdata=self) # 将self作为userdata传递
client.on_connect = on_connect
client.on_message = on_message
client.connect("4*****25", 1883, 60)
client.loop_forever()
自动挂单
当服务端检测到一笔新的订单时,通过MQTT发送json到客户端自动化程序,客户端收款工具进行上传金额,服务器计算得出付款金额与价格最相近的香烟条码,并返回烟码,客户端通过python的autogui操作进行操作(例如点击事件、选中事件等等),例如:由于烟草公司要求挂单之前需要添加会员信息,于是我们新增一个自定义事件add_user()
# 收到消息的回调函数
def on_message(client, userdata, msg):
self = userdata
print(msg.topic + " " + str(msg.payload))
try:
data=json5.loads(msg.payload)
# 新增收到回调数据显示给工具
self.Text3Var.set(data['cash_amount'])# 收款金额
self.Text4Var.set(data['trade_channel']) # 支付渠道
self.Text5Var.set(data['trade_status']) # 交易状态
self.Text6Var.set(data['total_price']) # 总收款金额
self.Text7Var.set(data['total_num']) # 总笔数
self.Label11.config(text=data['merchant']) # 商户
#
# print(data['user_id'])
# b'{user_id: "oHFX_jtTmL1GALI0PdYu-QU3UfU4", trade_date: "20240309", trade_channel: "wx", cash_amount: "15", trade_status: "SUCCESS"}'
try:
amount = float(data['cash_amount']) # 尝试转换为浮点数
# 定义接口URL
url = 'https://********/yf/get_shopcode.php'
# 定义请求参数
params = {'price': amount}
# 发送GET请求
response = requests.get(url, params=params)
# 检查请求是否成功
if response.status_code == 200:
# 解析响应内容为JSON格式
data = response.json()
# 打印返回的数据
print("code:", data['code'])
print("msg:", data['msg'])
print("name:", data['name'])
print("price:", data['price'])
print("smonkecode:", data['smonkecode'])
print("--【正在进行商品添加】--")
# 添加前结束点一次轮询开关
error_click()
# 加入随机会员信息
add_user()
# 添加前删除原商品
delete_shop()
print('删除可能存在表盘的缓存')
# 添加商品
smonke = str(data['smonkecode'])
add_shop(smonke)
# 微信动态二维码结算
submit_()
else:
print("没有符合的数据")
# 打印状态码和错误信息
print("Error:", response.status_code)
# selected_index = select_shop(amount, price)
# print(selected_index)
# if selected_index is None:
# print("小于三元没烟了")
# else:
# print("--【正在进行商品添加】--")
# # 添加前结束点一次轮询开关
# error_click()
#
# # 加入随机会员信息
# add_user()
#
# # 添加前删除原商品
# delete_shop()
# print('delete')
# # 添加商品
# add_shop(price_code[selected_index])
# # 微信动态二维码结算
# submit_()
except ValueError: # 如果转换失败,则打印错误信息
print("接收到的消息不是一个有效的数字")
except json.JSONDecodeError:
print("error")
补单
gui界面编写按钮,点击触发
def Command3_Cmd(self, event=None):
text3_value = self.Text3Var.get()
amount = float(text3_value) # 尝试转换为浮点数
budan(amount)
def budan(cash_amount):
amount = float(cash_amount) # 尝试转换为浮点数
url = 'https://*****/yf/get_shopcode.php'
# 定义请求参数
params = {'price': amount}
# 发送GET请求
response = requests.get(url, params=params)
# 检查请求是否成功
if response.status_code == 200:
# 解析响应内容为JSON格式
data = response.json()
# 打印返回的数据
print("code:", data['code'])
print("msg:", data['msg'])
print("name:", data['name'])
print("price:", data['price'])
print("smonkecode:", data['smonkecode'])
#补单操作
print("--【正在进行商品添加】--")
# 添加前结束点一次轮询开关
error_click()
# 加入随机会员信息
add_user()
# 添加前删除原商品
delete_shop()
print('delete')
# 添加商品
smonke = str(data['smonkecode'])
add_shop(smonke)
# 微信动态二维码结算
submit_()
#补单操作
else:
print("没有符合的数据")
# 打印状态码和错误信息
print("Error:", response.status_code)
# selected_index = select_shop(amount, price)
# print(selected_index)
# if selected_index is None:
# print("小于三元没烟了")
# else:
# print("--【正在进行商品添加】--")
# # 添加前结束点一次轮询开关
# error_click()
#
# # 加入随机会员信息
# add_user()
#
# # 添加前删除原商品
# delete_shop()
# print('delete')
# # 添加商品
# add_shop(price_code[selected_index])
# # 微信动态二维码结算
# submit_()
卷烟盘点
香烟定期入库后,需要进行盘点,需要将入库的香烟信息记录下来,人工操作过于麻烦,需要事先记录入库香烟条码,手动录入每一个香烟入库,自动化操作,可以节约人力时间成本
最后
程序在使用时会遇到一些异常问题,例如:当前页面非最大化、模块遮挡等问题,建议使用像素点进行识别,例如本项目通过像素点判断多个点的颜色值如果是桌面的蓝色,三个点都是蓝色则在桌面,具体的逻辑方法开发者可以自己完成
🚀Python爬虫项目实战系列文章
⭐⭐欢迎订阅⭐⭐
【Python爬虫项目实战一】获取Chatgpt3.5免费接口文末付代码(过Authorization认证)
【Python爬虫项目实战二】Chatgpt还原验证算法-解密某宝伪知网数据接口
⭐⭐欢迎订阅⭐⭐