Netty框架的使用
1 TCP开发范例
发送地址---192.168.31.241
发送端口号---9223
发送数据
{
"userid":"[email protected]",
"devicetype":3,
"accounttype":0,
"username":"",
"password":"e10adc3949ba59abbe56e057f20f883e",
"meiid":1000217,
"deviceid":"864376025909275"
}
接收数据
{
"message":"登录成功",
"sessionkey":"EF81E1BD132D40DE8F1707A521D8B5A6",
"mainsn":"C001B00010000002",
"code":0
}
2 上代码
1 业务层代码
public class MainActivity extends Activity { private Base1106Entity entity1106;// 登录云棒协议 public static final int RESPONSE_SUCCESS = 0x401;
public static final int RESPONSE_FAIL = 0x402;
public static final int RESPONSE_TIMEOUT = 0x403;
public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410; //心跳超时
public static final int NOT_LOGIN= 0x411; //用户未登录 public Handler mHandler = new Handler() { @Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
switch (msg.what) {
case RESPONSE_SUCCESS:
IEntity entity = (IEntity) msg.obj;
if (entity != null) {
responseSuccess((IEntity) msg.obj);
} else {
responseFail(-1, "返回数据为空!");
}
break;
case RESPONSE_FAIL:// 请求失败
if (msg != null && msg.obj != null)
responseFail(-10001, (String) msg.obj);
break;
case RESPONSE_TIMEOUT:// 请求超时
if (msg != null && msg.obj != null)
responseFail(-10000, (String) msg.obj);
break;
case NOT_LOGIN:// 用户未登录
if (msg != null && msg.obj != null)
responseFail(-10002, (String) msg.obj);
break;
}
}
}; @Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main); Button login = (Button)findViewById(R.id.login);
login.setOnClickListener(new View.OnClickListener() { @Override
public void onClick(View v) {
reqEntity1106();
}
}); } public void reqEntity1106() {
entity1106 = new Base1106Entity();
entity1106.setMeiid(1000217);
entity1106.setUserid("[email protected]");
entity1106.setUsername("");
entity1106.setPassword("e10adc3949ba59abbe56e057f20f883e");
entity1106.setAccounttype( 0 );
entity1106.setDevicetype(3);
entity1106.setDeviceid("864376025909275");
entity1106.setHandler(mHandler);
ClientConnectFactory.getInstance().sendEntity(entity1106);
} public void responseSuccess(IEntity entity) {
Toast.makeText(MainActivity.this, ((Base1106Entity)entity).toString(), Toast.LENGTH_LONG).show();
} public void responseFail(int code, String msg) {
Toast.makeText(MainActivity.this, msg, Toast.LENGTH_SHORT).show();
} }
/**
 * Application subclass that exposes itself as a global {@link Context} and
 * initialises {@code ClientConnectFactory} when the process starts.
 */
public class MeiApp extends Application {

    /** Global context; assigned in {@link #onCreate()}. */
    public static Context mContext;

    @Override
    public void onCreate() {
        super.onCreate();
        MeiApp.mContext = this;
        ClientConnectFactory factory = ClientConnectFactory.getInstance();
        factory.init(MeiApp.mContext);
    }
}
2 业务通讯层代码
/**
 * Contract of a client connection used by the business communication layer.
 */
public interface IClientConnect {

    /**
     * Starts a connection of the given kind.
     *
     * @param netType connection kind identifier
     */
    void isConnect(String netType);

    /** Re-triggers sending of the pending data. */
    void sendAgain();

    /**
     * Hook for a failed send.
     *
     * @param netType connection kind identifier
     * @param msg     the bytes involved in the failed send
     */
    void sendMsgFail(String netType, byte[] msg);

    /**
     * Hook for a failed connection attempt.
     *
     * @param netType connection kind identifier
     */
    void connectFail(String netType);

    /**
     * Sends data described by an entity.
     *
     * @param entity request entity to send
     */
    void sendEntity(IEntity entity);

    /**
     * Sends raw bytes.
     *
     * @param b bytes to send
     */
    void sendByte(byte[] b);

    /** Closes the connection. */
    void isClose();

    /** Clears the current data. */
    void isClearMsg();

