首先本地安装微软的消息队列服务器。

微软消息队列MessageQueue(MQ)-LMLPHP

基础类:

namespace Core.MessageQueueTest
{
    /// <summary>
    /// Base wrapper around a <c>MessageQueue</c> that sends and receives messages
    /// using a configurable timeout and transaction type. Derived classes add
    /// strongly typed Send/Receive methods on top of it.
    /// </summary>
    public class TestQueue : IDisposable
    {
        /// <summary>Transaction type passed to every send and receive; derived classes may reassign it.</summary>
        protected MessageQueueTransactionType transactionType = MessageQueueTransactionType.Automatic;

        /// <summary>The underlying queue; created in the constructor and released by <see cref="Dispose"/>.</summary>
        protected MessageQueue queue;

        /// <summary>How long <see cref="Receive"/> waits for a message.</summary>
        protected TimeSpan timeout;

        /// <summary>
        /// Opens the queue at <paramref name="queuePath"/> and stores the receive timeout.
        /// </summary>
        /// <param name="queuePath">Path or format name of the queue to open.</param>
        /// <param name="timeoutSeconds">Receive timeout, in seconds.</param>
        public TestQueue(string queuePath, int timeoutSeconds)
        {
            queue = new MessageQueue(queuePath);
            timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeoutSeconds));

            // Performance optimization since we don't need these features
            queue.DefaultPropertiesToSend.AttachSenderId = false;
            queue.DefaultPropertiesToSend.UseAuthentication = false;
            queue.DefaultPropertiesToSend.UseEncryption = false;
            queue.DefaultPropertiesToSend.AcknowledgeType = AcknowledgeTypes.None;
            queue.DefaultPropertiesToSend.UseJournalQueue = false;
        }

        /// <summary>
        /// Receives the next message from the queue. Derived classes call this
        /// from their own Receive methods and cast the return value to
        /// something meaningful.
        /// </summary>
        /// <returns>
        /// The received <c>Message</c>. It is returned undisposed so that its
        /// body can still be read; the caller is responsible for disposing it.
        /// </returns>
        /// <exception cref="TimeoutException">
        /// No message arrived before the configured timeout elapsed.
        /// </exception>
        public virtual object Receive()
        {
            try
            {
                // Do not wrap this in a using block: that would hand the
                // caller a message that has already been disposed.
                return queue.Receive(timeout, transactionType);
            }
            catch (MessageQueueException mqex)
            {
                if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // Keep the queue exception as the inner exception for diagnostics.
                    throw new TimeoutException(mqex.Message, mqex);
                }

                throw;
            }
        }

        /// <summary>
        /// Sends <paramref name="msg"/> to the queue using the current
        /// transaction type. Derived classes may call this from their own
        /// Send methods that accept meaningful objects.
        /// </summary>
        /// <param name="msg">The object to send as the message.</param>
        public virtual void Send(object msg)
        {
            queue.Send(msg, transactionType);
        }

        #region IDisposable Members

        /// <summary>Releases the underlying queue.</summary>
        public void Dispose()
        {
            queue.Dispose();
        }

        #endregion
    }
}

测试类:

namespace Core.MessageQueueTest
{
/// <summary>
/// Order-specific queue wrapper: sends and receives <c>OrderInfo</c> objects
/// through the queue configured under the "OrderQueuePath" app setting.
/// </summary>
public class QueueTestOrder : TestQueue
{
    // Path example - FormatName:DIRECT=OS:MyMachineName\Private$\OrderQueueName
    private static readonly string queuePath = System.Configuration.ConfigurationSettings.AppSettings["OrderQueuePath"];

    // Default receive timeout in seconds.
    // NOTE(review): the original literal was missing from this listing; 20 is a
    // placeholder so the code compiles - confirm the intended value.
    private static readonly int queueTimeout = 20;

    /// <summary>
    /// Opens the order queue with the default timeout and switches it to the
    /// binary formatter.
    /// </summary>
    public QueueTestOrder()
        : base(queuePath, queueTimeout)
    {
        // Set the queue to use Binary formatter for smaller foot print and performance.
        // NOTE(review): binary deserialization is unsafe for untrusted senders;
        // only use this with queues whose writers are trusted.
        queue.Formatter = new BinaryMessageFormatter();
    }

    /// <summary>
    /// Method to retrieve order messages from Message Queue.
    /// </summary>
    /// <returns>All information for an order</returns>
    /// <exception cref="TimeoutException">
    /// Propagated from the base Receive when no message arrives in time.
    /// </exception>
    public new OrderInfo Receive()
    {
        // This method involves in distributed transaction and need Automatic Transaction type
        base.transactionType = MessageQueueTransactionType.Automatic;

        // Read the body first, then release the message.
        using (Message message = (Message)base.Receive())
        {
            return (OrderInfo)message.Body;
        }
    }

    /// <summary>
    /// Receives an order using the given timeout.
    /// </summary>
    /// <param name="timeout">
    /// Receive timeout in seconds. The value replaces the instance timeout, so
    /// it also applies to later Receive calls on this object.
    /// </param>
    /// <returns>All information for an order</returns>
    public OrderInfo Receive(int timeout)
    {
        base.timeout = TimeSpan.FromSeconds(timeout);
        return Receive();
    }

    /// <summary>
    /// Method to send asynchronous order to Message Queue.
    /// </summary>
    /// <param name="orderMessage">All information for an order</param>
    public void Send(OrderInfo orderMessage)
    {
        // This method does not involve in distributed transaction and optimizes performance using Single type
        base.transactionType = MessageQueueTransactionType.Single;
        base.Send(orderMessage);
    }
}
/// <summary>
/// Serializable order payload that QueueTestOrder sends as the message body
/// and casts the received body back to.
/// </summary>
// NOTE(review): instances travel through BinaryMessageFormatter, so the
// serialized layout presumably depends on these auto-properties - confirm
// before restructuring the members.
[Serializable]
public class OrderInfo
{
/// <summary>Identifier of the order.</summary>
public string Id { get; set; }
/// <summary>Name of the order.</summary>
public string Name { get; set; }
}
}

测试:

 /// <summary>
 /// Click handler that builds a sample order and sends it to the order queue.
 /// </summary>
 protected void btn_testSendToQueue_Click(object sender, EventArgs e)
 {
     OrderInfo model = new OrderInfo();
     model.Id = "";
     model.Name = "测试MQ";

     // QueueTestOrder holds a queue handle; dispose it as soon as the send is done.
     using (QueueTestOrder order = new QueueTestOrder())
     {
         order.Send(model);
     }
 }

 /// <summary>
 /// Click handler that receives one order from the order queue.
 /// </summary>
 protected void btn_reciveFromQueue_Click(object sender, EventArgs e)
 {
     using (QueueTestOrder order = new QueueTestOrder())
     {
         // Receive() already returns OrderInfo, so no cast or placeholder
         // instance is needed. A TimeoutException propagates from here when
         // no message arrives within the queue timeout.
         OrderInfo model = order.Receive();
     }
 }

测试结果:消息发送/接收正常。

05-04 06:28