我正在尝试创建一个使用持久化数据模式的Android MQTT客户端。客户端连接正常,并且在网络连接可用时能够发送数据。但是,如果网络出现故障,离线期间生成的消息似乎不会发送到服务器。我已经尝试过QoS 1和QoS 2。

我是否应该捕获未发送的消息并将其存储在SQLite中,然后在网络恢复后重新发送?

MqttConnection.java

  public class MqttConnection implements MqttCallback {

      private final static String TAG = MqttConnection.class.getName();
      private static MqttConnection instance;
      private MqttAndroidClient client;
      private Context context;
      private String mDeviceId;
      private volatile boolean isConnecting = false;
      private MqttDefaultFilePersistence mDataStore = null; // Defaults to FileStore
      private MemoryPersistence mMemStore; // On Fail reverts to MemoryStore

      private MqttConnection(Context context) {
          this.context = context;
          this.client = null;
      }

      public static MqttConnection getInstance(Context context) {
          Log.d(TAG, ".getInstance() entered");
          if (instance == null) {
              instance = new MqttConnection(context);
          }
          return instance;
      }

      public void connect() {
          if (client != null) {
              if (isConnecting) {
                  Log.d(TAG, "Mqtt is connecting");
              }
              client = null;
          }
          mDeviceId = "Someid"
          setConnectingState(true);
          try {
              mDataStore = new MqttDefaultFilePersistence(context.getCacheDir().getAbsolutePath());
              mMemStore = new MemoryPersistence();
              // Construct the MqttClient instance
              if (mDataStore != null) {
                  client = new MqttAndroidClient(context,MQTT_CONNECTIONURI, mDeviceId, mDataStore);
              } else {
                  client = new MqttAndroidClient(context,MQTT_CONNECTIONURI, mDeviceId, mMemStore);
              }
              client.setCallback(this);
              MqttActionListener actionLister = new MqttActionListener(context, Constants.ActionStateStatus.CONNECTING);
              MqttConnectOptions options = new MqttConnectOptions();
              options.setCleanSession(false);
              options.setKeepAliveInterval(1200);
              client.connect(options, context, actionLister);
          } catch (MqttException e) {
              Log.e(TAG, "Exception caught while attempting to connect to server", e.getCause());
          }
      }


      public void disconnect() {
          Log.d(TAG, "Disconnected");
      }

      public void subscribe() {
          Log.d(TAG, "subscribe");
      }

      public void unsubscribe() {
          Log.d(TAG, "unsubscribe");
      }

      public void publish(String message) {
  //        Log.d(TAG, ".publish() entered");
          String topic = mDeviceId;
          // check if client is connected
          if (isMqttConnected()) {
              // create a new MqttMessage from the message string
              MqttMessage mqttMsg = new MqttMessage(message.getBytes());
              // set retained flag
              mqttMsg.setRetained(true);
              // set quality of service
              mqttMsg.setQos(1);
              try {
                  // create ActionListener to handle message published results
                  MqttActionListener listener = new MqttActionListener(context, Constants.ActionStateStatus.PUBLISH);
  //                Log.d(TAG, ".publish() - Publishing " + message + " to: " + topic + ", with QoS: " + qos + " with retained flag set to " + retained);
                  client.publish(topic, mqttMsg, context, listener);
              } catch (MqttPersistenceException e) {
                  Log.e(TAG, "MqttPersistenceException caught while attempting to publish a message", e.getCause());
              } catch (MqttException e) {
                  Log.e(TAG, "MqttException caught while attempting to publish a message", e.getCause());
              }
          } else {
              connectionLost(null);
          }
      }

      @Override
      public void connectionLost(Throwable throwable) {
          setConnectingState(false);
          Log.d(TAG, "connectionLost");
          if (isMqttConnected()) {
              Log.d(TAG, "Mqtt is connected");
          } else {
              Log.d(TAG, "Mqtt conenction is lost");
              reconnect();
          }
      }

      private void reconnect() {
          if (isConnecting) {
              Log.d(TAG, "Mqtt is reconnecting.. hang on");
              return;
          }
          if (isOnline()) {
              Log.d(TAG, "We are online so should restart connection");
              connect();
              return;
          } else {
**Log.d(TAG, "we are offline.. should i store the data in SQLite and resend them ???");**
              return;
          }
      }

      @Override
      public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
          Log.d(TAG, "messageArrived");
      }

      @Override
      public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
          Log.d(TAG, "deliveryComplete");
      }

      private boolean isMqttConnected() {
  //        Log.d(TAG, ".isMqttConnected() entered");
          boolean connected = false;
          try {
              if ((client != null) && (client.isConnected())) {
                  connected = true;
              }
          } catch (Exception e) {
              // swallowing the exception as it means the client is not connected
          }
  //        Log.d(TAG, ".isMqttConnected() - returning " + connected);
          return connected;
      }

      synchronized void setConnectingState(boolean isConnecting) {
          this.isConnecting = isConnecting;
      }

      private boolean isOnline() {
          ConnectivityManager cm =
                  (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
          NetworkInfo netInfo = cm.getActiveNetworkInfo();
          return netInfo != null && netInfo.isConnectedOrConnecting();
      }
  }

最佳答案

首先,根据这个回答(https://stackoverflow.com/a/31826706/4615587),只有已经处于传输中(in-flight)的消息才会在QoS 1/2模式下被持久化保存。这很有意义,因为它必须与DeliveryToken一起使用以确保QoS得以维持。

因此,您需要实现自己的离线缓冲区。我曾在一个项目中使用持久化作业队列(https://github.com/path/android-priority-jobqueue)实现了这一点。

-诺瓦

10-04 20:41