SocketListenerPusher.java代码如下:

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.ScheduledThreadPoolExecutor;
  8. import java.util.concurrent.TimeUnit;
  9. import org.apache.commons.configuration.ConfigurationException;
  10. import org.directwebremoting.impl.DaemonThreadFactory;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import com.shihuan.dragonkeeper.common.utils.PropertiesUtil;
  14. import com.shihuan.dragonkeeper.global.ConfigFile;
  15. public class SocketListenerPusher implements Runnable {
  16. protected static Logger logger = LoggerFactory.getLogger(SocketListenerPusher.class);
  17. public static String socketlistenerserver_CONFIG = ConfigFile.SOCKETLISTENERSERVER__CONFIG + ConfigFile.SUFFIX_NAME;
  18. private ServerSocket serverSocket;
  19. private ExecutorService pool;
  20. public SocketListenerPusher() {
  21. int port = 0;
  22. int poolsize = 0;
  23. try {
  24. port = Integer.parseInt(PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "serverport"));
  25. poolsize = Integer.parseInt(PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "poolsize"));
  26. serverSocket = new ServerSocket();
  27. serverSocket.setReuseAddress(true);
  28. serverSocket.bind(new InetSocketAddress(port));
  29. pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * poolsize);
  30. //下面两句循环执行run()方法, 相当于while(true){...}
  31. ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
  32. executor.scheduleAtFixedRate(this, 1L, 1L, TimeUnit.MILLISECONDS);
  33. } catch (NumberFormatException e) {
  34. logger.error(e.getMessage(), e);
  35. e.printStackTrace();
  36. } catch (ConfigurationException e) {
  37. logger.error(e.getMessage(), e);
  38. e.printStackTrace();
  39. } catch (IOException e) {
  40. logger.error(e.getMessage(), e);
  41. e.printStackTrace();
  42. }
  43. }
  44. public void run() {
  45. Socket socket = null;
  46. try {
  47. socket = serverSocket.accept();
  48. pool.execute(new SocketListenerHandler(socket));
  49. } catch (IOException e) {
  50. System.out.println("线程池被关闭!!!!!!!!!!!");
  51. pool.shutdown();
  52. logger.error(e.getMessage(), e);
  53. e.printStackTrace();
  54. }
  55. }