    /**
     * Reports a result from the connection.
     *
     * @param header packet header of the response
     * @param data   response body bytes
     * @param desc   textual description of the result
     * @param type   result type code
     */
    void callBack(PackageHeader header, byte[] data, String desc, int type);

    /**
     * Reports a result that is already available as an entity.
     *
     * @param entity entity carrying the result
     * @param desc   textual description of the result
     */
    void callBack(IEntity entity, String desc);
}
public abstract class BaseClientMgr extends Subject implements IClientConnect { protected boolean isRunning; // 当前是否正在连接
protected boolean isSending; // 是否正在发送 线程是否被占用
private int mPort; // 连接服务器的端口号
private int mCommunication; // 通讯类型
private int heartTimeOutCount = 0; // 记录心跳超时次数
protected int function = 1200; // 关闭连接功能号 public static final int RESPONSE_SUCCESS = 0x401;
public static final int RESPONSE_FAIL = 0x402;
public static final int RESPONSE_TIMEOUT = 0x403;
public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410; // 心跳超时
public static final int NOT_LOGIN = 0x411; // 用户未登录 private String mConnectKey = "BasicServicesMgr";
private String mHost; // 连接服务器的IP地址
protected ArrayList<IEntity> mEntityMsg = null; // 待发送消息集合 protected Context mContext; // Context对象
protected CommunicationThreadManager mManager; // 该通讯层管理器
protected ParseByteThread mParseByteThread = null; // 数据解析线程
protected ExecutorService executor; // 线程连接池 protected BaseClientMgr(String host, int port, String key) {
init(host, port, key);
} // 初始化
private void init(String host, int port, String key) {
this.mContext = MeiApp.mContext;
isRunning = false;
isSending = false;
mHost = host;
mPort = port;
mConnectKey = key;
mEntityMsg = new ArrayList<IEntity>();
executor = Executors.newFixedThreadPool(10);
mParseByteThread = new ParseByteThread(this);
executor.execute(mParseByteThread);
} protected Handler basicHandler = new Handler() { @Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
switch (msg.what) {
case ClientConstants.REQUEST:
// 发送请求 连接占用
if (mEntityMsg != null && mEntityMsg.size() > 0) {
isSending = true;
// 清除handler的消息
basicHandler.removeMessages(ClientConstants.REQUEST);
basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT);
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_MESSAGE);
// 请求类型 当为网络请求时判断网络状态 建立连接
// 检查连接是否可用
if (isRunning) {
// 直接发送消息
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_MESSAGE);
basicHandler.sendEmptyMessage(ClientConstants.REQUEST_SEND_MESSAGE);
} else {
// 建立连接
basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT);
Message msgCreate = Message.obtain();
msgCreate.what = ClientConstants.REQUEST_CREATE_CONNECT;
msgCreate.arg1 = 0;
basicHandler.sendMessage(msgCreate);
} }
break;
case ClientConstants.REQUEST_CREATE_CONNECT:
// 建立连接
Log.i("mbk", "建立连接!");
isConnect("netty"); break;
case ClientConstants.REQUEST_SEND_MESSAGE:
// 发送消息
Log.i("mbk", "发送消息!");
if (isRunning) {
if (mEntityMsg.size() > 0) {
Log.i("mbk", "发送数据!");
sendData(mEntityMsg.get(0));
basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
// 设置请求超时
basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_TIMEOUT, 3000);
} else {
Log.i("mbk", "数据发送完成!");
isSending = false;
}
} else {
// 重新建立连接
basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT);
basicHandler.sendEmptyMessage(ClientConstants.REQUEST_CREATE_CONNECT);
}
break;
case ClientConstants.REQUEST_SEND_HEARTBEAT:
Log.i("mbk", "发送心跳!");
mManager.sendHeart(function);
heartTimeOutCount++;
Log.i("lzy02", "heartTimeOutCount---------------" + heartTimeOutCount);
if (heartTimeOutCount >= 3) {// 大于等于3则认为与云棒无连接
callBack(null, null, "心跳超时!", REQUEST_HEARTBEAT_TIMEOUT);
}
// // 发送心跳
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);
basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_HEARTBEAT, 3000); break;
case ClientConstants.REQUEST_TIMEOUT:// 请求超时
Log.i("mbk", "请求超时!");
isRunning = false;
callBack(null, null, "请求超时!", RESPONSE_TIMEOUT);
break; }
}
}; public void sendHeartbeat(int function) {
this.function = function;
} public void sendData(IEntity entity) {
sendByte(ClientSocketUtils.sendDatas(mEntityMsg.get(0)));
} // 建立连接
@Override
public void isConnect(String netType) {
UdpEntity udpEntity = null;
int type = CommunicationThreadManager.MBK_COMMUNICATION_NETTY;
if (netType.equals("netty")) {
// 建立一个netty连接
type = CommunicationThreadManager.MBK_COMMUNICATION_NETTY; mManager = new CommunicationThreadManager(mContext, null, mConnectKey, "192.168.31.241", mPort, type, mCommunicationCallBack); Log.i("mbk", "发送地址---" + "192.168.31.241");
Log.i("mbk", "发送端口号---" + mPort); /*
* if (udpEntity != null) { Log.i("lzy02",
* "udpEntity---209----------udpEntity=="+udpEntity.getYunbangIp());
* mManager = new CommunicationThreadManager(mContext, null, mConnectKey,
* "192.168.31.241", mPort, type, mCommunicationCallBack);
* //Toast.makeText(mContext, "已通过Netty发送 ", Toast.LENGTH_SHORT).show();
* Log.i("mbk","netty发送云棒IP号---" + udpEntity.getYunbangIp()); } else {
* Log.i("lzy02", "udpEntity---211----------udpEntity == null");
* callBack(null, null, "无法连接netty!", RESPONSE_FAIL); }
*/
// 使用netty是时候 清理p2p
P2pClearUp();
} else { }
Log.i("mbk", "初始化 连接服务器!" + netType);
} @Override
public void sendByte(byte[] b) {
try {
if (mManager != null) {
mManager.sendDataToServer(new SendData(b));
} else {
isClose();
}
} catch (InterruptedException e) {
isClose();
}
} // 服务端回调
private CommunicationCallBack mCommunicationCallBack = new CommunicationCallBack() { @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Log.i("mbk", "--------------------------请求异常--------------------------" + mCommunication);
isRunning = false;
callBack(null, null, "请求异常!", RESPONSE_FAIL); } @Override
public void connected(ChannelHandlerContext ctx) {
Log.i("mbk", "--------------------------连接成功--------------------------" + mCommunication);
// mChx = ctx;
isRunning = true;
sendAgain();
} @Override
public void connectFailure(Exception e) {
Log.i("mbk", "--------------------------连接服务器失败--------------------------" + mCommunication);
isRunning = false;
callBack(null, null, "连接服务器失败!", RESPONSE_FAIL);
} @Override
public void channelRead(ChannelHandlerContext ctx, byte[] msg) {
Log.i("mbk", "--------------------------服务端返回--------------------------" + mCommunication);
if (mParseByteThread != null) {
mParseByteThread.sendParseByte(msg);
}
} @Override
public void communicationOutTime() {
Log.i("mbk", "--------------------------连接超时--------------------------" + mCommunication);
isRunning = false;
callBack(null, null, "连接超时!", RESPONSE_TIMEOUT);
} @Override
public void questTimeOut() {
Log.i("mbk", "--------------------------请求超时--------------------------" + mCommunication);
isRunning = false;
callBack(null, null, "请求超时!", RESPONSE_TIMEOUT);
}
}; @Override
public void sendAgain() {
// 连接成功 发起请求
Log.i("mbk", "连接成功,数据重新发送!"); // basicHandler.sendEmptyMessage(ClientConstants.REQUEST_SEND_MESSAGE);
basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_MESSAGE, 500);
} // 接收需要发送的实体
@Override
public void sendEntity(IEntity entity) {
if (mEntityMsg != null && entity != null) {
mEntityMsg.add(entity);
if (!isSending) {
// 启动一个发送
Log.i("mbk", "发起请求!REQUEST_NET");
basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
}
}
// if (mEntityMsg != null && mEntityMsg.size() == 2) {
// mEntityMsg.remove(1);
// } } @Override
public void callBack(PackageHeader header, byte[] data, String desc, int type) {
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT); switch (type) {
case RESPONSE_SUCCESS:
heartTimeOutCount = 0;
basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_HEARTBEAT, 20000);
switch (header.getFunction()) {
case 9998:
Log.i("mbk", "服务端关闭!");
isClose();
break;
case 9999:
Log.i("mbk", "成功返回一个心跳!");
break;
case 999:
Log.i("mbk", "未知错误!");
callBack(null, null, "未知错误", RESPONSE_FAIL);
break;
default:
responseSuccess(header, data, desc, type);
break;
}
break;
case REQUEST_HEARTBEAT_TIMEOUT:// 心跳超时3次认为与云棒无连接
/*
* Intent m2Intent = new Intent(MeiConfigs.NETWORK_PROMPT);
* m2Intent.putExtra("islogin", "3003");
* MeiApp.mContext.sendBroadcast(m2Intent);
*/
break;
case RESPONSE_FAIL:
responseFail(header, data, desc, type);
break;
case RESPONSE_TIMEOUT:
responseFail(header, data, desc, type);
break;
}
} // 请求成功
public void responseSuccess(PackageHeader header, byte[] data, String desc, int type) { try {
if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) {
IEntity entity = mEntityMsg.get(0);
if (data != null && data.length > 0) {
entity.onDecode(new String(data, "utf-8"));
// Log.i("mbk","云棒返回---" + "---" + new String(data, "utf-8"));
// 请求成功 Log.i("lzy02", "1--------------" + entity.getCode());
Log.i("mbk", "返回一条数据!");
Message msg = Message.obtain();
msg.obj = entity;
msg.arg1 = header.getFunction();
msg.what = type;
entity.getHandler().sendMessage(msg);
}
}
} catch (Exception e) {
e.printStackTrace();
isClose();
}
if (mEntityMsg != null && mEntityMsg.size() > 0) {
mEntityMsg.remove(0);
}
basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
isSending = false;
if (mEntityMsg.size() > 0) {
basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
}
} // 请求失败
public void responseFail(PackageHeader header, byte[] data, String desc, int type) {
Log.i("mbk", "请求失败! " + desc);
Message msg = Message.obtain();
msg.obj = desc;
msg.arg1 = 0;
msg.what = type;
if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) {
mEntityMsg.get(0).getHandler().sendMessage(msg);
}
isClose();
} // 请求本地缓存返回
@Override
public void callBack(IEntity entity, String desc) {
Log.i("mbk", "回一返个缓存数据! ");
if ("cache".equals(desc)) {
if (entity != null && entity.getHandler() != null) {
Message msg = Message.obtain();
msg.obj = entity;
msg.what = RESPONSE_SUCCESS;
entity.getHandler().sendMessage(msg);
}
}
} public void P2pClearUp() {
if (mManager != null) {
mManager.p2pCleanup();
}
} @Override
public void isClose() {
Log.i("mbk", "关闭连接!" + isRunning);
if (mManager != null) {
if (isRunning) {
try {
mManager.sendDataToServer(new SendData(ClientSocketUtils.sendExit(function)));
} catch (InterruptedException e) {
}
} else {
mManager.closeTheadManager();
mManager = null;
}
}
if (mParseByteThread != null)
mParseByteThread.closeThread();
if (mEntityMsg != null) {
mEntityMsg.clear();
}
P2pClearUp();
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);
basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
isRunning = false;
isSending = false;
} @Override
public void sendMsgFail(String netType, byte[] msg) {
} @Override
public void connectFail(String netType) {
} @Override
public void isClearMsg() {
if (mEntityMsg != null) {
mEntityMsg.clear();
}
} }
/**
 * Lazily created single instance of {@link BaseClientMgr} for the basic
 * services connection (host 192.168.43.1, port 9223).
 */
