很久没po文章了,但是看到.Net里关于ActiveMQ发送复杂类型的文章确实太少了,所以贴出来和大家分享
发布:
//消息发布
public class Publisher
{
private IConnection _connection;
private ISession _session;
private IMessageProducer _producer; /// <summary>
/// 初始化
/// </summary>
/// <param name="brokerUrl">广播地址</param>
/// <param name="queueDestination">队列目标</param>
public void Init(string brokerUrl = "tcp://localhost:61616", string queueDestination = "nms.msg.topic")
{
try
{
IConnectionFactory connectionFactory = new ConnectionFactory(brokerUrl);
_connection = connectionFactory.CreateConnection();
_connection.Start();
_session = _connection.CreateSession();
IDestination destination = _session.GetTopic(queueDestination);
_producer = _session.CreateProducer(destination);
}
catch (Exception e)
{
Log.Error($"activemq初始化异常:{e.InnerException.ToString()}");
}
} public void Close()
{
_session.Close();
_connection.Close();
} /// <summary>
/// 发送普通字符串消息
/// </summary>
/// <param name="text">字符串</param>
public void SendText(string text)
{
ITextMessage objecto = _producer.CreateTextMessage(text);
_producer.Send(objecto);
} /// <summary>
/// 发送对象消息
/// </summary>
/// <param name="mapMessages">MapMessage对象</param>
/// <returns></returns>
public bool SendObject(List<MapMessage> mapMessages)
{
bool result = true;
if (mapMessages == null || mapMessages.Count < 0) return false;
foreach (var mapMessage in mapMessages)
{
var message = _producer.CreateMapMessage();
ActiveCommon.SetMapMessage<MapMessage>(message, mapMessage);
try
{
_producer.Send(message);
result = true;
}
catch (Exception e)
{
Log.Error($"activemq发送美好异常:{e.InnerException.ToString()}");
result = false;
}
}
return result;
} /// <summary>
/// 获取对象XML结果
/// </summary>
/// <param name="m">对象</param>
/// <returns></returns>
public string GetXmlStr(object m)
{
return _producer.CreateXmlMessage(m).Text;
}
}
订阅:
//消息订阅
class Subsriber
{
private IConnection _connection;
private ISession _session;
private IMessageConsumer _consumer; /// <summary>
/// 初始化
/// </summary>
/// <param name="brokerUrl">广播地址</param>
/// <param name="queueDestination">队列目标</param>
public void Init(string brokerUrl = "tcp://localhost:61616", string queueDestination = "nms.msg.topic")
{
try
{
IConnectionFactory connectionFactory = new ConnectionFactory(brokerUrl);
_connection = connectionFactory.CreateConnection();
_connection.Start();
_session = _connection.CreateSession();
IDestination destination = _session.GetTopic(queueDestination);
_consumer = _session.CreateConsumer(destination);
_consumer.Listener += _consumer_Listener; }
catch (Exception e)
{
Log.Error($"activemq初始化异常:{e.InnerException.ToString()}");
} } private void _consumer_Listener(IMessage message)
{
var model = ActiveCommon.GetMapMessageByIMapMessage((IMapMessage)message);
Log.Infor($"订阅接收:{_session.CreateXmlMessage(model).Text}");
}
}
复杂类型处理:
public class ActiveCommon
{
//设置Message的Body信息
public static void SetMapMessage<T>(IMapMessage mapMessage, T messages)
{
if (mapMessage == null || object.Equals(messages, null))
{
return;
} foreach (var propertyInfo in messages.GetType().GetProperties())
{
if (propertyInfo.PropertyType.Name == "String")
mapMessage.Body.SetString(propertyInfo.Name, Convert.ToString(propertyInfo.GetValue(messages, null)));
else
mapMessage.Body.SetInt(propertyInfo.Name, Convert.ToInt16(propertyInfo.GetValue(messages, null)));
}
} public static MapMessage GetMapMessageByIMapMessage(IMapMessage mapMessage)
{
if (mapMessage == null)
{
return null;
} var MapMessage = new MapMessage();
foreach (var propertyInfo in MapMessage.GetType().GetProperties())
{
propertyInfo.SetValue(MapMessage, mapMessage.Body[propertyInfo.Name], null);
} return MapMessage;
} public static T GetMapMessageByIMapMessage<T>(IMapMessage mapMessage, T MapMessage)
{
if (mapMessage == null || object.Equals(MapMessage, null))
{
return default(T);
} foreach (var propertyInfo in MapMessage.GetType().GetProperties())
{
propertyInfo.SetValue(MapMessage, mapMessage.Body[propertyInfo.Name.ToUpper()], null);
} return MapMessage;
}
}
重点是跨站点和跨服务器传输的时候,需要通过Message的Body去设置传输参数