SocketListenerHandler.java代码如下:

  1. import java.io.BufferedInputStream;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.io.ObjectInputStream;
  6. import java.net.Socket;
  7. import java.sql.Connection;
  8. import java.sql.SQLException;
  9. import org.apache.commons.configuration.ConfigurationException;
  10. import org.apache.commons.dbutils.DbUtils;
  11. import org.apache.commons.dbutils.QueryRunner;
  12. import org.apache.commons.io.IOUtils;
  13. import org.directwebremoting.Browser;
  14. import org.directwebremoting.ScriptSessions;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import com.alibaba.fastjson.JSON;
  18. import com.shihuan.dragonkeeper.common.dto.DataSourceInfo;
  19. import com.shihuan.dragonkeeper.common.utils.ByteArrayUtil;
  20. import com.shihuan.dragonkeeper.common.utils.DataSourceMapUtil;
  21. import com.shihuan.dragonkeeper.common.utils.DateFormatterUtil;
  22. import com.shihuan.dragonkeeper.common.utils.PropertiesUtil;
  23. import com.shihuan.dragonkeeper.global.ConfigFile;
  24. import com.shihuan.dragonkeeper.server.bean.ActivityServiceBean;
  25. public class SocketListenerHandler implements Runnable {
  26. protected static Logger logger = LoggerFactory.getLogger(SocketListenerHandler.class);
  27. private static String jdbc_CONFIG = ConfigFile.JDBC_CONFIG + ConfigFile.SUFFIX_NAME;
  28. public static final int timeOut = 0*1000 ;  //设置读取操作异常为1秒
  29. private final String dataRealTimeAction_id = "Agentdata_" + Math.random();
  30. private static final String noData = "{'nodata':'心跳信息'}";
  31. private static final String errorData = "{'error':'无法解析的请求'}";
  32. private Socket connectedsocket = null;
  33. public SocketListenerHandler(Socket socket){
  34. this.connectedsocket = socket;
  35. }
  36. @Override
  37. public void run() {
  38. BufferedReader in = null;
  39. String resultData = "";
  40. try {
  41. connectedsocket.setSoTimeout(timeOut);  //表示接收数据时的等待超时数据, 此方法必须在接收数据之前执行才有效. 此外, 当输入流的 read()方法抛出 SocketTimeoutException后, Socket仍然是连接的, 可以尝试再次读数据, 单位为毫秒, 它的默认值为 0(表示会无限等待, 永远不会超时)
  42. connectedsocket.setKeepAlive(false);   //表示对于长时间处于空闲状态的Socket, 是否要自动把它关闭.
  43. in = new BufferedReader(new InputStreamReader(connectedsocket.getInputStream()));
  44. if (in.ready()) {  //判断流中是否有数据
  45. resultData = getNoHeadData(in.readLine());   //从Agent端接收到的数据
  46. logger.info("#### 结果DATA = "+resultData);
  47. if (resultData==null || "".equals(resultData)) {
  48. logger.info(dataRealTimeAction_id + " -->>> " + "内容为空!");
  49. } else if (resultData.charAt(0) != '{') {  //要在客户端定时维持心跳信息
  50. logger.info(dataRealTimeAction_id + " -->>> " + noData);
  51. } else {
  52. ActivityServiceBean asb = JSON.parseObject(resultData, ActivityServiceBean.class);
  53. System.out.println("打印预处理信息Start......");
  54. System.out.println(asb.getProxyname() + " -- " + asb.getIp() + " -- " + asb.getCalltime() + " -- " + asb.getAnswertime() + " -- " + asb.getCpu() + " -- " + asb.getThread() + " -- " + asb.getStatus() + " -- " + asb.getAccessaddress() + " -- " + asb.getAccessfilename() + " -- " + asb.getSql() + " -- " + asb.getContent());
  55. System.out.println("打印预处理信息End......");
  56. //                  parseData(ois);
  57. logger.info(dataRealTimeAction_id + ": 成功处理了接收到的数据!");
  58. }
  59. }
  60. } catch (IOException e) {
  61. logger.error(e.getMessage() + " " + errorData, e);
  62. e.printStackTrace();
  63. } catch (NumberFormatException e) {
  64. logger.error(e.getMessage(), e);
  65. e.printStackTrace();
  66. } finally {
  67. if (in != null) {
  68. try {
  69. in.close();
  70. } catch (IOException e) {
  71. logger.error(e.getMessage(), e);
  72. e.printStackTrace();
  73. }
  74. }
  75. }
  76. }

TestSocketListenerPusher.java请求端代码如下:

  1. import java.io.BufferedOutputStream;
  2. import java.io.IOException;
  3. import java.io.OutputStream;
  4. import java.net.Socket;
  5. import java.net.UnknownHostException;
  6. import java.util.Date;
  7. import org.apache.commons.configuration.ConfigurationException;
  8. import com.alibaba.fastjson.JSON;
  9. import com.shihuan.dragonkeeper.common.utils.ByteArrayUtil;
  10. import com.shihuan.dragonkeeper.common.utils.PropertiesUtil;
  11. import com.shihuan.dragonkeeper.global.ConfigFile;
  12. import com.shihuan.dragonkeeper.server.bean.ActivityServiceBean;
  13. public class TestSocketListenerPusher implements Runnable {
  14. private static String socketlistenerserver_CONFIG = ConfigFile.SOCKETLISTENERSERVER__CONFIG + ConfigFile.SUFFIX_NAME;
  15. private Socket socketclient = null;
  16. @Override
  17. public void run() {
  18. String serverip = "";
  19. int port = 0;
  20. OutputStream os = null;
  21. try {
  22. serverip = PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "serverip");
  23. port = Integer.parseInt(PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "serverport"));
  24. ActivityServiceBean asb = null;
  25. for (int i=0; i<2; i++) {
  26. asb = new ActivityServiceBean();
  27. asb.setProxyname("testProxyname"+i);
  28. asb.setIp("testIp"+i);
  29. Date curdate = new Date();
  30. asb.setCalltime(curdate);
  31. asb.setAnswertime(curdate);
  32. asb.setCpu("testCpu"+i);
  33. asb.setThread("testThread"+i);
  34. asb.setStatus("testStatus"+i);
  35. asb.setAccessaddress("testAccessaddress"+i);
  36. asb.setAccessfilename("testAccessfilename"+i);
  37. asb.setSql("testSql"+i);
  38. asb.setContent("testContent"+i);
  39. String jsonStr = JSON.toJSONString(asb).trim();
  40. byte[] information = (new String(ByteArrayUtil.getIntToByte(jsonStr.length()))+jsonStr).getBytes();
  41. System.out.println(information.length);
  42. socketclient = new Socket(serverip, port);
  43. socketclient.setSoTimeout(0);
  44. socketclient.setKeepAlive(false);
  45. os = new BufferedOutputStream(socketclient.getOutputStream());
  46. os.write(information);
  47. os.flush();
  48. System.out.println("Client" + i + " -->>> " + new String(ByteArrayUtil.getIntToByte(jsonStr.length()))+jsonStr);
  49. os.close();
  50. Thread.sleep(3000);
  51. }
  52. } catch (ConfigurationException e) {
  53. e.printStackTrace();
  54. } catch (UnknownHostException e) {
  55. e.printStackTrace();
  56. } catch (IOException e) {
  57. e.printStackTrace();
  58. } catch (InterruptedException e) {
  59. e.printStackTrace();
  60. } finally {
  61. /*
  62. try {
  63. if (os != null) {
  64. os.close();
  65. }
  66. } catch (IOException e) {
  67. e.printStackTrace();
  68. }
  69. */
  70. }
  71. }
  72. public static void main(String[] args) {
  73. Thread t = new Thread(new TestSocketListenerPusher());
  74. t.start();
  75. }
  76. }