public class BasicServicesMgr extends BaseClientMgr {

    /** The shared instance; stays {@code null} until {@link #getInstance()} is first called. */
    public static BasicServicesMgr instance = null;

    private BasicServicesMgr() {
        super("192.168.43.1", 9223, ClientConnectorManager.BASIC_SERVICES_MGR_KEY);
    }

    /** Returns the shared instance, creating it on first use. */
    public static BasicServicesMgr getInstance() {
        BasicServicesMgr current = instance;
        if (current == null) {
            current = new BasicServicesMgr();
            instance = current;
        }
        return current;
    }

    /**
     * Queues an entity for sending. The queue holds at most two requests: when
     * two are already queued, the second one is replaced by the new entity.
     */
    @Override
    public void sendEntity(IEntity entity) {
        if (entity == null) {
            return;
        }
        boolean queueFull = mEntityMsg != null && mEntityMsg.size() == 2;
        if (queueFull) {
            mEntityMsg.remove(1);
        }
        mEntityMsg.add(entity);
        if (isSending) {
            return;
        }
        // Nothing in flight: mark the sender busy and start a send cycle.
        isSending = true;
        basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
    }
}
/**
 * Observer that can be registered with a {@code Subject}.
 */
public interface Observer {

    /**
     * Update hook.
     *
     * @param state the entity passed along with the notification
     */
    void update(IEntity state);
}
/**
 * Worker that reassembles packets (fixed-size header followed by a body) from
 * the byte chunks handed to {@link #sendParseByte(byte[])} and reports every
 * complete packet through {@link IClientConnect#callBack}.
 *
 * <p>{@link #run()} sets up a Looper on the executing thread; chunks are
 * processed by a Handler bound to that Looper. Parsing state is kept in fields
 * so a header or body may span several chunks.
 *
 * <p>NOTE(review): a non-heartbeat packet whose header announces a body length
 * of 0 is never reported, because {@code readDataLenth == 0} also means
 * "currently reading a header". Confirm whether such packets can occur.
 */
