我正在尝试将消息推送到 Azure 服务总线(Service Bus)上的某个主题,但这样做时会收到以下异常:
System.InvalidOperationException was unhandled by user code
HResult=-2146233079
Message=The only supported IsolationLevel is 'IsolationLevel.Serializable'.
Source=Microsoft.ServiceBus
StackTrace:
Server stack trace:
at Microsoft.ServiceBus.Messaging.Sbmp.SbmpResourceManager.EnlistAsyncResult..ctor(SbmpResourceManager resourceManager, Transaction transaction, IRequestSessionChannel channel, SbmpMessageCreator messageCreator, Action`1 partitionInfoSetter, TimeSpan timeout, AsyncCallback callback, Object state)
at Microsoft.ServiceBus.Messaging.Sbmp.SbmpResourceManager.BeginEnlist(Transaction transaction, IRequestSessionChannel channel, SbmpMessageCreator messageCreator, Action`1 partitionInfoSetter, TimeSpan timeout, AsyncCallback callback, Object state)
at Microsoft.ServiceBus.Messaging.Sbmp.SbmpTransactionalAsyncResult`1.<>c__DisplayClass38.<GetAsyncSteps>b__32(TIteratorAsyncResult thisPtr, TimeSpan t, AsyncCallback c, Object s)
at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.EnumerateSteps(CurrentThreadType state)
at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.Start()
Exception rethrown at [0]:
at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.ServiceBus.Messaging.Sbmp.SbmpMessageSender.EndSendCommand(IAsyncResult result)
at Microsoft.ServiceBus.Messaging.Sbmp.SbmpMessageSender.OnEndSend(IAsyncResult result)
at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.EnumerateSteps(CurrentThreadType state)
Exception rethrown at [1]:
at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.RunSynchronously()
at Microsoft.ServiceBus.Messaging.MessageSender.Send(TrackingContext trackingContext, IEnumerable`1 messages, TimeSpan timeout)
at CryptoArb.Infrastructure.AzureStorage.ServiceBus.AzureServiceBusService.Send(String label, Object message)
服务总线代码如下:
/// <summary>
/// <see cref="IServiceBusService"/> implementation that sends messages to an Azure Service Bus
/// topic and receives them from a subscription on that topic.
/// The namespace manager, topic description, topic client and subscription client are all
/// created lazily on first use and cached in private fields.
/// </summary>
public class AzureServiceBusService : IServiceBusService
{
    /// <summary>
    /// Name of the custom message property that carries the assembly-qualified type name of the
    /// message body, written by <see cref="Send"/> and read back by <see cref="Receive"/>.
    /// </summary>
    private const string MessageTypeProperty = "messageType";

    private NamespaceManager _namespaceManager;
    private string _topic;
    private TopicClient _topicClient;
    private TopicDescription _topicDescription;
    private string _connectionString;
    private SubscriptionClient _subscriptionClient;
    private string _subscriptionName = "AllMessages";

    /// <summary>Creates a service bound to the topic named "default".</summary>
    public AzureServiceBusService()
    {
        _topic = "default";
    }

    /// <summary>Creates a service bound to the given topic.</summary>
    /// <param name="topic">Name of the topic to send to and receive from.</param>
    public AzureServiceBusService(string topic)
    {
        _topic = topic;
    }

    /// <summary>Settings provider used to look up the Service Bus connection string.</summary>
    public ISettingsService SettingsService { get; set; }

    /// <summary>
    /// Name of the topic. Setting it discards every cached object that was created for the
    /// previous topic so that they are rebuilt on next use.
    /// </summary>
    public string Topic
    {
        get { return _topic; }
        set
        {
            _topic = value;
            _topicDescription = null;
            _topicClient = null;
            // The subscription client is created from the topic path, so it is stale as well.
            _subscriptionClient = null;
        }
    }

    /// <summary>
    /// Name of the subscription used by <see cref="Receive"/>. Setting it discards the cached
    /// subscription client, which was created for the previous name.
    /// </summary>
    public string SubscriptionName
    {
        get { return _subscriptionName; }
        set
        {
            _subscriptionName = value;
            _subscriptionClient = null;
        }
    }

    /// <summary>
    /// Client for the current subscription. On first access the subscription is created on the
    /// topic if it does not exist yet.
    /// </summary>
    public SubscriptionClient SubscriptionClient
    {
        get
        {
            if (_subscriptionClient != null)
            {
                return _subscriptionClient;
            }

            if (!NamespaceManager.SubscriptionExists(TopicDescription.Path, _subscriptionName))
            {
                NamespaceManager.CreateSubscription(TopicDescription.Path, _subscriptionName);
            }

            _subscriptionClient = SubscriptionClient.CreateFromConnectionString(
                ConnectionString, TopicDescription.Path, _subscriptionName);
            return _subscriptionClient;
        }
    }