源代码在笔者[email protected]邮箱网盘中J2EE代码文件夹里。 

---------------------------------------------------------------------------------- 
如果是按byte[]传输数据的情况,请参考如下代码: 

SimpleSocketServer.java代码如下:

  1. package com.shihuan.socket;
  2. import java.io.BufferedInputStream;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.net.InetSocketAddress;
  6. import java.net.ServerSocket;
  7. import java.net.Socket;
  8. public class SimpleSocketServer {
  9. public static void main(String[] args) {
  10. try {
  11. ServerSocket ss = new ServerSocket();
  12. ss.setReuseAddress(true);  //两个进程共用同一个端口的时候,一个进程关闭后,另一个进程还能够立刻重用相同端口
  13. ss.setReceiveBufferSize(128*1024);  //缓冲区中允许接收的最大字节数,默认是8192
  14. ss.bind(new InetSocketAddress(19990));
  15. Socket client = ss.accept();
  16. InputStream in = new BufferedInputStream(client.getInputStream());
  17. byte tmpb = (byte)in.read();
  18. System.out.println("第一个字节的byte值 --->> " + tmpb);
  19. System.out.println("接收字节 --->> " + in.available());
  20. byte[] bc = new byte[in.available()+1];
  21. bc[0] = tmpb;
  22. in.read(bc, 1, in.available());
  23. System.out.println(bc.length);
  24. System.out.println(new String(bc));
  25. in.close();
  26. } catch (IOException e) {
  27. System.out.println(e.getMessage());
  28. e.printStackTrace();
  29. }
  30. }
  31. }

SimpleSocketClient.java代码如下:

  1. package com.shihuan.socket;
  2. import java.io.BufferedOutputStream;
  3. import java.io.IOException;
  4. import java.io.OutputStream;
  5. import java.net.Socket;
  6. import java.net.UnknownHostException;
  7. public class SimpleSocketClient {
  8. public static void main(String[] args) throws UnknownHostException {
  9. try {
  10. Socket s = new Socket("192.168.1.10", 19990);
  11. OutputStream os = new BufferedOutputStream(s.getOutputStream());
  12. String info = "abc!";
  13. info = "大家好!";
  14. byte[] bi = info.getBytes();
  15. os.write(bi);
  16. os.flush();
  17. os.close();
  18. } catch (IOException e) {
  19. System.out.println(e.getMessage());
  20. e.printStackTrace();
  21. }
  22. }
  23. }