class ParseByteThread implements Runnable {

    public static final int RESPONSE_SUCCESS = 0x401;
    public static final int RESPONSE_FAIL = 0x402;
    public static final int RESPONSE_TIMEOUT = 0x403;
    /** Heartbeat timeout. */
    public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410;
    /** User is not logged in. */
    public static final int NOT_LOGIN = 0x411;

    private byte[] bufHeader = null; // buffer the header bytes are collected in
    private byte[] readData = null; // buffer the body bytes are collected in
    private PackageHeader header = null; // header of the packet being read
    private int headerLenth = PackageHeader.headerLenth;
    private int readDataLenth = 0; // body length of the current packet; 0 while reading a header
    private int sLength = 0; // bytes already copied into the buffer currently being filled
    // Written by the parser thread in run() and read by other threads in
    // sendParseByte(), hence volatile.
    private volatile Handler fileParseHandler = null;
    private IClientConnect connect;

    /**
     * @param connect receiver of parsed packets and parse errors
     */
    public ParseByteThread(IClientConnect connect) {
        readDataLenth = 0;
        sLength = 0;
        headerLenth = PackageHeader.headerLenth;
        bufHeader = new byte[PackageHeader.headerLenth];
        readData = null;
        header = new PackageHeader();
        this.connect = connect;
    }

