我最近为这个问题提供了答案:C# - Realtime console output redirection

经常发生的事情是,解释事物(这里是“事物”就是我解决类似问题的方式)可以使您加深理解和/或(在这种情况下)“糟糕”的时刻。我意识到实现的解决方案存在一个错误。该bug几乎没有什么实际意义,但是对于我作为开发人员而言,它具有极其重要的意义:知道我的代码有爆炸的潜力,我不能放心。

压缩错误是此问题的目的。很长的介绍,我很抱歉,让我们变得肮脏。

我想建立一个类,使我可以从控制台的标准输出Stream接收输入。控制台输出流的类型为FileStream;如果需要,实现可以强制转换为该值。还存在一个相关的StreamReader可利用。

在此类中,我只需实现一件事即可实现所需的功能:异步“读取当前所有可用数据”操作。读取到流的末尾是不可行的,因为除非进程关闭控制台输出句柄,否则流将不会结束,并且不会这样做,因为它是交互式的并且在继续之前需要输入。

我将使用该假设的异步操作来实现基于事件的通知,这将对我的 call 者更加方便。

该类的公共(public)接口(interface)是这样的:

public class ConsoleAutomator {
    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;

    public void StartSendingEvents();
    public void StopSendingEvents();
}
StartSendingEventsStopSendingEvents做广告;为了便于讨论,我们可以假设事件总是在不失去一般性的情况下发送的。

该类在内部使用以下两个字段:
    protected readonly StringBuilder inputAccumulator = new StringBuilder();

    protected readonly byte[] buffer = new byte[256];

该类的功能在以下方法中实现。要使球滚动:
    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

要从Stream中读取数据而不会阻塞,并且也不需要回车符,则调用BeginRead:
    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

最具挑战性的部分:
BeginRead需要使用缓冲区。这意味着从流中读取时,可读取的字节(“传入块”)可能大于缓冲区。
请记住,这里的目标是读取所有块,并为每个块准确地调用一次 call 事件订阅者。

为此,如果EndRead之后缓冲区已满,我们不会立即将其内容发送给订阅者,而是将其追加到StringBuilder。仅在不再需要从流中读取时,才发送回StringBuilder的内容。
    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }

        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);

        if (bytesRead < this.buffer.Length) {
            this.OnInputRead(); // only send back if we 're sure we got it all
        }

        this.BeginReadAsync(); // continue "looping" with BeginRead
    }

在完成不足以填充缓冲区的任何读取之后(在这种情况下,我们知道在上一次读取操作期间没有更多数据要读取),所有累积的数据都将发送到订户:
    private void OnInputRead()
    {
        var handler = this.StandardOutputRead;
        if (handler == null) {
            return;
        }

        handler(this,
                new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
        this.inputAccumulator.Clear();
    }

(我知道,只要没有订阅者,数据就会永远积累起来。这是一个有意的决定)。

好的

该方案几乎完美地起作用:
  • 异步功能而不会产生任何线程
  • 非常方便调用代码(只需订阅一个事件)
  • 每次可以读取数据时,永远不会发生一个以上的事件。
  • 几乎与缓冲区大小
  • 无关

    错误的

    最后那几乎是一个很大的。考虑当传入的块的长度恰好等于缓冲区的大小时会发生什么。该块将被读取和缓冲,但不会触发该事件。接下来是BeginRead,它希望找到属于当前块的更多数据以便将其全部发送回去,但是...流中将不再有数据。

    实际上,只要将数据按长度恰好等于缓冲区大小的块形式放入流中,数据就会被缓冲,并且事件将永远不会被触发。

    实际上,这种情况极不可能发生,特别是因为我们可以为缓冲区大小选择任何数字,但是问题就在那里。

    解决方案?

    不幸的是,在检查了FileStreamStreamReader的可用方法之后,我找不到任何让我窥视流同时允许在其上使用异步方法的东西。

    一种“解决方案”是在检测到“缓冲区已满”情况之后,让线程等待ManualResetEvent。如果在短时间内未通过异步回调通知事件,则将不会再接收来自流的更多数据,并且应将迄今累积的数据发送给订户。但是,这引入了对另一个线程的需求,需要线程同步,并且明显不够优雅。

    BeginRead指定一个超时也足够了(不时地回调我的代码,这样我就可以检查是否有数据要发回;大多数情况下,没有任何事情可以做,因此我希望性能受到影响微不足道)。但是看来FileStream不支持超时。

    由于我想象带有超时的异步调用是裸Win32中的一个选项,因此另一种方法可能是PInvoke解决问题。但这也是不希望的,因为这会带来复杂性,并且只会给代码带来痛苦。

    有解决问题的优雅方法吗?

    感谢您有足够的耐心阅读所有这些内容。

    更新:

    我在最初的写作中肯定没有很好地传达这种情况。从那以后,我对文章进行了相当多的修改,但要确保:

    问题是关于如何实现异步“读取当前所有可用数据”操作。

    我对那些花时间阅读和回答的人表示歉意,而我没有清楚地表明我的意图。

    最佳答案

    从理论上讲,我同意杰森的观点。在缓冲区可以均匀地整除大量数据的情况下,您的实现要比逻辑上的漏洞更大。我看到的最大问题是,您的读者必须对文件类型有足够的了解,才能知道如何将数据分成订阅者知道如何处理的“块”。

    流对接收或发送的内容没有内在的了解;只是他们传输数据的机制。 NetworkStream可能正在发送HTML或ZIP文件; FileStream可能正在读取文本文件或MP3。拥有此知识的是阅读器(XmlReader,TextReader,Image.FromStream()等)。因此,您的异步阅读器必须至少了解一些有关数据的知识,但是不要对这些知识进行硬编码会很有用。

    为了处理“流”数据,增量发送必须单独有用;您必须对所要了解的东西足够了解,所得到的是一个可以单独处理的“块”。我的建议是以封装的方式将这些信息提供给您的异步阅读器,方法是让您的订阅者告诉您,或者通过提供一些与侦听器分开的特定于格式的“块”(因为该阅读器正在侦听控制台输出,并且所有听众应该以同样的方式对待它,第二个计划可能会更好)。

    逻辑上的实现:

    public class MyStreamManager {
        public delegate bool ValidChunkTester(StringBuilder builder);
    
        private readonly List<ValidChunkTester> validators = new List<ValidChunkTester>();
        public event ValidChunkTester IsValidChunk
        { add{validators.Add(value);} remove {validators.Remove(value);}}
    
        public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;
    
    
        public void StartSendingEvents();
        public void StopSendingEvents();
    }
    
    ...
    
    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }
    
        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);
    
        if (validators.Any() && StandardOutputRead !-= null
                && validators.Aggregate(true, (valid, validator)=>valid && validator(inputAccumulator))) {
            this.OnInputRead(); // send when all listeners can work with the buffer contents
        }
    
        this.BeginReadAsync(); // continue "looping" with BeginRead
    }
    
    ...
    

    此模型要求订阅者不要修改StringBuilder;否则,不能更改。您可以为他们提供一些不变的东西,以供他们选择。示例侦听器可能是:
    public bool IsACompleteLine(StringBuilder builder)
    {
        return builder.Contains(Environment.NewLine);
    }
    

    要么:
    public bool Contains256Bytes(StringBuilder builder)
    {
        return builder.Length >= 256;
    }
    

    ...你明白了。从概念上讲,确定要释放给侦听器的当前缓冲区是否有值(value)的事件与侦听器本身是分开的,但不必具体如此,因此它将支持单个特定于输出的测试或多个基于侦听器的测试。

    08-27 18:14
    查看更多