一. 延迟消息队列

1. 在提交支付之后,可以发送一个延迟检查的队列,来主动查询用户在支付宝上的支付状态

在mq的配置/conf/activeMq.xml的broker实例上配置加上schedulerSupport="true",如下图所示

activemq 的延迟队列和幂等性检查-LMLPHP

2 延迟检查如果失败,则从新发送新的检查队列,并且要将检查次数减一

// 没有支付,再次发送检查队列
System.out.println("订单号:"+out_trade_no+"尚未支付成功,再次发送检查队列,剩余检查次数:"+count);
if(count>0){
    System.out.println("订单号:"+out_trade_no+"检查次数尚未用尽,再次发送检查队列"+count);
    count --;
    paymentService.sendDelayPaymentCheckQueue(out_trade_no,count);
}else {


System.out.println("订单号:"+out_trade_no+"检查次数耗尽,停止检查,调用关单服务");

}

二. 幂等性检查

调用相同的服务,返回的结果始终一样,叫做幂等性检查.

例子:

第一步,在支付的controller层发起延迟队列

@RequestMapping("/alipay/submit")
@ResponseBody
public String alipay(Model model, HttpServletRequest request,String orderSn){ // 查询对应订单信息
OmsOrder omsOrder = orderService.getOrderBySn(orderSn); AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest();//创建API对应的request
alipayRequest.setReturnUrl(AlipayConfig.return_payment_url);
alipayRequest.setNotifyUrl(AlipayConfig.notify_payment_url);//在公共参数中设置回跳和通知地址 Map<String,Object> requestMap= new HashMap<>();
requestMap.put("out_trade_no",orderSn);
requestMap.put("product_code","FAST_INSTANT_TRADE_PAY");
requestMap.put("total_amount",0.01);
requestMap.put("subject",omsOrder.getOmsOrderItems().get(0).getProductName()); alipayRequest.setBizContent(JSON.toJSONString(requestMap));//填充业务参数
String form="";
try {
form = alipayClient.pageExecute(alipayRequest).getBody(); //调用SDK生成表单
} catch (AlipayApiException e) {
e.printStackTrace();
} // 保存支付数据
PaymentInfo paymentInfo = new PaymentInfo();
paymentInfo.setOrderSn(orderSn);
paymentInfo.setPaymentStatus("0");
paymentInfo.setTotalAmount(omsOrder.getPayAmount());
paymentInfo.setSubject(omsOrder.getOmsOrderItems().get(0).getProductName());
paymentInfo.setOrderId(omsOrder.getId());
paymentService.addPayment(paymentInfo); System.out.println(form); // 发送一个检查支付结果的队列(延迟队列)
paymentService.sendDelayPaymentCheckQueue(orderSn,7);// 调用支付宝的支付状态接口程序 return form;
}

@Override
public void sendDelayPaymentCheckQueue(String orderSn,long count) {
// 发送orderSn的延迟检查队列
ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory(); Connection connection = null;
Session session = null;// 开启消息事务
Queue paymentResultQueue = null; // 队列
MessageProducer producer = null;
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
paymentResultQueue = session.createQueue("PAYMENT_CHECK_QUEUE");
//text文本格式,map键值格式
MapMessage mapMessage=new ActiveMQMapMessage();
mapMessage.setString("out_trade_no",orderSn);
mapMessage.setLong("count",count); //消息延迟消费
mapMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,1000*30); producer = session.createProducer(paymentResultQueue);// 消息的生成者
producer.send(mapMessage);
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
} }

@Component
public class PaymentConsumer { @Autowired
PaymentService paymentService; @JmsListener(containerFactory = "jmsQueueListener",destination = "PAYMENT_CHECK_QUEUE")
public void consumePaymentCheckQueue(MapMessage mapMessage) throws JMSException {
String out_trade_no = mapMessage.getString("out_trade_no");
long count = mapMessage.getLong("count"); System.out.println("开始检查支付状态。。。订单号:"+out_trade_no); // 系统支付的幂等性检查
String status = paymentService.checkPayStatus(out_trade_no);
if(status!=null&&!status.equals("已支付")){
// 检查支付状态
Map<String,Object> map = paymentService.checkPayment(out_trade_no); if(map!=null&&(((String)map.get("trade_status")).equals("TRADE_SUCCESS")||((String)map.get("trade_status")).equals("TRADE_FINISHED"))){
// 保存支付信息,发送支付成功队列
System.out.println("订单号:"+out_trade_no+"已经支付成功,继续后续操作");
//
}else{
// 没有支付,再次发送检查队列
System.out.println("订单号:"+out_trade_no+"尚未支付成功,再次发送检查队列,剩余检查次数:"+count);
if(count>0){
System.out.println("订单号:"+out_trade_no+"检查次数尚未用尽,再次发送检查队列"+count);
count --;
paymentService.sendDelayPaymentCheckQueue(out_trade_no,count);
}else {
System.out.println("订单号:"+out_trade_no+"检查次数耗尽,停止检查,调用关单服务"); }
}
} }
}

@Override
public String checkPayStatus(String order_sn) { PaymentInfo paymentInfo = new PaymentInfo();
paymentInfo.setOrderSn(order_sn);
PaymentInfo paymentInfo1 = paymentInfoMapper.selectOne(paymentInfo); return paymentInfo1.getPaymentStatus();
}

@Override
public Map<String, Object> checkPayment(String out_trade_no) { // 调用支付宝,检查支付状态
AlipayTradeQueryRequest request = new AlipayTradeQueryRequest(); Map<String,Object> map = new HashMap<>();
map.put("out_trade_no",out_trade_no);
request.setBizContent(JSON.toJSONString(map));
AlipayTradeQueryResponse response = null;
try {
response = alipayClient.execute(request);
} catch (AlipayApiException e) {
e.printStackTrace();
} if(response.isSuccess()){// isSuccess
System.out.println("调用成功,交易已经创建");
String tradeNo = response.getTradeNo();
String tradeStatus = response.getTradeStatus();
map.put("trade_no",tradeNo);
map.put("trade_status",tradeStatus);
return map;
} else {
System.out.println("调用失败,用户尚未创建交易");//
} return null;
} 以上,就完成了!
05-25 13:52