昨天整理旧代码的时候,发现了一个线程池创建单例的工具类,写法如下:
public class ThreadPoolUtil {
/**
* 使用有界队列,避免OOM
*/
private static ExecutorService consumerExecutor;
private ThreadPoolUtil() {
}
/**
* 获取线程池实例
*
* @return 线程池对象
*/
public static ExecutorService getInstance() {
if (null == consumerExecutor) {
System.out.println(System.currentTimeMillis() + " 初始化:" + Thread.currentThread().getName());
consumerExecutor = new ThreadPoolExecutor(15, 20, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512),
new ThreadPoolExecutor.DiscardPolicy());
}
return consumerExecutor;
}
}
乍一看确实没有问题,在非多线程的环境下,运行也是良好,但是实际情况是,在多线程环境下,这段代码会重复初始化多次,下面举个例子说明
公司的业务是物联网方面的,服务端使用了netty 编写的tcp server,维持和设备通信,这个server大致如下
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ByteBuf byteBuf = (ByteBuf) msg;
// byte[] data = new byte[byteBuf.readableBytes()];
// byteBuf.readBytes(data);
// logger.info("data:{}", ByteUtil.byteToHexString(data, " "));
// uid = new String(data);
// logger.info("data:{}", uid);
// if (AttributeKey.exists(ctx.channel().id().asShortText())) {
// ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().asShortText())).set(uid);
// } else {
// ctx.channel().attr(AttributeKey.newInstance(ctx.channel().id().asShortText())).set(uid);
// }
// ChannelHanlderGroup.getChannelGroup().add(ctx.channel());
// logger.info("number:{}", ChannelHanlderGroup.getChannelGroup().size());
ThreadPoolUtil.getInstance().execute(new Runnable() {
@Override
public void run() {
System.out.println("举个例子,处理耗时逻辑");
}
});
}
因为设备端有超时和断线重连的逻辑,当我把netty server重启之后,瞬时间会有大量的链接上来,会重复初始化多次,下面给个实例demo和运行结果 模拟设备连接的客户端:
public class NettyEchoClient {
private static Logger logger = LoggerFactory.getLogger(NettyEchoClient.class);
private final String host;
private final int port;
private byte[] client;
private String deviceNo;
public NettyEchoClient(String host, int port, String deviceNo) {
this.host = host;
this.port = port;
this.client = ByteUtil.StringToByte(deviceNo);
this.deviceNo = deviceNo;
}
public void start() {
EventLoopGroup group = null;
Channel ch = null;
try {
InputStream inStream = null;
Certificate certificate = null;
try {
inStream = this.getClass().getResourceAsStream("/server.crt");
CertificateFactory cf = CertificateFactory.getInstance("X.509");
certificate = cf.generateCertificate(inStream);
} finally {
if (inStream != null) {
inStream.close();
}
}
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setCertificateEntry("cert", certificate);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("sunx509");
tmf.init(ks);
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, tmf.getTrustManagers(), null);
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(true);
ChannelPipeline p = ch.pipeline();
p.addLast("ssl", new SslHandler(sslEngine));
p.addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS));
p.addLast(new EchoClientHandler());
}
});
ChannelFuture cf = b.connect(this.host, this.port).sync();
cf.addListener(new ConnectionListener(this.host, this.port, this.deviceNo));
cf.channel().closeFuture().sync(); // 异步等待关闭连接channel
} catch (Throwable e) {
e.printStackTrace();
} finally {
if (group != null) {
try {
group.shutdownGracefully().sync(); // 释放线程池资源
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public byte[] rep_login = {
(byte) 0x02, (byte) 0x00, //帧控制,包序号
(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00,//源地址
(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00,//目的地
(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, //时间戳
(byte) 0x00, (byte) 0x03,//长度
(byte) 0x00,//主命令
(byte) 0x00,//子命令
(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00,
(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00,
(byte) 0x01, (byte) 0x00,
(byte) 0x01
};
public static byte[] rep_xt = {
(byte) 0x02, (byte) 0x00, //帧控制,包序号
(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00,//源地址
(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00,//目的地
(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, //时间戳
(byte) 0x00, (byte) 0x02,//长度
(byte) 0x00,//主命令
(byte) 0x01,//子命令
};
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ArrayUtils.replace(rep_login, client, 2);
// System.out.println(ByteUtil.byteToHexString(rep_login, " "));
//crc 数据
byte[] crc = CRC16.getCRC(rep_login);
byte[] response = ByteUtil.convertData(crc, rep_login);
ctx.writeAndFlush(Unpooled.copiedBuffer(response));
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
byte[] data = new byte[in.readableBytes()];
in.readBytes(data);
ByteUtil.decoding(data);//解包
// if (data[27] == (byte) 0x00 && data[28] == (byte) 0x00) {
// Runnable runnable = () -> {
// boolean mark = true;
// while (mark) {
// //如果heart 为true
// try {
// ArrayUtils.replace(rep_xt, client, 2);
//
// byte[] crc = CRC16.getCRC(rep_xt);
// byte[] response = ByteUtil.convertData(crc, rep_xt);
//
// ctx.writeAndFlush(Unpooled.copiedBuffer(response));
// ReferenceCountUtil.release(response);
// //每隔510秒毫秒检测一次
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// mark = false;
// e.printStackTrace();
// }
// }
// };
// Thread thread = new Thread(runnable);
// thread.start();
// }
//System.out.println(ByteUtil.byteToHexString(data, " "));
ReferenceCountUtil.release(in);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (IdleState.WRITER_IDLE.equals(event.state())) {
//如果写通道处于空闲状态,就发送心跳命令
ArrayUtils.replace(rep_xt, client, 2);
byte[] crc = CRC16.getCRC(rep_xt);
byte[] response = ByteUtil.convertData(crc, rep_xt);
ctx.writeAndFlush(Unpooled.copiedBuffer(response));
ReferenceCountUtil.release(response);
} else {
System.out.println("不再发送心跳请求了!");
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error(cause.getMessage(), cause);
cause.printStackTrace();
ctx.close();
}
}
}
public static void main(String[] args) throws Exception {
int count = 200;
ExecutorService pools = Executors.newFixedThreadPool(count);
Runnable runnable = () -> {
try {
new NettyEchoClient("127.0.0.1", 8283, "0103" + bytesToHex(genRandomNum(6))).start();
} catch (Exception e) {
e.printStackTrace();
}
};
for (int i = 1; i <= count; i++) {
pools.execute(runnable);
Thread.sleep(150);
}
}
运行结果如下:
可以看到,单例被初始化了多次
后来修改为加上两步验证锁,问题解决
public class ThreadPoolUtil {
/**
* 使用有界队列,避免OOM
*/
private static volatile ExecutorService consumerExecutor;
private ThreadPoolUtil() {
}
/**
* 获取线程池实例
*
* @return 线程池对象
*/
public static ExecutorService getInstance() {
if (null == consumerExecutor) {
synchronized (Object.class) {
if (consumerExecutor == null) {
System.out.println(System.currentTimeMillis() + " 初始化:" + Thread.currentThread().getName());
consumerExecutor = new ThreadPoolExecutor(15, 20, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512),
new ThreadPoolExecutor.DiscardPolicy());
}
}
}
return consumerExecutor;
}
}