    /// <summary>
    /// Service Bus connection string, read once from <see cref="SettingsService"/> and cached.
    /// </summary>
    internal string ConnectionString
    {
        get
        {
            if (String.IsNullOrWhiteSpace(_connectionString))
            {
                _connectionString = SettingsService
                    .ConnectionStrings[SettingsService.MainServiceBusConfigName]
                    .ConnectionString;
            }

            return _connectionString;
        }
    }

    /// <summary>Client for the current topic, created on first access.</summary>
    internal TopicClient TopicClient
    {
        get
        {
            return _topicClient ??
                   (_topicClient = TopicClient.CreateFromConnectionString(ConnectionString, TopicDescription.Path));
        }
    }

    /// <summary>
    /// Description of the current topic. On first access the topic is created if it does not
    /// exist yet.
    /// </summary>
    internal TopicDescription TopicDescription
    {
        get
        {
            if (_topicDescription != null)
            {
                return _topicDescription;
            }

            if (!NamespaceManager.TopicExists(_topic))
            {
                NamespaceManager.CreateTopic(_topic);
            }

            _topicDescription = NamespaceManager.GetTopic(_topic);
            return _topicDescription;
        }
    }

    /// <summary>
    /// Namespace manager for the configured connection string, created on first access with
    /// the default exponential retry policy.
    /// </summary>
    internal NamespaceManager NamespaceManager
    {
        get
        {
            if (_namespaceManager == null)
            {
                _namespaceManager = NamespaceManager.CreateFromConnectionString(ConnectionString);
                _namespaceManager.Settings.RetryPolicy = RetryExponential.Default;
            }

            return _namespaceManager;
        }
    }

    /// <summary>
    /// Sends <paramref name="message"/> to the current topic, tagging it with its
    /// assembly-qualified type name so that <see cref="Receive"/> can deserialize the body.
    /// </summary>
    /// <param name="label">Value for the brokered message's <c>Label</c>.</param>
    /// <param name="message">Object to use as the message body; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <remarks>
    /// NOTE(review): if the caller runs inside an ambient transaction whose isolation level is
    /// not Serializable, the Service Bus client rejects the send with an
    /// InvalidOperationException. That has to be resolved by the caller's transaction scope.
    /// </remarks>
    public void Send(string label, object message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        using (var brokeredMessage = new BrokeredMessage(message) { Label = label })
        {
            brokeredMessage.Properties[MessageTypeProperty] = message.GetType().AssemblyQualifiedName;
            TopicClient.Send(brokeredMessage);
        }
    }

    /// <summary>
    /// Receives one message from the current subscription and deserializes its body using the
    /// type name stored in the message properties.
    /// </summary>
    /// <returns>
    /// The received message, or null when nothing was received, when the message carried no
    /// resolvable body type (it is dead-lettered), or when processing failed (it is abandoned).
    /// </returns>
    public ServiceBusMessage Receive()
    {
        var receivedMessage = SubscriptionClient.Receive();
        if (receivedMessage == null)
        {
            return null;
        }

        try
        {
            Type messageBodyType = null;
            object typeName;
            if (receivedMessage.Properties.TryGetValue(MessageTypeProperty, out typeName) && typeName != null)
            {
                messageBodyType = Type.GetType(typeName.ToString());
            }

            if (messageBodyType == null)
            {
                // Should never get here: the body type is always set BEFORE the message is sent.
                // Stop after dead-lettering; the message is settled and cannot be processed.
                receivedMessage.DeadLetter();
                return null;
            }

            // BrokeredMessage.GetBody<T>() is generic, so it is closed over the runtime type
            // and invoked through reflection.
            var getBody = typeof(BrokeredMessage).GetMethod("GetBody", Type.EmptyTypes);
            var typedGetBody = getBody.MakeGenericMethod(messageBodyType);
            var messageBody = typedGetBody.Invoke(receivedMessage, null);

            var serviceBusMessage = new ServiceBusMessage()
            {
                Body = messageBody,
                MessageId = receivedMessage.MessageId
            };
            receivedMessage.Complete();
            return serviceBusMessage;
        }
        catch (Exception e)
        {
            // Best effort: record why the message could not be processed, then release it.
            System.Diagnostics.Trace.TraceError(
                "Failed to process Service Bus message; abandoning it. {0}", e);
            receivedMessage.Abandon();
            return null;
        }
    }
}
我已经确保连接到服务总线的连接字符串是有效的,并且检查已经到位,以确保主题和订阅在尝试使用它们之前存在。
为什么会出现这个异常?
最佳答案
请确保您的代码没有运行在另一个事务(环境事务)之中。
如果要让这段代码使用自己的隔离级别,可以像下面这样把它包在一个新的事务范围里:
// Open a new transaction scope instead of joining any ambient one (RequiresNew),
// and request the Serializable isolation level that the exception message asks for.
using (var transaction = new TransactionScope(TransactionScopeOption.RequiresNew,
new TransactionOptions {IsolationLevel = IsolationLevel.Serializable}))
{
// Placeholder: the code that sends to the queue/topic goes here.
...sending to the queue
// Mark the scope as successfully completed before it is disposed.
transaction.Complete();
}