稍微复杂一点儿代码示例,处理了粘包问题: 
StartListenerTcpThread.java代码:

  1. import java.io.BufferedInputStream;
  2. import java.io.IOException;
  3. import java.io.InputStream;
  4. import java.net.InetSocketAddress;
  5. import java.net.ServerSocket;
  6. import java.net.Socket;
  7. import java.net.SocketAddress;
  8. import java.util.Vector;
  9. import java.util.concurrent.ExcutorService;
  10. import java.util.concurrent.Excutors;
  11. import org.apache.commons.io.IUtils;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import com.shihuan.dragonkeeper.common.utils.ByteArrayUtil;
  15. import com.shihuan.dragonkeeper.global.ConfigFile;
  16. public class StartListenerTcpThread implements Runnable {
  17. public static Logger logger = LoggerFactory.getLogger(StartListenerTcpThread.class);
  18. private static ExcutorService Threadpool = Excutors.newCachedThreadPool();
  19. private static boolean businessflag = true;
  20. private static final int receiveBufferSize = 128;
  21. private static Vector<byte[]> tmpbytes = new Vector<byte[]>();
  22. private ServerSocket serverSocket = null;
  23. public StartListenerTcpThread(String ip, int port){
  24. try{
  25. serverSocket = new ServerSocket();
  26. serverSocket.setReuseAddress(true);
  27. serverSocket.setReceiveBufferSize(receiveBufferSize*1024);
  28. serverSocket.setSoTimeout(0);
  29. SocketAddress sa = new InetSocketAddress(port);
  30. serverSocket.bind(sa, 20);
  31. }catch(IOException e){
  32. logger.error(e.getMessage(), e);
  33. }
  34. }
  35. public void run(){
  36. Socket socket = null;
  37. while(true){
  38. if(businessflag){
  39. try{
  40. socket = serverSocket.accept();
  41. System.out.println("New connection accepted " + socket.getInetAddress() + ":" + socket.getPort());
  42. InputStream socketIn = new BufferedInputStream(socket.getInputStream());
  43. byte tmpb = (byte)socketIn.read();
  44. byte[] currentbytes = null;
  45. if(tmpbytes.size() > 0){  //上一次IO流中有未处理的剩余包
  46. int oldBytesLen = tmpbytes.get(0).length;
  47. int socketBytesLen = socketIn.available()+1;
  48. int currentLength = oldByteLen + socketBytesLen;
  49. currentbytes = new byte[currentLength];
  50. System.arraycopy(tmpbytes.get(0), 0, currentbytes, oldBytesLen);
  51. currentbytes[oldBytesLen] = tmpb;
  52. socketIn.read(currentbytes, oldBytesLen+1, socketBytesLen-1);
  53. socketIn.close();
  54. splitInputStreamByte(currentbytes);
  55. }else{  //正常未粘包情况
  56. int socketBytesLen = socketIn.available()+1;
  57. currentbytes = new byte[socketBytesLen];
  58. currentbytes[0] = tmpb;
  59. socketIn.read(currentbytes, 1, socketBytesLen-1);
  60. socketIn.close();
  61. splitInputStreamByte(currentbytes);
  62. }
  63. }catch(IOException e){
  64. logger.error(e.getMessage(), e);
  65. }
  66. }
  67. }
  68. }
  69. /**
  70. * 拆分byte数组并分多线程处理
  71. * @param parambytes 原byte数组
  72. * @return 处理后剩余部分的byte数组
  73. */
  74. private static void splitInputStreamByte(byte[] parambytes) {
  75. if(parambytes != null){
  76. if(parambytes.length > 4){
  77. byte[] head = new byte[4];  //单包长度
  78. System.arraycopy(parambytes, 0, head, 0, 4);
  79. int bodyLength = ByteArrayUtil.getint(head);
  80. if(bodyLength <= parambytes.length-4){
  81. final byte[] body = new byte[bodyLength];
  82. System.arraycopy(parambytes, 4, body, 0, bodyLength);
  83. ThreadPool.execute(new Runnable(){
  84. public void run(){
  85. byte[] processDatas = body;
  86. try{
  87. System.out.println(IOUtils.toString(processDatas, "UTF-8").trim());
  88. }catch(IOException e){
  89. logger.error(e.getMessage(), e);
  90. }
  91. }
  92. });
  93. int resultLen = parambytes.length-4-bodyLength;
  94. if(resultLen == 0){
  95. splitInputStreamByte(null);
  96. }else{
  97. byte[] resultbytes = new byte[resultLen];
  98. System.arraycopy(parambytes, 4+bodyLength, resultbytes, 0, resultLen);
  99. splitInputStreamByte(resultbytes);
  100. }
  101. }else{
  102. tmpbytes.clear();
  103. tmpbytes.add(parambytes);
  104. }
  105. }else{
  106. tmpbytes.clear();
  107. tmpbytes.add(parambytes);
  108. }
  109. }
  110. }
  111. public static void openflag(){
  112. businessflag = true;
  113. }
  114. public static void closeflag(){
  115. businessflag = false;
  116. }
  117. }

