本文介绍了什么是处理基于行的网络I / O流的好方法吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

请注意:让我appologize对这个问题的长度,我不得不把大量的信息到它。我希望不会引起太多人简单地浏览一遍,并作出假设。请的全部读出。谢谢你。

Note: Let me appologize for the length of this question, i had to put a lot of information into it. I hope that doesn't cause too many people to simply skim it and make assumptions. Please read in its entirety. Thanks.

我有数据来在超过一个插座的流。此数据线为主。

I have a stream of data coming in over a socket. This data is line oriented.

我使用.NET的APM(异步编程方法)(的BeginRead等)。使用基于流的I / O本precludes因为异步I / O缓冲区是基于。有可能重新打包的数据并将其发送到一个流,诸如存储器流,但也有问题存在,以及

I am using the APM (Async Programming Method) of .NET (BeginRead, etc..). This precludes using stream based I/O because Async I/O is buffer based. It is possible to repackage the data and send it to a stream, such as a Memory stream, but there are issues there as well.

问题是,我的输入流(这是我无法控制)不给我就流了多久的任何信息。它仅仅是换行线看起来像这样流:

The problem is that my input stream (which i have no control over) doesn't give me any information on how long the stream is. It simply is a stream of newline lines looking like this:

COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....

因此​​,使用APM,因为我不知道任何给定的数据集将有多长,它很可能是数据块将穿越缓冲区边界需要多次读取,但那些多次读取也将跨越多个数据块

So, using APM, and since i don't know how long any given data set will be, it is likely that blocks of data will cross buffer boundaries requiring multiple reads, but those multiple reads will also span multiple blocks of data.

例如:

Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
                    "ine\n.............................More Lines..."

我的第一个念头是使用StringBuilder,只是缓冲线到SB追加。这部作品在一定程度上,但我发现它难以提取的数据块。我试图用一个StringReader读取newlined数据,但没有办法知道你是否都拿到了完整的线与否,StringReader在返回添加的最后一个块,随后年底的部分线路由返回null aftewards。没有办法知道是否返回什么数据的完整newlined线。

My first thought was to use a StringBuilder and simply append the buffer lines to the SB. This works to some extent, but i found it difficult to extract blocks of data. I tried using a StringReader to read newlined data but there was no way to know whether you were getting a complete line or not, as StringReader returns a partial line at the end of the last block added, followed by returning null aftewards. There isn't a way to know if what was returned was a full newlined line of data.

例如:

// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine();        // returns "This is incomp.."

更糟糕的是,如果我只是不停地追加到数据,缓冲区变得越来越大,而且因为这可能会在同一时间,这不是一个很好的解决方案,数周或数个月。

What's worse, is that if I just keep appending to the data, the buffers get bigger and bigger, and since this could run for weeks or months at a time that's not a good solution.

我的下一个想法是因为我读他们从SB删除数据块。这需要写我自己的ReadLine函数,但我卡在锁定期间,数据读取和写入。此外,数据的较大块(其可包含数百个数据的读出的和兆字节)规定扫描寻找新行整个缓冲区。这不是有效,pretty难看。

My next thought was to remove blocks of data from the SB as I read them. This required writing my own ReadLine function, but then I'm stuck locking the data during reads and writes. Also, the larger blocks of data (which can consist of hundreds of reads and megabytes of data) require scanning the entire buffer looking for newlines. It's not efficient and pretty ugly.

我在找的东西,有一个StreamReader /作家与异步I / O的方便简单。

I'm looking for something that has the simplicity of a StreamReader/Writer with the convenience of async I/O.

我的下一个想法是使用一个MemoryStream,和写入数据的块内存流,然后一个StreamReader附加到流并使用输入行,但同样我有问题,与知道,如果在缓冲区中的最后一次读取是整条生产线或没有,再加上它甚至很难从流中删除过时的数据。

My next thought was to use a MemoryStream, and write the blocks of data to a memory stream then attach a StreamReader to the stream and use ReadLine, but again I have issues with knowing if a the last read in the buffer is a complete line or not, plus it's even harder to remove the "stale" data from the stream.

我也想过使用线程具有同步读取。这具有的优点在于使用一个StreamReader,它将始终从一个的ReadLine()返回一个全线,除了在连接中断的情况。然而,这具有消除连接问题,以及某些类型的网络问题可以导致对在延长的时间周期雄阻断套接字。我使用的异步IO,因为我不想占用一个线程程序阻止数据接收的使用寿命。

I also thought about using a thread with synchronous reads. This has the advantage that using a StreamReader, it will always return a full line from a ReadLine(), except in broken connection situations. However this has issues with canceling the connection, and certain kinds of network problems can result in hung blocking sockets for an extended period of time. I'm using async IO because i don't want to tie up a thread for the life of the program blocking on data receive.

