项目成品

用到的技术栈:Python+MQTT5.0+PHP

        当顾客扫了三合一二维码且支付完成时,监控收到新的订单,将数据发送给订阅了YF的客户端,客户端通过收到的金额进行记录,并通过接口计算出与该金额最相符的香烟价格,并返回香烟条码,客户端拿到条码完成键盘输入,点击挂单完成支付

顾客扫码支付后回调自动操作​​​​​​​


目录

项目成品

开发背景

配置后端服务及接口

搭建MQTT服务

 Python基本连接实例

导入 Paho MQTT客户端

设置 MQTT Broker 连接参数

编写 MQTT 连接函数

发布消息

订阅消息

完整代码

消息发布代码

消息订阅代码

监控回调

Api接口

开发自动化操作程序

连接服务

自动挂单

补单

卷烟盘点


开发背景

         朋友家的烟店,为了能够更快升级档位,公司规定:需要定期 盘点、会员扫码、挂单下单等操作,当然这只是所有企业的理想状态下,现实生活中需要考虑到很多因素,例如:终端无人操作、顾客不愿意提供个人信息积分、盘点费时等等,于是从开发到落地使用,经历了6个月多,至今才打算发文记录下,最开始没有考虑到监听收款音箱这个方案,我们商店使用的收款音箱是中国农业银行的,认为他不支持第三方接口,后面咨询了下确实不支持

        原先使用的方案是:通过一个机器(单片机/旧手机)监听语音:农行收款**元,后面发现效果不理想,识别到的语音不完整,第三方语音识别接口成本太高,打算放弃了,搁置了1个月左右

        后面觉得都弄了一半了,就差个监控回调,实在划不来,就因为这个导致项目半自动化,实在可惜,打算抓包看下能不能实现token保活,每经过一段时间触发一次,期望不是很高,因为这是银行的产品,我认为安全系数应该要高,结果实现了,亲测保活可以使用半年(简直太离谱了)

配置后端服务及接口

搭建MQTT服务

MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。监控和操作程序要能通信需要保持双向连接,且MQTT具有低延迟、低功耗的特点,emqx免费版提供的MQTT服务已经足够使用了,所以本次使用Emqx的服务。

如果你不会搭建MQTT,推荐看我的文章《Esp8266-01s、51单片机实现连接MQTT踩坑:附加烧录安信可固件+宝塔搭建MQTT服务器 全套攻略》

 Python基本连接实例

文档《MQTT 客户端库 & SDKs》

————Python版本:3.7+ ————

导入 Paho MQTT客户端

from paho.mqtt import client as mqtt_client

设置 MQTT Broker 连接参数

设置 MQTT Broker 连接地址,端口以及 topic,同时我们调用 Python random.randint 函数随机生成 MQTT 客户端 id。

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'

编写 MQTT 连接函数

编写连接回调函数 on_connect,该函数将在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。通常同时我们将创建一个 MQTT 客户端,该客户端将连接到 broker.emqx.io

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # Set Connecting Client ID
    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

发布消息

首先定义一个 while 循环语句,在循环中我们将设置每秒调用 MQTT 客户端 publish 函数向 /python/mqtt 主题发送消息。

def publish(client):
     msg_count = 0
     while True:
         time.sleep(1)
         msg = f"messages: {msg_count}"
         result = client.publish(topic, msg)
         # result: [0, 1]
         status = result[0]
         if status == 0:
             print(f"Send `{msg}` to topic `{topic}`")
         else:
             print(f"Failed to send message to topic {topic}")
         msg_count += 1

订阅消息

编写消息回调函数 on_message,该函数将在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。

def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message

完整代码

消息发布代码

# python 3.6

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

消息订阅代码

# python3.6

import random

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()
 

监控回调

由于涉及到账户资金安全,本次代码仅作参考:

# python 3.6

import random
import time
from datetime import datetime
import json5
from paho.mqtt import client as mqtt_client
import requests