TestTcpSocket.java代码:

  1. import java.io.IOException;
  2. import java.io.OutputStream;
  3. import java.net.Socket;
  4. import java.net.UnknownHostException;
  5. import com.shihuan.dragonkeeper.common.utils.ByteArrayUtil;
  6. import com.shihuan.dragonkeeper.global.ConfigFile;
  7. public class TestTcpSocket implements Runnable{
  8. private Socket socketClient = null;
  9. public void run(){
  10. String serverip = "192.168.1.10";
  11. int port = 19990;
  12. try{
  13. while(true){
  14. System.out.println("SocketClient start......");
  15. String mystr = "hello everyone!";
  16. socketClient = new Socket(serverip, port);
  17. OutputStream os = socketClient.getOutputStream();
  18. byte[] head = ByteArrayUtil.int2byte(mystr.length());
  19. byte[] body = mystr.getBytes();
  20. byte[] total = ByteArrayUtil.byteMerge(head, body);
  21. os.write(total);
  22. os.flush();
  23. os.close();
  24. Thread.sleep(1000);
  25. System.out.println("SocketClient end......");
  26. }
  27. }catch(Exception e){
  28. logger.error(e.getMessage(), e);
  29. }
  30. }
  31. public static void main(String[] args){
  32. Thread t = new Thread(new TestTcpSocket());
  33. t.start();
  34. }
  35. }

下面写ByteArrayUtil.java代码:

  1. package com.shihuan.dragonkeeper.common.utils;
  2. public class ByteArrayUtil {
  3. /**
  4. * 将int型的数据类型转换成byte[]类型
  5. */
  6. public static final byte[] int2byte(int paramInt){
  7. byte[] resultByte = new byte[4];
  8. resultByte[3] = ((byte)(paramInt & 0xFF));
  9. resultByte[2] = ((byte)(paramInt >>> 8 & 0xFF));
  10. resultByte[1] = ((byte)(paramInt >>> 16 & 0xFF));
  11. resultByte[0] = ((byte)(paramInt >>> 24 & 0xFF));
  12. return resultByte;
  13. }
  14. /**
  15. * 将byte型的数据类型转换成int类型
  16. */
  17. public static final int getint(byte[] paramArrayOfByte){
  18. int result = (paramArrayOfByte[0] & 0xFF) << 24 | (paramArrayOfByte[1] & 0xFF) << 16 | (paramArrayOfByte[2] & 0xFF) << 8 | paramArrayOfByte[3] & 0xFF;
  19. return result;
  20. }
  21. /**
  22. * 合并两个byte数组到一个byte数组中
  23. */
  24. public static byte[] byteMerge(byte[] byte1, byte[] byte2){
  25. byte[] result = new byte[byte1.length+byte2.length];
  26. System.arraycopy(byte1, 0, result, 0, byte1.length);
  27. System.arraycopy(byte2, 0, result, byte1.length, byte2.length);
  28. return result;
  29. }
  30. }

http://blog.csdn.net/defonds/article/details/8782785

05-01 00:11