基于 NetMQ 4.0.0-rc5 版本

发布端:
// Publisher side: binds a PUB socket on TCP port 5556 and, in an endless loop,
// loads the current messages, groups them by ward and publishes one two-frame
// message per ward:
//   frame 1 - the ward name, used as the envelope (topic) for subscriber-side filtering
//   frame 2 - all of that ward's message texts joined with ';'
// GetList<Messages> and sql are defined outside this snippet.
using (var publisher = new PublisherSocket())
{
    publisher.Bind("tcp://*:5556");

    while (true)
    {
        // Time one full load-and-publish cycle.
        var sw = Stopwatch.StartNew();
        Console.WriteLine();
        Console.WriteLine();

        var list = GetList<Messages>(sql);

        // GroupBy keeps first-seen key order and per-group element order, so each
        // group can be sent directly instead of rescanning the list once per ward.
        foreach (var ward in list.GroupBy(s => s.所属病区))
        {
            // Envelope frame: the topic that subscribers filter on.
            publisher.SendMoreFrame(ward.Key);

            // Payload frame.
            var msg = string.Join(";", ward.Select(s => s.Message));
            publisher.SendFrame(msg);
        }

        sw.Stop();
        Console.WriteLine("用时:" + sw.ElapsedMilliseconds);

        // NOTE(review): the sleep interval was missing from the original text;
        // 1000 ms is a placeholder - adjust to the required publish rate.
        Thread.Sleep(1000);
    }
}
订阅端:
// Subscriber side: connects a SUB socket to the publisher, subscribes to three
// randomly chosen ward topics, then prints every frame it receives.
// wards (the list of ward names) and r (a Random instance) are defined outside
// this snippet.
using (var subscriber = new SubscriberSocket())
{
    subscriber.Connect("tcp://172.16.131.222:5556");

    // Messages are filtered by their envelope (first frame). To receive several
    // envelopes, call Subscribe once per topic. To receive everything, call
    // SubscribeToAnyTopic() or pass an empty string to Subscribe("").
    const int topicCount = 3;
    for (var i = 0; i < topicCount; i++)
    {
        // Random.Next(max) has an exclusive upper bound, so passing wards.Count
        // lets every ward, including the last one, be chosen. The same ward may
        // be picked more than once.
        subscriber.Subscribe(wards[r.Next(wards.Count)]);
    }

    while (true)
    {
        // Each published message has two frames (envelope, then content) and
        // both are printed here, one per iteration. Call subscriber.SkipFrame()
        // before receiving to drop the envelope and print the content only.
        string results = subscriber.ReceiveFrameString();
        Console.WriteLine(results);
    }
}
05-07 15:45