本文介绍了如何避免处理 SubscriberSocket的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这个实现很糟糕,因为它会在第一个订阅终止时disposeSubscriberSocket.当我运行它时,似乎没有任何发布.[这里使用 Rx 3.0.0]

This implementation is bad since it will end up disposing the SubscriberSocket when the first subscription terminates. When I run it, nothing seems to publish. [This uses Rx 3.0.0]

如何修改Receive函数来解决这个问题?

How to modify Receive function to fix this problem?

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

using NetMQ;
using NetMQ.Sockets;

using System.Reactive.Linq;

namespace App1
{
    class MainClass
    {
        // publisher for testing, should be an external data publisher in real environment
        public static Thread StartPublisher(PublisherSocket s)
        {
            s.Bind("inproc://test");
            var thr = new Thread(() =>
            {
                Console.WriteLine("Start publishing...");
                while (true)
                {
                    Thread.Sleep(500);
                    bool more = false;
                    s.SendFrame("hello", more);
                }
            });

            thr.Start();
            return thr;
        }

        public static IObservable<string> Receive(SubscriberSocket subp)
        {
            return Observable
                .Create<string>(o =>
                    Observable.Using<string, SubscriberSocket>(() =>
                    {
                        subp.Connect("inproc://test");
                        subp.Subscribe("");
                        return subp;
                    }, sub =>
                        Observable
                        .FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
                            h => sub.ReceiveReady += h,
                            h => sub.ReceiveReady -= h)
                         .Select(x => sub.ReceiveFrameString()))
                .Subscribe(o));
        }

        public static void Main(string[] args)
        {
            var sub = new SubscriberSocket();
            var pub = new PublisherSocket();

            StartPublisher(pub);
            Receive(sub).Subscribe(Console.WriteLine);

            Console.ReadLine();
        }
    }
}

推荐答案

在我看来,你应该这样做:

It seems to me that you should be doing something like this:

void Main()
{
    var address = "inproc://test";

    var pub = new PublisherSocket();
    pub.Bind(address);

    var pubSubscription =
        Observable
            .Interval(TimeSpan.FromMilliseconds(500.0))
            .Subscribe(n => pub.SendFrame("hello", false));

    Receive(address).Subscribe(Console.WriteLine);

    Console.ReadLine();
}

public static IObservable<string> Receive(string address)
{
    return Observable
        .Create<string>(o =>
            Observable.Using<string, SubscriberSocket>(() =>
            {
                var ss = new SubscriberSocket();
                ss.Connect(address);
                ss.Subscribe("");
                return ss;
            }, ss =>
                Observable
                .FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
                    h => ss.ReceiveReady += h,
                    h => ss.ReceiveReady -= h)
                 .Select(x => ss.ReceiveFrameString()))
        .Subscribe(o));
}

首先,这意味着在 Observable.Create 中创建 SubscriberSocket - 这是它所属的地方.

Primarily, this means creating the SubscriberSocket inside the Observable.Create - which is where it belongs.

这篇关于如何避免处理 SubscriberSocket的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-16 14:12