def get_order(token, merchantid, client):
    headers = {
        'Host': 'mscs.abchina.com',
        'Connection': 'keep-alive',
       ****
    }

    params = {
        'merchantId': str(merchantid),
        'datatime': '2',
        'employeeid': 'all',
        'tradechannel': '',
        'pageno': '1',
        'pageview': '13',
        'tradestatus': 'SUCCESS',
        'qrFlag': '0',
    }

    response = requests.get('https://****.com/app***oot/transd***ail/t***ail/add***ket', params=params,headers=headers)
    data = response.json()
    # print(response.text)

    if 'transDetail' in data and data['transDetail']:
        day_trans_num = data['day_trans_num']
        merchant_name = data['merchant_name']
        day_trans_amount = data['day_trans_amount']
        print(f"======{merchant_name}=======")
        print(f"今日收款流水:{day_trans_amount}")
        print(f"今日收钱笔数:{day_trans_num}")
        for transaction in data['transDetail']:
            print(f"商户号: {transaction['employee_name']}-{transaction['trade_date']}")
            print(f"收款金额: {transaction['cash_amount']}")
            print(f"支付渠道: {transaction['trade_channel']}")
            print(f"交易状态: {transaction['trade_status']}")
            print("---")

        if day_trans_num:
            url = "http://*********/select.php"

            response = requests.get(url)

            if response.status_code == 200:
                print(response.text)
                new_data = response.text
            else:
                print(f"请求失败,状态码: {response.status_code}")
                new_data = ''

            if int(day_trans_num) > int(new_data):
                order_data = {
                    'user_id': data['transDetail'][0]['user_id'],
                    'trade_date': data['transDetail'][0]['trade_date'],
                    'trade_channel': data['transDetail'][0]['trade_channel'],
                    'cash_amount': data['transDetail'][0]['cash_amount'],
                    'trade_status': data['transDetail'][0]['trade_status'],
                    'merchant': data['transDetail'][0]['employee_name'],
                    'total_price': day_trans_amount,
                    'total_num': day_trans_num
                }

                json_payload = json5.dumps(order_data)
                print(json_payload)
                # result = client.publish(topic, data['transDetail'][0]['cash_amount'])
                result = client.publish(topic,json_payload)

                url = "http://***/updata.php?newDayTransNum=" + day_trans_num
                response = requests.get(url)
            else:
                url = "http://***/updata.php?newDayTransNum=" + day_trans_num
                response = requests.get(url)

    else:
        print("No transaction details found.")

    print("---300s后继续查询保持token存活----")

broker = 'mqtt.***.club'
port = 1883
topic = "YF"
client_id = f'listen-mqtt-{random.randint(0, 1000)}'

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def publish(client):
    token = 'e******9k'
    merchantid = '924***40'

    while True:
        now = datetime.now()
        current_time = now.strftime("%H:%M")
        if "08:00" <= current_time <= "23:00":
            sleep_duration = 18
        else:
            sleep_duration = 600
        time.sleep(1)
        get_order(token, merchantid, client)

        print(f"Sleeping for {sleep_duration} seconds...")
        time.sleep(sleep_duration)

def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)

if __name__ == '__main__':
    run()

通过进程守护启动

数据库设计

云香印象终端自动化工具(监听农行收款云音箱)-LMLPHP

新建listen监控当天的订单笔数和收款余额,用来判断通知是否是新的一笔订单

新建shop数据表,用于存放香烟的数据

Api接口

云香印象终端自动化工具(监听农行收款云音箱)-LMLPHP

数据操作接口

select.php

<?php
// 引入数据库连接配置
require_once('db_config.php');

// 查询数据
$sql = "SELECT day_trans_num FROM listen WHERE id = 1";
$result = $conn->query($sql);

if ($result->num_rows > 0) {
    // 输出数据
    $row = $result->fetch_assoc();
    echo $row["day_trans_num"];
} else {
    echo '';
}

// 关闭数据库连接
$conn->close();
?>

Update.php

<?php
// 引入数据库连接配置
require_once('db_config.php');

// 获取POST请求的数据
$newDayTransNum = $_GET['newDayTransNum'];

