我正在使用Java处理从Arduino获取传感器数据,并通过使用MQTT协议将它们保存到MySql数据库中。我成功将数据发布到我的主题


家庭/温度
家庭/湿度


并订阅所有主题


家/#


我收到了所有消息。现在,我想将这些值保存到数据库中。

我的问题是:

如何获取适当的数据并根据其主题(家庭/温度和家庭/湿度)进行保存?
谢谢

这是我的代码:

 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
  import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  import de.bezier.data.sql.*;
 import processing.serial.*;
  import java.util.regex.*;
  Serial myPort;
  String value;
    String broker;
   String topic;
  String topic1;
  String topic2;
  String payloadtemp;
   String payloadhum;
   MqttClient myClient;
    MySQL DB;
    String tempsensor_ID;
     String humsensor_ID;
     MqttMessage messagetemp ;
     MqttMessage messagehum ;

     void setup()
        {
     String portlisten= Serial.list()[0];
     myPort = new Serial( this, portlisten, 115200 );
    broker   = "tcp://192.192.191.198:1883";//;
     topic = "home/#";
    topic1="home/temperatureValue" ;
     topic2= "home/HumidityValue";
     tempsensor_ID="temp_ID";
      humsensor_ID="hum_ID";
      String user = "root";
      String pass = "";
      String database = "iot";
      DB= new MySQL(this, "localhost", database , user , pass);


          }
       void draw(){

       if (myPort.available() > 0 ) {
        value = myPort.readString();
        String tempdata = trim(value);
         String humdata=trim(value);
            String patterntemp= "Temperature:";
           String patterntempunit="C";
            String patternhum="Humidity:";
               String patternhumunit="%";
            Pattern ptemp= Pattern.compile(Pattern.quote(patterntemp) + "             (.*?)" + Pattern.quote(patterntempunit));
         Matcher mtemp= ptemp.matcher(tempdata);
             while(mtemp.find()){
            tempdata=mtemp.group(1);
              payloadtemp=tempdata;

          }

         Pattern phum= Pattern.compile(Pattern.quote(patternhum) + "(.*?)" +               Pattern.quote(patternhumunit));
          Matcher mhum= phum.matcher(humdata);
           while(mhum.find()){
           humdata=mhum.group(1);
               payloadhum=humdata;

                }

              try {
          myClient = new MqttClient(broker, MqttClient.generateClientId());
     myClient.connect();
            messagetemp = new MqttMessage((" " + payloadtemp).getBytes());
            messagehum = new MqttMessage((" " + payloadhum).getBytes());



             if(payloadtemp!=(null) & payloadhum!=(null)){

                 myClient.publish(topic1, messagetemp);
                  myClient.subscribe(topic1);

                       myClient.publish(topic2, messagehum);
                     myClient.subscribe(topic2);


          //myClient.subscribe(topic); //wildcard (topic) used when  subscribing to topics but not allowed when publishing a msg
                       }

                if ( DB.connect() )
               {
               myClient.setCallback(new MqttCallback() {
                  @Override
                public void messageArrived(String arg0, MqttMessage arg1) {

                     String msgrec= new String(arg1.getPayload());

                        println("RESULT " + arg0 + " - " + msgrec + " ");


                    throw new RuntimeException();

                            }

                        @Override
                     public void deliveryComplete(IMqttDeliveryToken arg0) {


                           println("DELIVERY " + arg0);

                           throw new RuntimeException();
                               }

                          @Override
                      public void connectionLost(Throwable arg0) {
                           throw new RuntimeException();
                         }
                          });


                           if(payloadtemp!=(null) & payloadhum!=(null)){


                     /*DB.execute("INSERT INTO `temperature`(`idsensor`,`temperaturevalue`) VALUES ('"+tempsensor_ID+ "','" +      messageReceived(topic1,messagetemp.getPayload()) + "');");
                    DB.execute("INSERT INTO `light`(`idsensor`,`lightvalue`) VALUES ('"+lightsensor_ID+ "','" +   messageReceived(topic2,messagehum.getPayload())+ "');");
                    */
                    }
                    }
                  else
                   {
                println("Error in the connection :-( ");
                      }
                                 }
                        catch(MqttException mex) {

                      System.out.println("Error: "+mex.getMessage());
                            mex.printStackTrace();
                     }
                           }

                              }

最佳答案

我假设您正在实现PAHO客户端。

那么您需要使用messageArrived回调中的参数来检查是否已了解有关主题...

client.setCallback(new MqttCallback() {

      @Override
      public void messageArrived(String topicInforming, MqttMessage mqttMessage) throws Exception {
            subscriptionMessageCallback.onNewMessage(mqttMessage.toString());
      }
   ....


在此摘要主题中,通知是告诉您是否收到有关温度或湿度的消息,

对于数据库,您将需要一个连接器来处理插入事务...

您需要定义每条消息是否应该在同一张表中,由您自己决定,并取决于体系结构的外观

关于java - 如何保存MQTT订阅的所有主题的数据,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42501746/

10-09 06:47
查看更多