我正在尝试在持久性数据模式下创建android mqtt客户端。客户端连接正常,并在存在活动网络连接时发送数据。但是,如果网络出现故障,我认为离线期间生成的消息不会发送到服务器。我已经尝试过QOS 1和QOS 2。
我应该捕获未发送的消息并将其存储在SQLite中,然后在网络再次启动时重试吗?
MqttConnection.java
public class MqttConnection implements MqttCallback {
private final static String TAG = MqttConnection.class.getName();
private static MqttConnection instance;
private MqttAndroidClient client;
private Context context;
private String mDeviceId;
private volatile boolean isConnecting = false;
private MqttDefaultFilePersistence mDataStore = null; // Defaults to FileStore
private MemoryPersistence mMemStore; // On Fail reverts to MemoryStore
private MqttConnection(Context context) {
this.context = context;
this.client = null;
}
public static MqttConnection getInstance(Context context) {
Log.d(TAG, ".getInstance() entered");
if (instance == null) {
instance = new MqttConnection(context);
}
return instance;
}
public void connect() {
if (client != null) {
if (isConnecting) {
Log.d(TAG, "Mqtt is connecting");
}
client = null;
}
mDeviceId = "Someid"
setConnectingState(true);
try {
mDataStore = new MqttDefaultFilePersistence(context.getCacheDir().getAbsolutePath());
mMemStore = new MemoryPersistence();
// Construct the MqttClient instance
if (mDataStore != null) {
client = new MqttAndroidClient(context,MQTT_CONNECTIONURI, mDeviceId, mDataStore);
} else {
client = new MqttAndroidClient(context,MQTT_CONNECTIONURI, mDeviceId, mMemStore);
}
client.setCallback(this);
MqttActionListener actionLister = new MqttActionListener(context, Constants.ActionStateStatus.CONNECTING);
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setKeepAliveInterval(1200);
client.connect(options, context, actionLister);
} catch (MqttException e) {
Log.e(TAG, "Exception caught while attempting to connect to server", e.getCause());
}
}
public void disconnect() {
Log.d(TAG, "Disconnected");
}
public void subscribe() {
Log.d(TAG, "subscribe");
}
public void unsubscribe() {
Log.d(TAG, "unsubscribe");
}
public void publish(String message) {
// Log.d(TAG, ".publish() entered");
String topic = mDeviceId;
// check if client is connected
if (isMqttConnected()) {
// create a new MqttMessage from the message string
MqttMessage mqttMsg = new MqttMessage(message.getBytes());
// set retained flag
mqttMsg.setRetained(true);
// set quality of service
mqttMsg.setQos(1);
try {
// create ActionListener to handle message published results
MqttActionListener listener = new MqttActionListener(context, Constants.ActionStateStatus.PUBLISH);
// Log.d(TAG, ".publish() - Publishing " + message + " to: " + topic + ", with QoS: " + qos + " with retained flag set to " + retained);
client.publish(topic, mqttMsg, context, listener);
} catch (MqttPersistenceException e) {
Log.e(TAG, "MqttPersistenceException caught while attempting to publish a message", e.getCause());
} catch (MqttException e) {
Log.e(TAG, "MqttException caught while attempting to publish a message", e.getCause());
}
} else {
connectionLost(null);
}
}
@Override
public void connectionLost(Throwable throwable) {
setConnectingState(false);
Log.d(TAG, "connectionLost");
if (isMqttConnected()) {
Log.d(TAG, "Mqtt is connected");
} else {
Log.d(TAG, "Mqtt conenction is lost");
reconnect();
}
}
private void reconnect() {
if (isConnecting) {
Log.d(TAG, "Mqtt is reconnecting.. hang on");
return;
}
if (isOnline()) {
Log.d(TAG, "We are online so should restart connection");
connect();
return;
} else {
**Log.d(TAG, "we are offline.. should i store the data in SQLite and resend them ???");**
return;
}
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
Log.d(TAG, "messageArrived");
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
Log.d(TAG, "deliveryComplete");
}
private boolean isMqttConnected() {
// Log.d(TAG, ".isMqttConnected() entered");
boolean connected = false;
try {
if ((client != null) && (client.isConnected())) {
connected = true;
}
} catch (Exception e) {
// swallowing the exception as it means the client is not connected
}
// Log.d(TAG, ".isMqttConnected() - returning " + connected);
return connected;
}
synchronized void setConnectingState(boolean isConnecting) {
this.isConnecting = isConnecting;
}
private boolean isOnline() {
ConnectivityManager cm =
(ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo netInfo = cm.getActiveNetworkInfo();
return netInfo != null && netInfo.isConnectedOrConnecting();
}
}
最佳答案
首先,根据此https://stackoverflow.com/a/31826706/4615587,只有已经在运行中的消息才以QoS 1/2模式保留。这很有意义,因为它必须与DeliveryTokens一起使用以确保QoS得以维持。
因此,您将需要实现自己的脱机缓冲区。我已经在一个项目中使用持久作业队列https://github.com/path/android-priority-jobqueue做到了这一点。
-诺瓦