BeginWaitForConnection

BeginWaitForConnection

本文介绍了C#中:异步NamedPipeServerStream管道正在关闭的异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

同一主题

我的previous问题:C#:异步NamedPipeServerStream了解现在我有下一个:

 私人无效StartListeningPipes()
{
    尝试
    {
        isPipeWorking = TRUE;
                namedPipeServerStream =新NamedPipeServerStream(PIPENAME,PipeDirection.InOut,1,PipeTransmissionMode.Byte,PipeOptions.Asynchronous,BUFFERSIZE,BUFFERSIZE);
                Console.Write(等待客户端的连接......);
                而(isPipeWorking)
                {
            IAsyncResult的asyncResult = namedPipeServerStream.BeginWaitForConnection(this.WaitForConnectionAsyncCallback,NULL);
                        Thread.sleep代码(3 * 1000);
                }
        }
        ////捕捉所引发,如果管道损坏或断开的IOException异常。
        赶上(IOException异常E)
        {
        Console.WriteLine(IOException异常:{0}重新启动管道服务器......,e.Message);
                StopListeningPipes();
                StartListeningPipes();
        }
        ////抓的ObjectDisposedException如果服务器已停止。然后什么也不做。
        赶上(的ObjectDisposedException)
        {
        }
}

私人无效WaitForConnectionAsyncCallback(IAsyncResult的结果)
{
    尝试
    {
        namedPipeServerStream.EndWaitForConnection(结果);
        Console.WriteLine(客户端连接。);
        namedPipeServerStream.WaitForPipeDrain();
                byte []的BUFF =新的字节[BUFFERSIZE]
                namedPipeServerStream.Read(BUFF,0,BUFFERSIZE);
                字符串recStr = TrimNulls(BUFF);
                Array.Clear(BUFF,0,buff.Length);
                Console.WriteLine();
                Console.WriteLine('+ recStr +');
    }
    赶上(例外五)
    {
        Console.WriteLine(错误:+ e.Message);
        }
}
 

但我发现了

管道正在关闭的异常每次我从客户端接收邮件

为什么?

我的客户:

 使用(NamedPipeClientStream pipeStream =新NamedPipeClientStream(General.PIPENAME))
{
    尝试
        {
        字节[]字节= General.Iso88591Encoding.GetBytes(sendingMessage);
                pipeStream.Write(字节,0,bytes.Length);
                pipeStream.Flush();
                pipeStream.WaitForPipeDrain();
        }
        赶上(TimeoutException异常)
        {
        Console.WriteLine(超时错误!);
        }
    赶上(例外五)
        {
        Console.WriteLine(的String.Format(错误!,e.Message));
        }
}
 