// 更新数据
$sql = "UPDATE listen SET day_trans_num = $newDayTransNum WHERE id = '1'";

if ($conn->query($sql) === TRUE) {
    echo "记录更新成功";
} else {
    echo "Error: " . $sql . "<br>" . $conn->error;
}

// 关闭数据库连接
$conn->close();
?>

get_shopcode.php 

<?php
// 连接到数据库
$servername = "localhost";
$username = "YF";
$password = "j7j*****tYr";
$dbname = "yf";

$conn = new mysqli($servername, $username, $password, $dbname);

// 检查连接
if ($conn->connect_error) {
    die("连接失败: " . $conn->connect_error);
}


$price=$_GET['price'];
// 从外部传递进来的金额
$amount = $price; // 例如:30.5

// SQL 查询,找出价格小于等于传入金额的记录,并按价格降序排列
$sql = "SELECT smonke,price,name FROM shop WHERE num > 0 AND price <= $amount ORDER BY price DESC LIMIT 1";
$result = $conn->query($sql);

// 检查是否有结果
if ($result->num_rows > 0) {
    // 输出商品编码
    while($row = $result->fetch_assoc()) {
         die(
        json_encode(
            array(
            'code' => 200,
            'msg' => '查询成功',
            'name' => $row["name"],
            'price' => $row["price"],
            'smonkecode' => $row["smonke"]
        ),480)
);
    }
} else {
    echo "";
     die(
        json_encode(
            array(
            'code' => 100,
            'msg' => '没有找到符合条件的商品',
            'name' => $row["name"],
            'price' => $row["price"],
            'smonkecode' => $row["smonke"]
        ),480)
);
}

$conn->close();
?>

查询num不为0且用户付款价格大于烟价

开发自动化操作程序

连接服务

将emqx的实例代码稍做整改,完成基本的通讯能力后,我们需要进行自定义,规定room为YF,以及格式化 json          

对应的接收端只需要订阅到YF频道即可

def mqtt_init(self):
    print(self)
    # client = mqtt.Client()
    client = mqtt.Client(userdata=self)  # 将self作为userdata传递

    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("4*****25", 1883, 60)
    client.loop_forever()

自动挂单

当服务端检测到一笔新的订单时,通过MQTT发送json到客户端自动化程序,客户端收款工具进行上传金额,服务器计算得出付款金额与价格最相近的香烟条码,并返回烟码,客户端通过python的autogui操作进行操作(例如点击事件、选中事件等等),例如:由于烟草公司要求挂单之前需要添加会员信息,于是我们新增一个自定义事件add_user()

# 收到消息的回调函数
def on_message(client, userdata, msg):
    self = userdata
    print(msg.topic + " " + str(msg.payload))

    try:
        data=json5.loads(msg.payload)
        # 新增收到回调数据显示给工具
        self.Text3Var.set(data['cash_amount'])# 收款金额
        self.Text4Var.set(data['trade_channel'])  # 支付渠道
        self.Text5Var.set(data['trade_status'])  # 交易状态

        self.Text6Var.set(data['total_price'])  # 总收款金额
        self.Text7Var.set(data['total_num'])  # 总笔数
        self.Label11.config(text=data['merchant'])  # 商户
        #
        # print(data['user_id'])
        # b'{user_id: "oHFX_jtTmL1GALI0PdYu-QU3UfU4", trade_date: "20240309", trade_channel: "wx", cash_amount: "15", trade_status: "SUCCESS"}'
        try:
            amount = float(data['cash_amount'])  # 尝试转换为浮点数
            # 定义接口URL
            url = 'https://********/yf/get_shopcode.php'

            # 定义请求参数
            params = {'price': amount}

            # 发送GET请求
            response = requests.get(url, params=params)

            # 检查请求是否成功
            if response.status_code == 200:
                # 解析响应内容为JSON格式
                data = response.json()

                # 打印返回的数据
                print("code:", data['code'])
                print("msg:", data['msg'])
                print("name:", data['name'])
                print("price:", data['price'])
                print("smonkecode:", data['smonkecode'])

                print("--【正在进行商品添加】--")
                # 添加前结束点一次轮询开关
                error_click()

                # 加入随机会员信息
                add_user()

                # 添加前删除原商品
                delete_shop()
                print('删除可能存在表盘的缓存')
                # 添加商品
                smonke = str(data['smonkecode'])
                add_shop(smonke)
                # 微信动态二维码结算
                submit_()
            else:
                print("没有符合的数据")
                # 打印状态码和错误信息
                print("Error:", response.status_code)
            # selected_index = select_shop(amount, price)
            # print(selected_index)
            # if selected_index is None:
            #     print("小于三元没烟了")
            # else:
            #     print("--【正在进行商品添加】--")
            #     # 添加前结束点一次轮询开关
            #     error_click()
            #
            #     # 加入随机会员信息
            #     add_user()
            #
            #     # 添加前删除原商品
            #     delete_shop()
            #     print('delete')
            #     # 添加商品
            #     add_shop(price_code[selected_index])
            #     # 微信动态二维码结算
            #     submit_()
        except ValueError:  # 如果转换失败,则打印错误信息
            print("接收到的消息不是一个有效的数字")
    except json.JSONDecodeError:
        print("error")

