问题描述
同一主题我的previous问题:C#:异步NamedPipeServerStream了解现在我有下一个:
私人无效StartListeningPipes()
{
尝试
{
isPipeWorking = TRUE;
namedPipeServerStream =新NamedPipeServerStream(PIPENAME,PipeDirection.InOut,1,PipeTransmissionMode.Byte,PipeOptions.Asynchronous,BUFFERSIZE,BUFFERSIZE);
Console.Write(等待客户端的连接......);
而(isPipeWorking)
{
IAsyncResult的asyncResult = namedPipeServerStream.BeginWaitForConnection(this.WaitForConnectionAsyncCallback,NULL);
Thread.sleep代码(3 * 1000);
}
}
////捕捉所引发,如果管道损坏或断开的IOException异常。
赶上(IOException异常E)
{
Console.WriteLine(IOException异常:{0}重新启动管道服务器......,e.Message);
StopListeningPipes();
StartListeningPipes();
}
////抓的ObjectDisposedException如果服务器已停止。然后什么也不做。
赶上(的ObjectDisposedException)
{
}
}
私人无效WaitForConnectionAsyncCallback(IAsyncResult的结果)
{
尝试
{
namedPipeServerStream.EndWaitForConnection(结果);
Console.WriteLine(客户端连接。);
namedPipeServerStream.WaitForPipeDrain();
byte []的BUFF =新的字节[BUFFERSIZE]
namedPipeServerStream.Read(BUFF,0,BUFFERSIZE);
字符串recStr = TrimNulls(BUFF);
Array.Clear(BUFF,0,buff.Length);
Console.WriteLine();
Console.WriteLine('+ recStr +');
}
赶上(例外五)
{
Console.WriteLine(错误:+ e.Message);
}
}
但我发现了
管道正在关闭的异常
每次我从客户端接收邮件
为什么?
我的客户:
使用(NamedPipeClientStream pipeStream =新NamedPipeClientStream(General.PIPENAME))
{
尝试
{
字节[]字节= General.Iso88591Encoding.GetBytes(sendingMessage);
pipeStream.Write(字节,0,bytes.Length);
pipeStream.Flush();
pipeStream.WaitForPipeDrain();
}
赶上(TimeoutException异常)
{
Console.WriteLine(超时错误!);
}
赶上(例外五)
{
Console.WriteLine(的String.Format(错误!,e.Message));
}
}
最后code,此刻是:
///<总结>
///收听管道客户端连接创建新NamedPipeServerStream
///< /总结>
私人无效ListenForPipeClients()
{
如果(!this.isListeningToClients)
返回;
尝试
{
PipeSecurity PS =新PipeSecurity();
PipeAccessRule面值=新PipeAccessRule(所有人,PipeAccessRights.ReadWrite,System.Security.AccessControl.AccessControlType.Allow);
ps.AddAccessRule(PAR);
pipeClientConnection =新NamedPipeServerStream(General.PIPENAME,PipeDirection.InOut,1,PipeTransmissionMode.Byte,PipeOptions.Asynchronous,General.BUFFERSIZE,General.BUFFERSIZE,PS);
Console.Write(等待客户端的连接......);
/*namedPipeServerStream.WaitForConnection();
OnPipeConnected(namedPipeServerStream); * /
IAsyncResult的结果= pipeClientConnection.BeginWaitForConnection(OnPipeConnected,pipeClientConnection);
}
赶上(的ObjectDisposedException)
{
////抓的ObjectDisposedException如果服务器已停止。然后什么也不做。
}
赶上(例外五)
{
Console.WriteLine(错误occures:{0}重新启动管道服务器......,e.Message);
this.logger.Add(LogLevel.Warning,的String.Format(错误occures:{0}重新启动管道服务器...,e.Message));
ListenForPipeClients();
}
}
///<总结>
在客户端连接动作///异步回调
///< /总结>
///< PARAM NAME =asyncResult>异步结果< /参数>
私人无效OnPipeConnected(IAsyncResult的asyncResult)
{
使用(VAR康恩=(NamedPipeServerStream)asyncResult.AsyncState)
{
尝试
{
conn.EndWaitForConnection(asyncResult);
Console.WriteLine(客户端连接。);
PipeClientConnection clientConnection =新PipeClientConnection(康涅狄格州,notifierSenderCache,defaultStorageTime);
}
赶上(例外五)
{
Console.WriteLine(e.Message);
this.logger.Add(LogLevel.Warning,e.Message);
}
}
ListenForPipeClients();
}
看来,你需要一个单独的 NamedPipeServerStream
为每一个客户。 (请注意,我不是一个发现这一点,看到其他的答案。)我想像的工作服务器端会是这个样子(草案code):
而(this.isServerRunning)
{
VAR pipeClientConnection =新NamedPipeServerStream(...);
尝试
{
pipeClientConnection.WaitForConnection();
}
抓住(...)
{
...
继续;
}
ThreadPool.QueueUserWorkItem(州=>
{
//我们需要一个单独的变量这里,以免使拉姆达捕获pipeClientConnection变量,这是不推荐的在多线程的情况
使用(VAR pipeClientConn =(NamedPipeServerStream)状态)
{
//做的东西
...
}
},pipeClientConnection);
}
作为一个侧面说明,因为有人指出,在评论你的问题,你就是在浪费内存,通过调用启动一个新的异步调用每3秒 BeginWaitForConnection
在循环中(其中,这不会浪费内存的唯一情况是,当新的连接在间隔3秒小制作,但我怀疑,你可以知道这是肯定的)。你看,基本上每3秒你启动一个新的异步调用,不管最后一个人是否仍在审理中或已完成。此外,它 - 再次 - 并没有考虑到,你需要一个单独的 NamedPipeServerStream
为每个客户帐户
要解决这个问题,就需要消除环路,以及产业链的BeginWaitForConnection调用使用回调方法。这是你必须使用.NET时,经常看到异步I / O类似的模式。草案code:
私人无效StartListeningPipes()
{
如果(!this.isServerRunning)
{
返回;
}
VAR pipeClientConnection =新NamedPipeServerStream(...);
尝试
{
pipeClientConnection.BeginWaitForConnection(asyncResult =>
{
//注意lambda的身体不是外try ... catch块的一部分!
使用(VAR康恩=(NamedPipeServerStream)asyncResult.AsyncState)
{
尝试
{
conn.EndWaitForConnection(asyncResult);
}
抓住(...)
{
...
}
//我们有一个建立连接,时间来等待新的,而这个线程执行其业务的客户端
//这可能看起来像一个递归调用,但它不是:请记住,我们正处在一个lambda EX pression
//如果这困扰你,只是拉姆达导出到一个名为私有方法,就像你在你的问题做了
StartListeningPipes();
//做生意的客户端
conn.WaitForPipeDrain();
...
}
},pipeClientConnection);
}
抓住(...)
{
...
}
}
控制流将是这样的:
- [主线程] StartListeningPipes():创建NamedPipeServerStream,发起BeginWaitForConnection()
- [线程池线程1]客户端#1连接,BeginWaitForConnection()回调:EndWaitForConnection(),然后StartListeningPipes()
- [线程池线程1] StartListeningPipes():创建新的NamedPipeServerStream,BeginWaitForConnection()调用
- [线程池线程1]回BeginWaitForConnection()回调:让正事与连接的客户端(#1)
- [线程池线程2]客户端#2连接,BeginWaitForConnection()回调:...
- ...
我觉得这是一个很大比使用阻塞I / O更加困难 - 其实,我不是很肯定我得到了它的权利,请如果你看到任何错误指出来 - 这也是很多较为混乱
要暂停服务器在任一例子,你显然会设置 this.isServerRunning
标志假
。
My previous question on the same theme: C#: Asynchronous NamedPipeServerStream understandingNow I have next:
private void StartListeningPipes()
{
try
{
isPipeWorking = true;
namedPipeServerStream = new NamedPipeServerStream(PIPENAME, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous, BUFFERSIZE, BUFFERSIZE);
Console.Write("Waiting for client connection...");
while(isPipeWorking)
{
IAsyncResult asyncResult = namedPipeServerStream.BeginWaitForConnection(this.WaitForConnectionAsyncCallback, null);
Thread.Sleep(3*1000);
}
}
//// Catch the IOException that is raised if the pipe is broken or disconnected.
catch (IOException e)
{
Console.WriteLine("IOException: {0}. Restart pipe server...", e.Message);
StopListeningPipes();
StartListeningPipes();
}
//// Catch ObjectDisposedException if server was stopped. Then do nothing.
catch (ObjectDisposedException)
{
}
}
private void WaitForConnectionAsyncCallback(IAsyncResult result)
{
try
{
namedPipeServerStream.EndWaitForConnection(result);
Console.WriteLine("Client connected.");
namedPipeServerStream.WaitForPipeDrain();
byte[] buff = new byte[BUFFERSIZE];
namedPipeServerStream.Read(buff, 0, BUFFERSIZE);
string recStr = TrimNulls(buff);
Array.Clear(buff, 0, buff.Length);
Console.WriteLine();
Console.WriteLine("'"+recStr+"'");
}
catch (Exception e)
{
Console.WriteLine("Error: " + e.Message);
}
}
But I'm getting
The pipe is being closed Exception
everytime I receive a message from client
Why?
My client:
using (NamedPipeClientStream pipeStream = new NamedPipeClientStream(General.PIPENAME))
{
try
{
byte[] bytes = General.Iso88591Encoding.GetBytes(sendingMessage);
pipeStream.Write(bytes, 0, bytes.Length);
pipeStream.Flush();
pipeStream.WaitForPipeDrain();
}
catch (TimeoutException)
{
Console.WriteLine("Timeout error!");
}
catch (Exception e)
{
Console.WriteLine(string.Format("Error! ", e.Message));
}
}
Final code at the moment is:
/// <summary>
/// Create new NamedPipeServerStream for listening to pipe client connection
/// </summary>
private void ListenForPipeClients()
{
if (!this.isListeningToClients)
return;
try
{
PipeSecurity ps = new PipeSecurity();
PipeAccessRule par = new PipeAccessRule("Everyone", PipeAccessRights.ReadWrite, System.Security.AccessControl.AccessControlType.Allow);
ps.AddAccessRule(par);
pipeClientConnection = new NamedPipeServerStream(General.PIPENAME, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous, General.BUFFERSIZE, General.BUFFERSIZE, ps);
Console.Write("Waiting for client connection...");
/*namedPipeServerStream.WaitForConnection();
OnPipeConnected(namedPipeServerStream);*/
IAsyncResult result = pipeClientConnection.BeginWaitForConnection(OnPipeConnected, pipeClientConnection);
}
catch (ObjectDisposedException)
{
//// Catch ObjectDisposedException if server was stopped. Then do nothing.
}
catch (Exception e)
{
Console.WriteLine("Error occures: {0}. Restart pipe server...", e.Message);
this.logger.Add(LogLevel.Warning, string.Format("Error occures: {0}. Restart pipe server...", e.Message));
ListenForPipeClients();
}
}
/// <summary>
/// Async callback on client connected action
/// </summary>
/// <param name="asyncResult">Async result</param>
private void OnPipeConnected(IAsyncResult asyncResult)
{
using (var conn = (NamedPipeServerStream)asyncResult.AsyncState)
{
try
{
conn.EndWaitForConnection(asyncResult);
Console.WriteLine("Client connected.");
PipeClientConnection clientConnection = new PipeClientConnection(conn, notifierSenderCache, defaultStorageTime);
}
catch (Exception e)
{
Console.WriteLine(e.Message);
this.logger.Add(LogLevel.Warning, e.Message);
}
}
ListenForPipeClients();
}
It appears that you need a separate NamedPipeServerStream
for each client. (Note that I was not the one to discover this, see the other answers.) I'd imagine the working server-side would look something like this (draft code):
while(this.isServerRunning)
{
var pipeClientConnection = new NamedPipeServerStream(...);
try
{
pipeClientConnection.WaitForConnection();
}
catch(...)
{
...
continue;
}
ThreadPool.QueueUserWorkItem(state =>
{
// we need a separate variable here, so as not to make the lambda capture the pipeClientConnection variable, which is not recommended in multi-threaded scenarios
using(var pipeClientConn = (NamedPipeServerStream)state)
{
// do stuff
...
}
}, pipeClientConnection);
}
As a side note, as it was pointed out in a comment to your question, you're wasting memory with initiating a new async call every 3 seconds by calling BeginWaitForConnection
in a loop (the only case where this wouldn't waste memory is when new connections are made in intervals smaller than 3 seconds, but I doubt that you can know this for sure). You see, basically every 3 seconds you're initiating a new async call, regardless of whether the last one is still pending or has completed. Furthermore, it - once again - does not take into account that you need a separate NamedPipeServerStream
for each client.
To fix this issue, you need to eliminate the loop, and "chain" the BeginWaitForConnection calls using the callback method. This is a similar pattern you'll see quite often in async I/O when using .NET. Draft code:
private void StartListeningPipes()
{
if(!this.isServerRunning)
{
return;
}
var pipeClientConnection = new NamedPipeServerStream(...);
try
{
pipeClientConnection.BeginWaitForConnection(asyncResult =>
{
// note that the body of the lambda is not part of the outer try... catch block!
using(var conn = (NamedPipeServerStream)asyncResult.AsyncState)
{
try
{
conn.EndWaitForConnection(asyncResult);
}
catch(...)
{
...
}
// we have a connection established, time to wait for new ones while this thread does its business with the client
// this may look like a recursive call, but it is not: remember, we're in a lambda expression
// if this bothers you, just export the lambda into a named private method, like you did in your question
StartListeningPipes();
// do business with the client
conn.WaitForPipeDrain();
...
}
}, pipeClientConnection);
}
catch(...)
{
...
}
}
The control flow will be something like this:
- [main thread] StartListeningPipes(): created NamedPipeServerStream, initiated BeginWaitForConnection()
- [threadpool thread 1] client #1 connecting, BeginWaitForConnection() callback: EndWaitForConnection() then StartListeningPipes()
- [threadpool thread 1] StartListeningPipes(): created new NamedPipeServerStream, BeginWaitForConnection() call
- [threadpool thread 1] back to the BeginWaitForConnection() callback: getting down to business with the connected client (#1)
- [threadpool thread 2] client #2 connecting, BeginWaitForConnection() callback: ...
- ...
I think that this is a lot more difficult than using blocking I/O - in fact, I'm not quite certain I got it right, please point it out if you see any mistakes - and it's also a lot more confusing.
To pause the server in either examples, you obviously would set the this.isServerRunning
flag to false
.
这篇关于C#中:异步NamedPipeServerStream管道正在关闭的异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!