最后code,此刻是:

  ///<总结>
        ///收听管道客户端连接创建新NamedPipeServerStream
        ///< /总结>
        私人无效ListenForPipeClients()
        {
            如果(!this.isListeningToClients)
                返回;

            尝试
            {
                PipeSecurity PS =新PipeSecurity();
                PipeAccessRule面值=新PipeAccessRule(所有人,PipeAccessRights.ReadWrite,System.Security.AccessControl.AccessControlType.Allow);
                ps.AddAccessRule(PAR);
                pipeClientConnection =新NamedPipeServerStream(General.PIPENAME,PipeDirection.InOut,1,PipeTransmissionMode.Byte,PipeOptions.Asynchronous,General.BUFFERSIZE,General.BUFFERSIZE,PS);
                Console.Write(等待客户端的连接......);
                /*namedPipeServerStream.WaitForConnection();
                OnPipeConnected(namedPipeServerStream); * /
                IAsyncResult的结果= pipeClientConnection.BeginWaitForConnection(OnPipeConnected,pipeClientConnection);
            }
            赶上(的ObjectDisposedException)
            {
                ////抓的ObjectDisposedException如果服务器已停止。然后什么也不做。
            }
            赶上(例外五)
            {
                Console.WriteLine(错误occures:{0}重新启动管道服务器......,e.Message);
                this.logger.Add(LogLevel.Warning,的String.Format(错误occures:{0}重新启动管道服务器...,e.Message));
                ListenForPipeClients();
            }
        }

        ///<总结>
        在客户端连接动作///异步回调
        ///< /总结>
        ///< PARAM NAME =asyncResult>异步结果< /参数>
        私人无效OnPipeConnected(IAsyncResult的asyncResult)
        {
            使用(VAR康恩=(NamedPipeServerStream)asyncResult.AsyncState)
            {
                尝试
                {
                    conn.EndWaitForConnection(asyncResult);
                    Console.WriteLine(客户端连接。);
                    PipeClientConnection clientConnection =新PipeClientConnection(康涅狄格州,notifierSenderCache,defaultStorageTime);
                }
                赶上(例外五)
                {
                    Console.WriteLine(e.Message);
                    this.logger.Add(LogLevel.Warning,e.Message);
                }
            }

            ListenForPipeClients();
        }
 

解决方案

看来,你需要一个单独的 NamedPipeServerStream 为每一个客户。 (请注意,我不是一个发现这一点,看到其他的答案。)我想像的工作服务器端会是这个样子(草案code):

 而(this.isServerRunning)
{
     VAR pipeClientConnection =新NamedPipeServerStream(...);

     尝试
     {
         pipeClientConnection.WaitForConnection();
     }
     抓住(...)
     {
         ...
         继续;
     }

     ThreadPool.QueueUserWorkItem(州=>
          {
               //我们需要一个单独的变量这里,以免使拉姆达捕获pipeClientConnection变量,这是不推荐的在多线程的情况
               使用(VAR pipeClientConn =(NamedPipeServerStream)状态)
               {
                    //做的东西
                    ...
               }
          },pipeClientConnection);
}
 

作为一个侧面说明,因为有人指出,在评论你的问题,你就是在浪费内存,通过调用启动一个新的异步调用每3秒 BeginWaitForConnection 在循环中(其中,这不会浪费内存的唯一情况是,当新的连接在间隔3秒小制作,但我怀疑,你可以知道这是肯定的)。你看,基本上每3秒你启动一个新的异步调用,不管最后一个人是否仍在审理中或已完成。此外,它 - 再次 - 并没有考虑到,你需要一个单独的 NamedPipeServerStream 为每个客户帐户

要解决这个问题,就需要消除环路,以及产业链的BeginWaitForConnection调用使用回调方法。这是你必须使用.NET时,经常看到异步I / O类似的模式。草案code:

 私人无效StartListeningPipes()
{
    如果(!this.isServerRunning)
    {
        返回;
    }

    VAR pipeClientConnection =新NamedPipeServerStream(...);

    尝试
    {
        pipeClientConnection.BeginWaitForConnection(asyncResult =>
            {
                //注意lambda的身体不是外try ... catch块的一部分!
                使用(VAR康恩=(NamedPipeServerStream)asyncResult.AsyncState)
                {
                    尝试
                    {
                        conn.EndWaitForConnection(asyncResult);
                    }
                    抓住(...)
                    {
                        ...
                    }

                    //我们有一个建立连接,时间来等待新的,而这个线程执行其业务的客户端
                    //这可能看起来像一个递归调用,但它不是:请记住,我们正处在一个lambda EX pression
                    //如果这困扰你,只是拉姆达导出到一个名为私有方法,就像你在你的问题做了
                    StartListeningPipes();

                    //做生意的客户端
                    conn.WaitForPipeDrain();
                    ...
                }
            },pipeClientConnection);
    }
    抓住(...)
    {
        ...
    }
}
 

控制流将是这样的:

  • [主线程] StartListeningPipes():创建NamedPipeServerStream,发起BeginWaitForConnection()
  • [线程池线程1]客户端#1连接,BeginWaitForConnection()回调:EndWaitForConnection(),然后StartListeningPipes()
  • [线程池线程1] StartListeningPipes():创建新的NamedPipeServerStream,BeginWaitForConnection()调用
  • [线程池线程1]回BeginWaitForConnection()回调:让正事与连接的客户端(#1)
  • [线程池线程2]客户端#2连接,BeginWaitForConnection()回调:...
  • ...

我觉得这是一个很大比使用阻塞I / O更加困难 - 其实,我不是很肯定我得到了它的权利,请如果你看到任何错误指出来 - 这也是很多较为混乱

要暂停服务器在任一例子,你显然会设置 this.isServerRunning 标志

My previous question on the same theme: C#: Asynchronous NamedPipeServerStream understandingNow I have next:

private void StartListeningPipes()
{
    try
    {
        isPipeWorking = true;
                namedPipeServerStream = new NamedPipeServerStream(PIPENAME, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous, BUFFERSIZE, BUFFERSIZE);
                Console.Write("Waiting for client connection...");
                while(isPipeWorking)
                {
            IAsyncResult asyncResult = namedPipeServerStream.BeginWaitForConnection(this.WaitForConnectionAsyncCallback, null);
                        Thread.Sleep(3*1000);
                }
        }
        //// Catch the IOException that is raised if the pipe is broken or disconnected.
        catch (IOException e)
        {
        Console.WriteLine("IOException: {0}. Restart pipe server...", e.Message);
                StopListeningPipes();
                StartListeningPipes();
        }
        //// Catch ObjectDisposedException if server was stopped. Then do nothing.
        catch (ObjectDisposedException)
        {
        }
}

private void WaitForConnectionAsyncCallback(IAsyncResult result)
{
    try
    {
        namedPipeServerStream.EndWaitForConnection(result);
        Console.WriteLine("Client connected.");
        namedPipeServerStream.WaitForPipeDrain();
                byte[] buff = new byte[BUFFERSIZE];
                namedPipeServerStream.Read(buff, 0, BUFFERSIZE);
                string recStr = TrimNulls(buff);
                Array.Clear(buff, 0, buff.Length);
                Console.WriteLine();
                Console.WriteLine("'"+recStr+"'");
    }
    catch (Exception e)
    {
        Console.WriteLine("Error: " + e.Message);            
        }
}

But I'm getting

The pipe is being closed Exception everytime I receive a message from client

Why?

My client:

 using (NamedPipeClientStream pipeStream = new NamedPipeClientStream(General.PIPENAME))
{
    try
        {
        byte[] bytes = General.Iso88591Encoding.GetBytes(sendingMessage);
                pipeStream.Write(bytes, 0, bytes.Length);
                pipeStream.Flush();
                pipeStream.WaitForPipeDrain();
        }
        catch (TimeoutException)
        {
        Console.WriteLine("Timeout error!");
        }
    catch (Exception e)
        {
        Console.WriteLine(string.Format("Error! ", e.Message));
        }
}


Final code at the moment is:

/// <summary>
        /// Create new NamedPipeServerStream for listening to pipe client connection
        /// </summary>
        private void ListenForPipeClients()
        {
            if (!this.isListeningToClients)
                return;

            try
            {
                PipeSecurity ps = new PipeSecurity();
                PipeAccessRule par = new PipeAccessRule("Everyone", PipeAccessRights.ReadWrite, System.Security.AccessControl.AccessControlType.Allow);
                ps.AddAccessRule(par);
                pipeClientConnection = new NamedPipeServerStream(General.PIPENAME, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous, General.BUFFERSIZE, General.BUFFERSIZE, ps);
                Console.Write("Waiting for client connection...");
                /*namedPipeServerStream.WaitForConnection();
                OnPipeConnected(namedPipeServerStream);*/
                IAsyncResult result = pipeClientConnection.BeginWaitForConnection(OnPipeConnected, pipeClientConnection);
            }
            catch (ObjectDisposedException)
            {
                //// Catch ObjectDisposedException if server was stopped. Then do nothing.
            }
            catch (Exception e)
            {
                Console.WriteLine("Error occures: {0}. Restart pipe server...", e.Message);
                this.logger.Add(LogLevel.Warning, string.Format("Error occures: {0}. Restart pipe server...", e.Message));
                ListenForPipeClients();
            }
        }

        /// <summary>
        /// Async callback on client connected action
        /// </summary>
        /// <param name="asyncResult">Async result</param>
        private void OnPipeConnected(IAsyncResult asyncResult)
        {
            using (var conn = (NamedPipeServerStream)asyncResult.AsyncState)
            {
                try
                {
                    conn.EndWaitForConnection(asyncResult);
                    Console.WriteLine("Client connected.");
                    PipeClientConnection clientConnection = new PipeClientConnection(conn, notifierSenderCache, defaultStorageTime);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                    this.logger.Add(LogLevel.Warning, e.Message);
                }
            }

            ListenForPipeClients();
        }