补单

gui界面编写按钮,点击触发

    def Command3_Cmd(self, event=None):
        text3_value = self.Text3Var.get()
        amount = float(text3_value)  # 尝试转换为浮点数
        budan(amount)
def budan(cash_amount):
    amount = float(cash_amount)  # 尝试转换为浮点数
    url = 'https://*****/yf/get_shopcode.php'

    # 定义请求参数
    params = {'price': amount}

    # 发送GET请求
    response = requests.get(url, params=params)

    # 检查请求是否成功
    if response.status_code == 200:
        # 解析响应内容为JSON格式
        data = response.json()

        # 打印返回的数据
        print("code:", data['code'])
        print("msg:", data['msg'])
        print("name:", data['name'])
        print("price:", data['price'])
        print("smonkecode:", data['smonkecode'])

        #补单操作
        print("--【正在进行商品添加】--")
        # 添加前结束点一次轮询开关
        error_click()

        # 加入随机会员信息
        add_user()

        # 添加前删除原商品
        delete_shop()
        print('delete')
        # 添加商品
        smonke = str(data['smonkecode'])
        add_shop(smonke)
        # 微信动态二维码结算
        submit_()

        #补单操作
    else:
        print("没有符合的数据")
        # 打印状态码和错误信息
        print("Error:", response.status_code)
    # selected_index = select_shop(amount, price)
    # print(selected_index)
    # if selected_index is None:
    #     print("小于三元没烟了")
    # else:
    #     print("--【正在进行商品添加】--")
    #     # 添加前结束点一次轮询开关
    #     error_click()
    #
    #     # 加入随机会员信息
    #     add_user()
    #
    #     # 添加前删除原商品
    #     delete_shop()
    #     print('delete')
    #     # 添加商品
    #     add_shop(price_code[selected_index])
    #     # 微信动态二维码结算
    #     submit_()

卷烟盘点

        香烟定期入库后,需要进行盘点,需要将入库的香烟信息记录下来,人工操作过于麻烦,需要事先记录入库香烟条码,手动录入每一个香烟入库,自动化操作,可以节约人力时间成本



最后

        程序在使用时会遇到一些异常问题,例如:当前页面非最大化、模块遮挡等问题,建议使用像素点进行识别,例如本项目通过像素点判断多个点的颜色值如果是桌面的蓝色,三个点都是蓝色则在桌面,具体的逻辑方法开发者可以自己完成

🚀Python爬虫项目实战系列文章
⭐⭐欢迎订阅⭐⭐

【Python爬虫项目实战一】获取Chatgpt3.5免费接口文末付代码(过Authorization认证)
【Python爬虫项目实战二】Chatgpt还原验证算法-解密某宝伪知网数据接口


⭐⭐欢迎订阅⭐⭐
云香印象终端自动化工具(监听农行收款云音箱)-LMLPHP​​​​​​​

04-27 13:22