连接是持久的。和数据将继续流过的时间。期间intial连接,有数据的一个大的流量,而且一旦该流完成插座保持打开等待实时更新。我不知道precisely当初始流量已经完成,因为知道的唯一途径是,没有更多的数据被立即发送。这意味着我不能等到初始数据加载处理之前完成,我是实时pretty多卡处理,因为它的用武之地。

The connection is long lasting. And data will continue to flow over time. During the intial connection, there is a large flow of data, and once that flow is done the socket remains open waiting for real-time updates. I don't know precisely when the initial flow has "finished", since the only way to know is that no more data is sent right away. This means i can't wait for the initial data load to finish before processing, I'm pretty much stuck processing "in real time" as it comes in.

所以,任何人都可以提出一个很好的方法来处理的方式,是不是太复杂,这种情况?我真的希望这是简单而优雅的越好,但我一直想出由于所有的边缘情况越来越复杂的解决方案。我想我要的是某种形式的FIFO中,我可以轻松地追加更多的数据,而在同一时间的数据坡平出它符合一定的标准(即新行终止字符串)。

So, can anyone suggest a good method to handle this situation in a way that isn't overly complicated? I really want this to be as simple and elegant as possible, but I keep coming up with more and more complicated solutions due to all the edge cases. I guess what I want is some kind of FIFO in which i can easily keep appending more data while at the same time poping data out of it that matches certain criteria (ie, newline terminated strings).

推荐答案

这是一个非常有趣的问题。过去对我来说,解决办法是使用一个单独的线程与同步操作,因为你提出。 (我设法大部分与封闭使用锁和大量异常处理的插座的问题各地。)不过,使用内置的异步操作通常是可取的,因为它允许真正的操作系统级的异步I / O,让我明白你的观点。

That's quite an interesting question. The solution for me in the past has been to use a separate thread with synchronous operations, as you propose. (I managed to get around most of the problems with blocking sockets using locks and lots of exception handlers.) Still, using the in-built asynchronous operations is typically advisable as it allows for true OS-level async I/O, so I understand your point.

嗯,我已经走了,写了一个类实现什么,我相信你需要(在一个相对干净的方式,我会说)。让我知道你在想什么。

Well I've gone and written a class for accomplishing what I believe you need (in a relatively clean manner I would say). Let me know what you think.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class AsyncStreamProcessor : IDisposable
{
    protected StringBuilder _buffer;  // Buffer for unprocessed data.

    private bool _isDisposed = false; // True if object has been disposed

    public AsyncStreamProcessor()
    {
        _buffer = null;
    }

    public IEnumerable<string> Process(byte[] newData)
    {
        // Note: replace the following encoding method with whatever you are reading.
        // The trick here is to add an extra line break to the new data so that the algorithm recognises
        // a single line break at the end of the new data.
        using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
        {
            // Read all lines from new data, returning all but the last.
            // The last line is guaranteed to be incomplete (or possibly complete except for the line break,
            // which will be processed with the next packet of data).
            string line, prevLine = null;
            while ((line = newDataReader.ReadLine()) != null)
            {
                if (prevLine != null)
                {
                    yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                    _buffer = null;
                }
                prevLine = line;
            }

            // Store last incomplete line in buffer.
            if (_buffer == null)
                // Note: the (* 2) gives you the prediction of the length of the incomplete line,
                // so that the buffer does not have to be expanded in most/all situations.
                // Change it to whatever seems appropiate.
                _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
            else
                _buffer.Append(prevLine);
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                // Dispose managed resources.
                _buffer = null;
                GC.Collect();
            }

            // Dispose native resources.

            // Remember that object has been disposed.
            _isDisposed = true;
        }
    }
}

这个类的一个实例应该为每一个的NetworkStream和处理功能来创建每当收到新的数据应该被称为(在的BeginRead的回调方法,调用接下来的BeginRead我会想象之前)。

An instance of this class should be created for each NetworkStream and the Process function should be called whenever new data is received (in the callback method for BeginRead, before you call the next BeginRead I would imagine).

请注意:我只验证了这code测试数据,而不是在网络上传输的实际数据。不过,我没有预料到的任何差异...

Note: I have only verified this code with test data, not actual data transmitted over the network. However, I wouldn't anticipate any differences...

此外,警告该类当然不是线程安全的,但只要是的BeginRead不再执行,直到目前的数据已被处理后(我presume你正在做的),不该有'T有任何问题。

Also, a warning that the class is of course not thread-safe, but as long as BeginRead isn't executed again until after the current data has been processed (as I presume you are doing), there shouldn't be any problems.

希望这对你的作品。让我知道,如果有剩余的问题,我会尝试修改解决方案来处理这些问题。 (也很可能我错过了这个问题的一些细微之处,尽管仔细阅读吧!)

Hope this works for you. Let me know if there are remaining issues and I will try to modify the solution to deal with them. (There could well be some subtlety of the question I missed, despite reading it carefully!)

这篇关于什么是处理基于行的网络I / O流的好方法吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-21 20:35