解决方案

It appears that you need a separate NamedPipeServerStream for each client. (Note that I was not the one to discover this, see the other answers.) I'd imagine the working server-side would look something like this (draft code):

while(this.isServerRunning)
{
     var pipeClientConnection = new NamedPipeServerStream(...);

     try
     {
         pipeClientConnection.WaitForConnection();
     }
     catch(...)
     {
         ...
         continue;
     }

     ThreadPool.QueueUserWorkItem(state =>
          {
               // we need a separate variable here, so as not to make the lambda capture the pipeClientConnection variable, which is not recommended in multi-threaded scenarios
               using(var pipeClientConn = (NamedPipeServerStream)state)
               {
                    // do stuff
                    ...
               }
          }, pipeClientConnection);
}

As a side note, as it was pointed out in a comment to your question, you're wasting memory with initiating a new async call every 3 seconds by calling BeginWaitForConnection in a loop (the only case where this wouldn't waste memory is when new connections are made in intervals smaller than 3 seconds, but I doubt that you can know this for sure). You see, basically every 3 seconds you're initiating a new async call, regardless of whether the last one is still pending or has completed. Furthermore, it - once again - does not take into account that you need a separate NamedPipeServerStream for each client.

To fix this issue, you need to eliminate the loop, and "chain" the BeginWaitForConnection calls using the callback method. This is a similar pattern you'll see quite often in async I/O when using .NET. Draft code:

