Problem description
I am currently using java.nio.channels.Selector and SocketChannel in an application that opens one-to-many connections for continuous streaming to a server. The application has three threads: StreamWriteWorker, which performs write operations on the SocketChannel; StreamReadWorker, which reads bytes from the buffer and parses the content; and StreamTaskDispatcher, which runs the Selector's selection for readyOps and dispatches new runnables to the worker threads.
Problem: The Selector's select method returns a value greater than 0 (valid readyOps) only on the first invocation. I am able to perform a write and send data on all ready channels that one time, but every subsequent invocation of select returns 0.
Question: Do I need to invoke close on the SocketChannel after every read/write (I hope not!)? If not, what could cause the SocketChannels to stop being available for any read/write operations?
I am sorry I cannot post the code, but I hope I have explained the problem clearly enough for someone to help. I have searched for answers and I see that you cannot reuse a SocketChannel connection after it is closed, but my channel should not be closed; the server never receives an EOF on the stream.
I made some progress and figured out that the write operation was not occurring on the server app because of a JSON parsing error. Now the SocketChannel in my client code becomes ready for another write operation after it processes a read operation. I guess this is the TCP nature of SocketChannels. However, the SocketChannel does not become available for another read operation on the server side. Is this normal behavior for SocketChannels? Do I need to close the connection on the client side after the read operation and establish a new connection?
Here is a code sample of what I am trying to do:
package org.stream.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.RandomStringUtils;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;
public class ClientServerTest {

    private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
    private ExecutorService executor = Executors.newFixedThreadPool(1);
    private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();

    private class StreamWriteTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int results = 0;
            while (buffer.hasRemaining()) {
                try {
                    results = sc.write(buffer);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(data);
                    selector.wakeup();
                    return;
                }
            }
            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
            selector.wakeup();
        }
    }

    private class StreamReadTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        private boolean checkUUID(byte[] data) {
            return uuidToSize.containsKey(new String(data));
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            byte[] data = (byte[]) key.attachment();
            if (data != null) {
                buffer.put(data);
            }
            int count = 0;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if (count == 0) {
                buffer.flip();
                data = new byte[buffer.limit()];
                buffer.get(data);
                if (checkUUID(data)) {
                    key.interestOps(SelectionKey.OP_READ);
                    key.attach(data);
                } else {
                    System.out.println("Clinet Read - uuid ~~~~ " + new String(data));
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(null);
                }
            }
            if (count == -1) {
                try {
                    sc.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            selector.wakeup();
        }
    }

    private class ClientWorker implements Runnable {
        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                SocketChannel sc = SocketChannel.open();
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress("127.0.0.1", 9001));
                sc.register(selector, SelectionKey.OP_CONNECT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                while (selector.isOpen()) {
                    int count = selector.select(10);
                    if (count == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isConnectable()) {
                            sc = (SocketChannel) key.channel();
                            if (!sc.finishConnect()) {
                                continue;
                            }
                            sc.register(selector, SelectionKey.OP_WRITE);
                        }
                        if (key.isReadable()) {
                            key.interestOps(0);
                            executor.execute(new StreamReadTask(buffer, key, selector));
                        }
                        if (key.isWritable()) {
                            key.interestOps(0);
                            if (key.attachment() == null) {
                                key.attach(dataQueue.take());
                            }
                            executor.execute(new StreamWriteTask(buffer, key, selector));
                        }
                    }
                }
            } catch (IOException ex) {
                // Handle Exception
            } catch (InterruptedException ex) {
            }
        }
    }

    private class ServerWorker implements Runnable {
        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ServerSocket socket = ssc.socket();
                socket.bind(new InetSocketAddress(9001));
                ssc.configureBlocking(false);
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                DataHandler handler = new DataHandler();
                while (selector.isOpen()) {
                    int count = selector.select(10);
                    if (count == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isAcceptable()) {
                            ssc = (ServerSocketChannel) key.channel();
                            SocketChannel sc = ssc.accept();
                            sc.configureBlocking(false);
                            sc.register(selector, SelectionKey.OP_READ);
                        }
                        if (key.isReadable()) {
                            handler.readSocket(buffer, key);
                        }
                        if (key.isWritable()) {
                            handler.writeToSocket(buffer, key);
                        }
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    private class DataHandler {
        private JsonObject parseData(StringBuilder builder) {
            if (!builder.toString().endsWith("}")) {
                return null;
            }
            JsonParser parser = new JsonParser();
            JsonObject obj = (JsonObject) parser.parse(builder.toString());
            return obj;
        }

        private void readSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            int count = Integer.MAX_VALUE;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (count == 0) {
                buffer.flip();
                StringBuilder builder = key.attachment() instanceof StringBuilder
                        ? (StringBuilder) key.attachment()
                        : new StringBuilder();
                Charset charset = Charset.forName("UTF-8");
                CharsetDecoder decoder = charset.newDecoder();
                decoder.onMalformedInput(CodingErrorAction.IGNORE);
                System.out.println(buffer);
                CharBuffer charBuffer = decoder.decode(buffer);
                String content = charBuffer.toString();
                charBuffer = null;
                builder.append(content);
                System.out.println(content);
                JsonObject obj = parseData(builder);
                if (obj == null) {
                    key.attach(builder);
                    key.interestOps(SelectionKey.OP_READ);
                } else {
                    System.out.println("data ~~~~~~~ " + builder.toString());
                    JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
                    key.attach(uuid.toString().getBytes());
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            }
            if (count == -1) {
                key.attach(null);
                sc.close();
            }
        }

        private void writeToSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int writeAttempts = 0;
            while (buffer.hasRemaining()) {
                int results = sc.write(buffer);
                writeAttempts++;
                System.out.println("Write Attempt #" + writeAttempts);
                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.attach(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    break;
                }
            }
            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
        }
    }

    public ClientServerTest() {
        for (int index = 0; index < 1000; index++) {
            JsonObject obj = new JsonObject();
            String uuid = UUID.randomUUID().toString();
            uuidToSize.put(uuid, uuid.length());
            obj.addProperty("uuid", uuid);
            String data = RandomStringUtils.randomAlphanumeric(10000);
            obj.addProperty("event", data);
            dataQueue.add(obj.toString().getBytes());
        }
        Thread serverWorker = new Thread(new ServerWorker());
        serverWorker.start();
        Thread clientWorker = new Thread(new ClientWorker());
        clientWorker.start();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ClientServerTest test = new ClientServerTest();
        for (;;) {
        }
    }
}
Recommended answer
The correct way to process OP_CONNECT is to attempt finishConnect() once and, if it succeeds, deregister OP_CONNECT and register OP_READ or OP_WRITE, probably the latter as you are a client. Looping and sleeping in non-blocking mode doesn't make sense. If finishConnect() returns false, OP_CONNECT will fire again.
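As a rough illustration of that advice, here is a minimal sketch of a connect handler. The names ConnectSketch, handleConnect and demo are made up for this example, and the loopback listener in demo exists only so the sketch can run on its own; it is not taken from the question's code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class ConnectSketch {
    /** Call once per OP_CONNECT event; returns true when the connection is established. */
    static boolean handleConnect(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        if (!sc.finishConnect()) {
            return false; // still pending: leave OP_CONNECT set, it will fire again
        }
        // Connected: replace OP_CONNECT with OP_WRITE on the same key.
        key.interestOps(SelectionKey.OP_WRITE);
        return true;
    }

    /** Hypothetical demo: connect to a throwaway loopback listener, return the final interest set. */
    static int demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.configureBlocking(false);
            boolean immediate = client.connect(server.getLocalAddress());
            SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);
            // A non-blocking connect may complete at once; finishConnect() then returns true.
            boolean connected = immediate && handleConnect(key);
            while (!connected) {
                selector.select(1000);
                if (selector.selectedKeys().remove(key) && key.isConnectable()) {
                    connected = handleConnect(key);
                }
            }
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

There is no loop or sleep around finishConnect(): a false return simply leaves the key as it is until the selector reports it again.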
Your processing of !key.isAcceptable(), !key.isReadable(), and !key.isWritable() makes absolutely zero sense. If the key is acceptable, call accept(). If it's readable, call read(). If it's writable, call write(). It's as simple as that.
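A sketch of that one-to-one mapping, assuming a single-threaded loop: DispatchSketch, dispatch and demo are hypothetical names, the "hello" greeting and "ping" message are invented test data, and the read branch allocates a temporary buffer only to keep the example short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class DispatchSketch {
    /** Handle every selected key once; each readiness flag maps to exactly one call. */
    static void dispatch(Selector selector, StringBuilder received) throws IOException {
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (!key.isValid()) {
                continue;
            }
            if (key.isAcceptable()) {
                SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                if (sc != null) {
                    sc.configureBlocking(false);
                    // There is a greeting to send, so this channel starts with OP_WRITE.
                    ByteBuffer greeting = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
                    sc.register(selector, SelectionKey.OP_WRITE, greeting);
                }
            } else if (key.isWritable()) {
                ByteBuffer out = (ByteBuffer) key.attachment();
                ((SocketChannel) key.channel()).write(out);
                if (!out.hasRemaining()) {
                    key.interestOps(SelectionKey.OP_READ); // nothing left to send
                }
            } else if (key.isReadable()) {
                ByteBuffer in = ByteBuffer.allocate(1024); // temporary buffer, for brevity only
                int n = ((SocketChannel) key.channel()).read(in);
                if (n < 0) {
                    key.channel().close(); // peer closed; closing also cancels the key
                } else {
                    in.flip();
                    received.append(StandardCharsets.UTF_8.decode(in));
                }
            }
        }
    }

    /** Hypothetical demo over loopback: returns "<server read>|<client read>". */
    static String demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                StringBuilder received = new StringBuilder();
                while (received.length() < 4) {
                    selector.select(1000);
                    dispatch(selector, received);
                }
                ByteBuffer reply = ByteBuffer.allocate(5);
                while (reply.hasRemaining() && client.read(reply) >= 0) {
                    // blocking client: keep reading until the 5-byte greeting is complete
                }
                reply.flip();
                String result = received + "|" + StandardCharsets.UTF_8.decode(reply);
                for (SelectionKey k : selector.keys()) {
                    k.channel().close(); // release the accepted channel as well
                }
                return result;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The dispatch method contains no negated readiness checks at all: a branch runs only when its operation is actually ready.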
You need to be aware that channels are almost always writable, except for the brief periods when their socket send buffer is full. So only register for OP_WRITE when you have something to write, or better still after you've tried a write and got a zero return; then when OP_WRITE fires, retry the write and deregister OP_WRITE unless you got another zero.
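The "write first, register OP_WRITE only after a zero return" pattern might look roughly like this. WriteSketch, flush and demo are hypothetical names; the demo pushes a payload over a loopback connection and busy-polls the reading side purely to keep the sketch short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class WriteSketch {
    /**
     * Write as much of pending as the socket accepts right now. OP_WRITE is
     * requested only after a zero-byte write and dropped once the buffer is
     * empty. Returns true when everything has been written.
     */
    static boolean flush(SelectionKey key, ByteBuffer pending) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        while (pending.hasRemaining()) {
            if (sc.write(pending) == 0) {
                // Send buffer is full: ask to be told when there is room again.
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                return false;
            }
        }
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        return true;
    }

    /** Hypothetical demo: send size bytes over loopback, return how many the peer received. */
    static long demo(int size) {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel writer = SocketChannel.open(server.getLocalAddress());
                 SocketChannel reader = server.accept()) {
                writer.configureBlocking(false);
                reader.configureBlocking(false);
                SelectionKey key = writer.register(selector, 0); // no interest until a write stalls
                ByteBuffer pending = ByteBuffer.allocate(size);
                ByteBuffer sink = ByteBuffer.allocate(64 * 1024);
                boolean done = flush(key, pending); // optimistic first write
                long received = 0;
                while (received < size) {
                    sink.clear();
                    int n = reader.read(sink); // drain so the writer can make progress
                    if (n < 0) {
                        break;
                    }
                    received += n;
                    if (!done) {
                        selector.select(10);
                        if (selector.selectedKeys().remove(key) && key.isWritable()) {
                            done = flush(key, pending); // retry only when OP_WRITE fired
                        }
                    }
                }
                return received;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a payload larger than the socket send buffer, the first flush call is likely to stop early and set OP_WRITE; with a small payload it usually finishes at once and the key never asks for OP_WRITE at all.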
You are being far too economical with your ByteBuffer. In practice you need one per channel. You can save it as the key attachment so you can get it back when you need it. Otherwise you don't have any way of accumulating partial reads, which are certain to happen, or any way of retrying writes either.
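A minimal sketch of the per-channel buffer idea follows. It assumes newline-delimited messages purely for illustration (the question's code frames messages by a trailing "}" instead), and ReadSketch, register, onReadable and drainLines are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ReadSketch {
    /** Register a channel with its own buffer stored as the key attachment. */
    static SelectionKey register(SocketChannel sc, Selector selector) throws IOException {
        sc.configureBlocking(false);
        return sc.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(64 * 1024));
    }

    /** Call when key.isReadable(): read into this channel's buffer, return complete messages. */
    static List<String> onReadable(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment(); // same buffer as the previous read
        if (sc.read(buf) < 0) {
            sc.close(); // end of stream; closing the channel also cancels the key
            return Collections.emptyList();
        }
        // Note: a message longer than the buffer would need extra handling here.
        return drainLines(buf);
    }

    /**
     * Extract every complete newline-terminated message (assumed framing).
     * Expects the buffer in write mode and leaves it in write mode, with any
     * partial message kept at the front for the next read.
     */
    static List<String> drainLines(ByteBuffer buf) {
        List<String> lines = new ArrayList<>();
        buf.flip(); // switch to reading what has accumulated so far
        int lineStart = buf.position();
        for (int i = buf.position(); i < buf.limit(); i++) {
            if (buf.get(i) == '\n') {
                byte[] line = new byte[i - lineStart];
                buf.get(line); // relative get: consumes the message bytes
                buf.get();     // consumes the '\n' itself
                lines.add(new String(line, StandardCharsets.UTF_8));
                lineStart = i + 1;
            }
        }
        buf.compact(); // keep the unfinished tail, ready for more data
        return lines;
    }
}
```

Because the buffer travels with the key, a message that arrives in several pieces is reassembled across calls: feeding "ab", then "c\nde", then "\n" yields nothing, then "abc", then "de".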