    public Handler getFileParseHandler() {
        return this.fileParseHandler;
    }

    public void setFileParseHandler(Handler fileParseHandler) {
        this.fileParseHandler = fileParseHandler;
    }

    /**
     * Queues a received chunk for parsing. The chunk is dropped when the
     * handler has not been created yet.
     *
     * @param msg bytes received from the connection
     */
    public void sendParseByte(byte[] msg) {
        // Read the field once so the null check and the use see the same handler.
        Handler handler = fileParseHandler;
        if (handler != null) {
            Message msgData = Message.obtain();
            msgData.obj = msg;
            handler.sendMessage(msgData);
        }
    }

    /** Resets the parsing state; despite its name it does not stop the looper. */
    public void closeThread() {
        readDataLenth = 0;
        sLength = 0;
        headerLenth = PackageHeader.headerLenth;
        bufHeader = new byte[PackageHeader.headerLenth];
        readData = null;
        header = new PackageHeader();
    }

    @Override
    public void run() {
        Looper.prepare();
        fileParseHandler = new Handler() {
            public void handleMessage(Message data) {
                synchronized (data) {
                    byte[] msg = (byte[]) data.obj;
                    if (msg == null) {
                        return;
                    }
                    int msgLength = msg.length;
                    int useLength = 0; // bytes of this chunk consumed so far
                    while (msgLength - useLength > 0) {
                        if (readDataLenth == 0) {
                            // Reading a header.
                            if (msgLength - useLength >= headerLenth - sLength) {
                                // The rest of the header is available.
                                System.arraycopy(msg, useLength, bufHeader, sLength, headerLenth - sLength);
                                useLength += (headerLenth - sLength);
                                sLength = 0;
                                header.setPackageHeader(bufHeader);
                                int function = header.getFunction();
                                if (function > 10000 || function < 999) {
                                    // Invalid header: report it and drop the rest of the chunk.
                                    connect.callBack(null, null, "包头不符合", RESPONSE_FAIL);
                                    break;
                                }
                                if (function != 9999 && function != 9998) {
                                    int bodyLength = (int) header.getInclusionLenth();
                                    if (bodyLength < 0) {
                                        // A negative length would make the allocation below
                                        // throw and kill this thread; treat it as a bad header.
                                        connect.callBack(null, null, "包头不符合", RESPONSE_FAIL);
                                        break;
                                    }
                                    readDataLenth = bodyLength;
                                    readData = new byte[readDataLenth];
                                } else if (function == 9999) {
                                    // Heartbeat packet: no body follows.
                                    connect.callBack(header, readData, "", RESPONSE_SUCCESS);
                                } else {
                                    // 9998: stop processing this chunk, then report.
                                    msgLength = 0;
                                    useLength = 0;
                                    connect.callBack(header, readData, "", RESPONSE_SUCCESS);
                                }
                            } else {
                                // Only part of the header arrived; keep it for the next chunk.
                                System.arraycopy(msg, useLength, bufHeader, sLength, msgLength - useLength);
                                sLength += (msgLength - useLength);
                                break;
                            }
                        } else {
                            // Reading a body.
                            if (msgLength - useLength >= readDataLenth - sLength) {
                                // The rest of the body is available.
                                System.arraycopy(msg, useLength, readData, sLength, readDataLenth - sLength);
                                useLength += (readDataLenth - sLength);
                                sLength = 0;
                                readDataLenth = 0;
                                bufHeader = new byte[PackageHeader.headerLenth];
                                // Packet complete: hand it over.
                                try {
                                    connect.callBack(header, readData, "", RESPONSE_SUCCESS);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            } else {
                                // Only part of the body arrived; keep it for the next chunk.
                                System.arraycopy(msg, useLength, readData, sLength, msgLength - useLength);
                                sLength += (msgLength - useLength);
                                break;
                            }
                        }
                    }
                }
            }
        };
        Looper.loop();
    }
}
/**
 * Subject side of the observer pattern: keeps the registered observers and
 * delivers notifications to them on the main looper.
 */
public abstract class Subject {

