我最近为这个问题提供了答案:C# - Realtime console output redirection。
经常发生的事情是,解释事物(这里是“事物”就是我解决类似问题的方式)可以使您加深理解和/或(在这种情况下)“糟糕”的时刻。我意识到实现的解决方案存在一个错误。该bug几乎没有什么实际意义,但是对于我作为开发人员而言,它具有极其重要的意义:知道我的代码有爆炸的潜力,我不能放心。
压缩错误是此问题的目的。很长的介绍,我很抱歉,让我们变得肮脏。
我想建立一个类,使我可以从控制台的标准输出Stream
接收输入。控制台输出流的类型为FileStream
;如果需要,实现可以强制转换为该值。还存在一个相关的StreamReader
可利用。
在此类中,我只需实现一件事即可实现所需的功能:异步“读取当前所有可用数据”操作。读取到流的末尾是不可行的,因为除非进程关闭控制台输出句柄,否则流将不会结束,并且不会这样做,因为它是交互式的并且在继续之前需要输入。
我将使用该假设的异步操作来实现基于事件的通知,这将对我的 call 者更加方便。
该类的公共(public)接口(interface)是这样的:
public class ConsoleAutomator {
public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;
public void StartSendingEvents();
public void StopSendingEvents();
}
StartSendingEvents
和StopSendingEvents
做广告;为了便于讨论,我们可以假设事件总是在不失去一般性的情况下发送的。该类在内部使用以下两个字段:
protected readonly StringBuilder inputAccumulator = new StringBuilder();
protected readonly byte[] buffer = new byte[256];
该类的功能在以下方法中实现。要使球滚动:
public void StartSendingEvents();
{
this.stopAutomation = false;
this.BeginReadAsync();
}
要从
Stream
中读取数据而不会阻塞,并且也不需要回车符,则调用BeginRead
: protected void BeginReadAsync()
{
if (!this.stopAutomation) {
this.StandardOutput.BaseStream.BeginRead(
this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
}
}
最具挑战性的部分:
BeginRead
需要使用缓冲区。这意味着从流中读取时,可读取的字节(“传入块”)可能大于缓冲区。请记住,这里的目标是读取所有块,并为每个块准确地调用一次 call 事件订阅者。
为此,如果
EndRead
之后缓冲区已满,我们不会立即将其内容发送给订阅者,而是将其追加到StringBuilder
。仅在不再需要从流中读取时,才发送回StringBuilder
的内容。 private void ReadHappened(IAsyncResult asyncResult)
{
var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
if (bytesRead == 0) {
this.OnAutomationStopped();
return;
}
var input = this.StandardOutput.CurrentEncoding.GetString(
this.buffer, 0, bytesRead);
this.inputAccumulator.Append(input);
if (bytesRead < this.buffer.Length) {
this.OnInputRead(); // only send back if we 're sure we got it all
}
this.BeginReadAsync(); // continue "looping" with BeginRead
}
在完成不足以填充缓冲区的任何读取之后(在这种情况下,我们知道在上一次读取操作期间没有更多数据要读取),所有累积的数据都将发送到订户:
private void OnInputRead()
{
var handler = this.StandardOutputRead;
if (handler == null) {
return;
}
handler(this,
new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
this.inputAccumulator.Clear();
}
(我知道,只要没有订阅者,数据就会永远积累起来。这是一个有意的决定)。
好的
该方案几乎完美地起作用:
错误的
最后那几乎是一个很大的。考虑当传入的块的长度恰好等于缓冲区的大小时会发生什么。该块将被读取和缓冲,但不会触发该事件。接下来是
BeginRead
,它希望找到属于当前块的更多数据以便将其全部发送回去,但是...流中将不再有数据。实际上,只要将数据按长度恰好等于缓冲区大小的块形式放入流中,数据就会被缓冲,并且事件将永远不会被触发。
实际上,这种情况极不可能发生,特别是因为我们可以为缓冲区大小选择任何数字,但是问题就在那里。
解决方案?
不幸的是,在检查了
FileStream
和StreamReader
的可用方法之后,我找不到任何让我窥视流同时允许在其上使用异步方法的东西。一种“解决方案”是在检测到“缓冲区已满”情况之后,让线程等待
ManualResetEvent
。如果在短时间内未通过异步回调通知事件,则将不会再接收来自流的更多数据,并且应将迄今累积的数据发送给订户。但是,这引入了对另一个线程的需求,需要线程同步,并且明显不够优雅。为
BeginRead
指定一个超时也足够了(不时地回调我的代码,这样我就可以检查是否有数据要发回;大多数情况下,没有任何事情可以做,因此我希望性能受到影响微不足道)。但是看来FileStream
不支持超时。由于我想象带有超时的异步调用是裸Win32中的一个选项,因此另一种方法可能是PInvoke解决问题。但这也是不希望的,因为这会带来复杂性,并且只会给代码带来痛苦。
有解决问题的优雅方法吗?
感谢您有足够的耐心阅读所有这些内容。
更新:
我在最初的写作中肯定没有很好地传达这种情况。从那以后,我对文章进行了相当多的修改,但要确保:
问题是关于如何实现异步“读取当前所有可用数据”操作。
我对那些花时间阅读和回答的人表示歉意,而我没有清楚地表明我的意图。
最佳答案
从理论上讲,我同意杰森的观点。在缓冲区可以均匀地整除大量数据的情况下,您的实现要比逻辑上的漏洞更大。我看到的最大问题是,您的读者必须对文件类型有足够的了解,才能知道如何将数据分成订阅者知道如何处理的“块”。
流对接收或发送的内容没有内在的了解;只是他们传输数据的机制。 NetworkStream可能正在发送HTML或ZIP文件; FileStream可能正在读取文本文件或MP3。拥有此知识的是阅读器(XmlReader,TextReader,Image.FromStream()等)。因此,您的异步阅读器必须至少了解一些有关数据的知识,但是不要对这些知识进行硬编码会很有用。
为了处理“流”数据,增量发送必须单独有用;您必须对所要了解的东西足够了解,所得到的是一个可以单独处理的“块”。我的建议是以封装的方式将这些信息提供给您的异步阅读器,方法是让您的订阅者告诉您,或者通过提供一些与侦听器分开的特定于格式的“块”(因为该阅读器正在侦听控制台输出,并且所有听众应该以同样的方式对待它,第二个计划可能会更好)。
逻辑上的实现:
public class MyStreamManager {
public delegate bool ValidChunkTester(StringBuilder builder);
private readonly List<ValidChunkTester> validators = new List<ValidChunkTester>();
public event ValidChunkTester IsValidChunk
{ add{validators.Add(value);} remove {validators.Remove(value);}}
public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;
public void StartSendingEvents();
public void StopSendingEvents();
}
...
private void ReadHappened(IAsyncResult asyncResult)
{
var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
if (bytesRead == 0) {
this.OnAutomationStopped();
return;
}
var input = this.StandardOutput.CurrentEncoding.GetString(
this.buffer, 0, bytesRead);
this.inputAccumulator.Append(input);
if (validators.Any() && StandardOutputRead !-= null
&& validators.Aggregate(true, (valid, validator)=>valid && validator(inputAccumulator))) {
this.OnInputRead(); // send when all listeners can work with the buffer contents
}
this.BeginReadAsync(); // continue "looping" with BeginRead
}
...
此模型要求订阅者不要修改StringBuilder;否则,不能更改。您可以为他们提供一些不变的东西,以供他们选择。示例侦听器可能是:
public bool IsACompleteLine(StringBuilder builder)
{
return builder.Contains(Environment.NewLine);
}
要么:
public bool Contains256Bytes(StringBuilder builder)
{
return builder.Length >= 256;
}
...你明白了。从概念上讲,确定要释放给侦听器的当前缓冲区是否有值(value)的事件与侦听器本身是分开的,但不必具体如此,因此它将支持单个特定于输出的测试或多个基于侦听器的测试。