private void StartListeningPipes()
{
    if(!this.isServerRunning)
    {
        return;
    }

    var pipeClientConnection = new NamedPipeServerStream(...);

    try
    {
        pipeClientConnection.BeginWaitForConnection(asyncResult =>
            {
                // note that the body of the lambda is not part of the outer try... catch block!
                using(var conn = (NamedPipeServerStream)asyncResult.AsyncState)
                {
                    try
                    {
                        conn.EndWaitForConnection(asyncResult);
                    }
                    catch(...)
                    {
                        ...
                    }

                    // we have a connection established, time to wait for new ones while this thread does its business with the client
                    // this may look like a recursive call, but it is not: remember, we're in a lambda expression
                    // if this bothers you, just export the lambda into a named private method, like you did in your question
                    StartListeningPipes();

                    // do business with the client
                    conn.WaitForPipeDrain();
                    ...
                }
            }, pipeClientConnection);
    }
    catch(...)
    {
        ...
    }
}

The control flow will be something like this:

  • [main thread] StartListeningPipes(): created NamedPipeServerStream, initiated BeginWaitForConnection()
  • [threadpool thread 1] client #1 connecting, BeginWaitForConnection() callback: EndWaitForConnection() then StartListeningPipes()
  • [threadpool thread 1] StartListeningPipes(): created new NamedPipeServerStream, BeginWaitForConnection() call
  • [threadpool thread 1] back to the BeginWaitForConnection() callback: getting down to business with the connected client (#1)
  • [threadpool thread 2] client #2 connecting, BeginWaitForConnection() callback: ...
  • ...

I think that this is a lot more difficult than using blocking I/O - in fact, I'm not quite certain I got it right, please point it out if you see any mistakes - and it's also a lot more confusing.

To pause the server in either examples, you obviously would set the this.isServerRunning flag to false.

这篇关于C#中:异步NamedPipeServerStream管道正在关闭的异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-19 07:41