本文介绍了如何避免处理 SubscriberSocket的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
这个实现很糟糕,因为它会在第一个订阅终止时dispose
SubscriberSocket
.当我运行它时,似乎没有任何发布.[这里使用 Rx 3.0.0]
This implementation is bad since it will end up disposing
the SubscriberSocket
when the first subscription terminates. When I run it, nothing seems to publish. [This uses Rx 3.0.0]
如何修改Receive
函数来解决这个问题?
How to modify Receive
function to fix this problem?
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using NetMQ;
using NetMQ.Sockets;
using System.Reactive.Linq;
namespace App1
{
class MainClass
{
// publisher for testing, should be an external data publisher in real environment
public static Thread StartPublisher(PublisherSocket s)
{
s.Bind("inproc://test");
var thr = new Thread(() =>
{
Console.WriteLine("Start publishing...");
while (true)
{
Thread.Sleep(500);
bool more = false;
s.SendFrame("hello", more);
}
});
thr.Start();
return thr;
}
public static IObservable<string> Receive(SubscriberSocket subp)
{
return Observable
.Create<string>(o =>
Observable.Using<string, SubscriberSocket>(() =>
{
subp.Connect("inproc://test");
subp.Subscribe("");
return subp;
}, sub =>
Observable
.FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
h => sub.ReceiveReady += h,
h => sub.ReceiveReady -= h)
.Select(x => sub.ReceiveFrameString()))
.Subscribe(o));
}
public static void Main(string[] args)
{
var sub = new SubscriberSocket();
var pub = new PublisherSocket();
StartPublisher(pub);
Receive(sub).Subscribe(Console.WriteLine);
Console.ReadLine();
}
}
}
推荐答案
在我看来,你应该这样做:
It seems to me that you should be doing something like this:
void Main()
{
var address = "inproc://test";
var pub = new PublisherSocket();
pub.Bind(address);
var pubSubscription =
Observable
.Interval(TimeSpan.FromMilliseconds(500.0))
.Subscribe(n => pub.SendFrame("hello", false));
Receive(address).Subscribe(Console.WriteLine);
Console.ReadLine();
}
public static IObservable<string> Receive(string address)
{
return Observable
.Create<string>(o =>
Observable.Using<string, SubscriberSocket>(() =>
{
var ss = new SubscriberSocket();
ss.Connect(address);
ss.Subscribe("");
return ss;
}, ss =>
Observable
.FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
h => ss.ReceiveReady += h,
h => ss.ReceiveReady -= h)
.Select(x => ss.ReceiveFrameString()))
.Subscribe(o));
}
首先,这意味着在 Observable.Create
中创建 SubscriberSocket
- 这是它所属的地方.
Primarily, this means creating the SubscriberSocket
inside the Observable.Create
- which is where it belongs.
这篇关于如何避免处理 SubscriberSocket的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!