我目前正在为NamedPipeServerStream / NamedPipeClientStream写一个小的包装,完全基于Event,反对使用AsyncCallbacks

我公开了几乎所有可能的同步和异步方法(连接/等待连接,编写等),因此,例如,如果消费者想要启动服务器实例并在客户端连接时发送消息,则可以完全同步路线并做类似...

var server = new NamedPipeServer("helloWorld");
server.StartAndWait();
server.Write("Welcome!");


或类似...的异步方式

var server = new NamedPipeServer("helloWorld);
server.ClientConnected += x => x.WriteAsync("Welcome!");
server.Start(); //Start() returns immediately


但是,我正在努力寻找一种很好的方法来阅读邮件。当前,当读取一条消息时,我会触发一个MessageAvailable事件,并将该消息作为参数之一传入。

我只是无法提出实现同步读取的正确方法。

我考虑的是以下内容:

具有GetNextMessage()同步方法来获取消息。在内部,这可以通过两种不同的方式处理:


我可以保留所有尚未使用的消息的IEnumerable<Message>。因此,只要对方发送了一条消息,我就会从流中读取该消息并将其存储在内存中,以便以后可以由GetNextMessage()使用。优点是,一旦写入消息,它就几乎释放了流,因此它不会阻止另一端发送其他消息。缺点是我绝对无法控制要保存的消息数或大小。我的IEnumerable<Message>可能最终拥有价值10GB的未消耗消息,并且由于无法强迫使用者检索消息,因此我无能为力。
我可以认为我只将一条消息存储在内部缓冲区中,并且一旦通过GetNextMessage()消耗了一条消息,就再也不会开始读取。如果我这样做,则将阻止另一端写其他消息,直到消耗掉前一条消息为止。更确切地说,另一端将能够写入直到流满为止。可以是多个小的完整消息或单个不完整消息。对于单个消息不完整的情况,我认为这是一种较差的方法,因为在发送消息的第一部分和后续部分之间,另一端可能最终断开连接,整个消息将丢失。


更麻烦的是,在上述两种方法中,总是有消费者使用事件来接收消息的事件(记住事件包含收到的消息),因此不需要GetNextMessage()。我要么需要完全停止在事件中发送消息,要么找到一种方法,如果消息是通过事件使用的,则不将事件推送到内部缓冲区。虽然我可以轻松判断是否有事件处理程序,但无法知道消息是否在那里实际处理过(即,考虑一个实现该事件的类并监听该事件,但对该事件不执行任何操作)。我在这里看到的唯一真实的方法是从事件中删除消息,迫使消费者始终调用GetNextMessage(),但是对其他想法持开放态度。

这两种方法都存在另一个问题,那就是如果使用WriteAsync()(或从不同线程使用Write()),我无法控制消息的发送顺序。

谁能想到解决这个问题的更好方法?

最佳答案

我建议采用以下方法。创建界面:

public interface ISubscription : IDisposable {
    Message NextMessage(TimeSpan? timeout);
}

public class Message {

}


然后像这样实现:

public class NamedPipeServer {
    public void StartAndWait() {

    }

    public ISubscription StartAndSubscribe() {
        // prevent race condition before Start and subscribing to MessageAvailable
        var subscription = new Subscription(this);
        StartAndWait();
        return subscription;
    }

    public ISubscription Subscribe() {
        // if user wants to subscribe and some point after start - why not
        return new Subscription(this);
    }

    public event Action<Message> MessageAvailable;

    private class Subscription : ISubscription {
        // buffer
        private readonly BlockingCollection<Message> _queue = new BlockingCollection<Message>(
            new ConcurrentQueue<Message>());

        private readonly NamedPipeServer _server;

        public Subscription(NamedPipeServer server) {
            // subscribe to event
            _server = server;
            _server.MessageAvailable += OnMessageAvailable;
        }

        public Message NextMessage(TimeSpan? timeout) {
            // this is blocking call
            if (timeout == null)
                return _queue.Take();
            else {
                Message tmp;
                if (_queue.TryTake(out tmp, timeout.Value))
                    return tmp;
                return null;
            }
        }

        private void OnMessageAvailable(Message msg) {
            // add to buffer
            _queue.Add(msg);
        }

        public void Dispose() {
            // clean up
            _server.MessageAvailable -= OnMessageAvailable;
            _queue.CompleteAdding();
            _queue.Dispose();
        }
    }
}


客户端然后调用SubscribeStartAndSubscribe

var sub = server.StartAndSubscribe();
var message = sub.NextMessage();
var messageOrNull = sub.NextMessage(TimeSpan.FromSeconds(1));
sub.Dispose();


这样,如果没有人订阅,则不会缓冲任何消息。而且,如果有人订阅然后不消费-这是他们的问题,而不是您的问题,因为缓冲发生在他们现在拥有的订阅中。您还可以限制_queue阻止集合的大小,然后将其添加到将达到限制的块中,从而阻止您的MessageAvailable事件,但是我不建议这样做。

08-04 12:44