    /** Registered observers. */
    private List<Observer> list = new ArrayList<Observer>();

    /** Dispatches each posted notification to all observers on the main looper. */
    private Handler subHandler = new Handler(MeiApp.mContext.getMainLooper()) {
        public void handleMessage(Message msg) {
            if (list != null && list.size() > 0) {
                for (int i = 0; i < list.size(); i++) {
                    list.get(i).update((IEntity) msg.obj);
                }
            }
        }
    };

    /**
     * Registers an observer.
     *
     * @param observer observer to add; {@code null} is ignored because it
     *                 would make the dispatch loop throw later
     */
    public void attach(Observer observer) {
        if (list != null && observer != null) {
            list.add(observer);
        }
    }

    /**
     * Removes an observer.
     *
     * @param observer observer to remove; {@code null} is ignored
     */
    public void detach(Observer observer) {
        if (list != null && list.size() > 0 && observer != null) {
            list.remove(observer);
        }
    }

    /** Removes all observers. */
    public void clear() {
        if (list != null && list.size() > 0) {
            list.clear();
        }
    }

    /**
     * Notifies all registered observers with the given state.
     *
     * <p>The message is posted directly to the handler. Posting is allowed
     * from any thread, so no helper thread is started for it.
     *
     * @param newState entity passed to every observer's {@code update}
     */
    public void nodifyObservers(final IEntity newState) {
        Message msg = Message.obtain();
        msg.obj = newState;
        subHandler.sendMessage(msg);
    }
}
完整代码见 https://github.com/huanyi0723/NettyTest