问题描述
如何以简单的方式在客户端和服务之间传递审计信息,而不必将该信息添加为所有服务方法的参数?我可以使用消息标题为呼叫设置此数据吗?
How can I pass along auditing information between clients and services in an easy way without having to add that information as arguments for all service methods? Can I use message headers to set this data for a call?
有没有办法允许服务也将其传递给下游,即,如果 ServiceA 调用调用 ServiceC 的 ServiceB,是否可以将相同的审计信息发送到第一个 A,然后在 A 对 B 的调用中,然后在 B 的调用中C?
Is there a way to allow service to pass that along downstream also, i.e., if ServiceA calls ServiceB that calls ServiceC, could the same auditing information be send to first A, then in A's call to B and then in B's call to C?
推荐答案
如果您使用的是 fabric transport 用于远程处理.如果您使用的是 Http 传输,那么您在那里就有标头,就像处理任何 http 请求一样.
There is actually a concept of headers that are passed between client and service if you are using fabric transport for remoting. If you are using Http transport then you have headers there just as you would with any http request.
注意,下面的提议并不是最简单的解决方案,但是一旦它就位就可以解决问题并且很容易使用,但是如果您在整个代码库中寻找简单的方法,这可能不是解决问题的方法走.如果是这种情况,那么我建议您简单地向所有服务方法添加一些常见的审计信息参数.当然,当某些开发人员忘记添加它或在调用下游服务时未正确设置时,最大的警告是.一切都与权衡有关,就像代码中一样:)
Note, below proposal is not the easiest solution, but it solves the issue once it is in place and it is easy to use then, but if you are looking for easy in the overall code base this might not be the way to go. If that is the case then I suggest you simply add some common audit info parameter to all your service methods. The big caveat there is of course when some developer forgets to add it or it is not set properly when calling down stream services. It's all about trade-offs, as alway in code :).
进入兔子洞
在结构传输中有两个类参与通信:IServiceRemotingClient
在客户端,以及 IServiceRemotingListener
在服务端.在来自客户端的每个请求中,消息正文和ServiceRemotingMessageHeaders
被发送.开箱即用的这些标头包括有关哪个接口(即哪个服务)和哪个方法被调用的信息(这也是底层接收器如何解包作为主体的字节数组的方式).对于通过 ActorService 对 Actor 的调用,额外的 Actor 信息也包含在这些标头中.
In fabric transport there are two classes that are involved in the communication: an instance of a IServiceRemotingClient
on the client side, and an instance of IServiceRemotingListener
on the service side. In each request from the client the messgae body and ServiceRemotingMessageHeaders
are sent. Out of the box these headers include information of which interface (i.e. which service) and which method are being called (and that's also how the underlying receiver knows how to unpack that byte array that is the body). For calls to Actors, which goes through the ActorService, additional Actor information is also included in those headers.
棘手的部分是连接到该交换并实际设置然后读取其他标题.请耐心等待,我们需要了解幕后涉及的许多类.
The tricky part is hooking into that exchange and actually setting and then reading additional headers. Please bear with me here, it's a number of classes involved in this behind the curtains that we need to understand.
服务端
当您为您的服务(例如无状态服务)设置 IServiceRemotingListener
时,您通常使用方便的扩展方法,如下所示:
When you setup the IServiceRemotingListener
for your service (example for a Stateless service) you usually use a convenience extension method, like so:
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
yield return new ServiceInstanceListener(context =>
this.CreateServiceRemotingListener(this.Context));
}
(另一种方法是实现您自己的侦听器,但这并不是我们在这里真正不想做的,我们只是不想在现有基础设施之上添加东西.见下文对于那种方法.)
这是我们可以提供我们自己的侦听器的地方,类似于幕后的扩展方法.让我们首先看看扩展方法的作用.它会在您的服务项目的程序集级别上寻找特定属性:ServiceRemotingProviderAttribute
.那个是 abstract
,但是你可以使用,如果没有提供,你会得到一个默认实例,是 FabricTransportServiceRemotingProviderAttribute
.在 AssemblyInfo.cs
(或任何其他文件,它是一个程序集属性)中设置:
This is where we can provide our own listener instead, similar to what that extention method does behind the curtains. Let's first look at what that extention method does. It goes looking for a specific attribute on assembly level on your service project: ServiceRemotingProviderAttribute
. That one is abstract
, but the one that you can use, and which you will get a default instance of, if none is provided, is FabricTransportServiceRemotingProviderAttribute
. Set it in AssemblyInfo.cs
(or any other file, it's an assembly attribute):
[assembly: FabricTransportServiceRemotingProvider()]
这个属性有两个有趣的可覆盖方法:
This attribute has two interesting overridable methods:
public override IServiceRemotingListener CreateServiceRemotingListener(
ServiceContext serviceContext, IService serviceImplementation)
public override IServiceRemotingClientFactory CreateServiceRemotingClientFactory(
IServiceRemotingCallbackClient callbackClient)
这两个方法负责创建监听器和客户端工厂.这意味着它也由交易的客户端检查.这就是为什么它是服务程序集的程序集级别的属性,客户端也可以将它与 IService
我们想要与之通信的客户端的派生接口.
These two methods are responsible for creating the the listener and the client factory. That means that it is also inspected by the client side of the transaction. That is why it is an attribute on assembly level for the service assembly, the client side can also pick it up together with the IService
derived interface for the client we want to communicate with.
最终创建了一个实例 FabricTransportServiceRemotingListener
,但是在这个实现中我们不能设置我们自己特定的IServiceRemotingMessageHandler
.如果您创建自己的 FabricTransportServiceRemotingProviderAttribute
子类并覆盖它,那么您实际上可以创建一个 FabricTransportServiceRemotingListener
的实例,该实例在构造函数中接收调度程序:
The CreateServiceRemotingListener
ends up creating an instance FabricTransportServiceRemotingListener
, however in this implementation we cannot set our own specific IServiceRemotingMessageHandler
. If you create your own sub class of FabricTransportServiceRemotingProviderAttribute
and override that then you can actually make it create an instance of FabricTransportServiceRemotingListener
that takes in a dispatcher in the constructor:
public class AuditableFabricTransportServiceRemotingProviderAttribute :
FabricTransportServiceRemotingProviderAttribute
{
public override IServiceRemotingListener CreateServiceRemotingListener(
ServiceContext serviceContext, IService serviceImplementation)
{
var messageHandler = new AuditableServiceRemotingDispatcher(
serviceContext, serviceImplementation);
return (IServiceRemotingListener)new FabricTransportServiceRemotingListener(
serviceContext: serviceContext,
messageHandler: messageHandler);
}
}
AuditableServiceRemotingDispatcher
是神奇发生的地方.它是我们自己的 ServiceRemotingDispatcher
子类.覆盖RequestResponseAsync
(忽略HandleOneWay
,服务远程不支持它,如果调用它会抛出一个NotImplementedException
),像这样:
The AuditableServiceRemotingDispatcher
is where the magic happens. It is our own ServiceRemotingDispatcher
subclass. Override the RequestResponseAsync
(ignore HandleOneWay
, it is not supported by service remoting, it throws an NotImplementedException
if called), like this:
public class AuditableServiceRemotingDispatcher : ServiceRemotingDispatcher
{
public AuditableServiceRemotingDispatcher(ServiceContext serviceContext, IService service) :
base(serviceContext, service) { }
public override async Task<byte[]> RequestResponseAsync(
IServiceRemotingRequestContext requestContext,
ServiceRemotingMessageHeaders messageHeaders,
byte[] requestBodyBytes)
{
byte[] userHeader = null;
if (messageHeaders.TryGetHeaderValue("user-header", out auditHeader))
{
// Deserialize from byte[] and handle the header
}
else
{
// Throw exception?
}
byte[] result = null;
result = await base.RequestResponseAsync(requestContext, messageHeaders, requestBodyBytes);
return result;
}
}
另一种更简单但不太灵活的方法是直接在服务中使用我们的自定义调度程序的实例直接创建 FabricTransportServiceRemotingListener
的实例:
Another, easier, but less flexible way, would be to directly create an instance of FabricTransportServiceRemotingListener
with an instance of our custom dispatcher directly in the service:
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
yield return new ServiceInstanceListener(context =>
new FabricTransportServiceRemotingListener(this.Context, new AuditableServiceRemotingDispatcher(context, this)));
}
为什么这不那么灵活?好吧,因为使用该属性也支持客户端,如下所示
客户端
好的,现在我们可以在接收消息时读取自定义标题,如何设置它们?让我们看看该属性的另一个方法:
Ok, so now we can read custom headers when receiving messages, how about setting those? Let's look at the other method of that attribute:
public override IServiceRemotingClientFactory CreateServiceRemotingClientFactory(IServiceRemotingCallbackClient callbackClient)
{
return (IServiceRemotingClientFactory)new FabricTransportServiceRemotingClientFactory(
callbackClient: callbackClient,
servicePartitionResolver: (IServicePartitionResolver)null,
traceId: (string)null);
}
这里我们不能只注入特定的处理程序或类似的服务,我们必须提供我们自己的自定义工厂.为了不必重新实现 FabricTransportServiceRemotingClientFactory
我只是将它封装在我自己的 IServiceRemotingClientFactory
:
Here we cannot just inject a specific handler or similar as for the service, we have to supply our own custom factory. In order not to have to reimplement the particulars of FabricTransportServiceRemotingClientFactory
I simply encapsulate it in my own implementation of IServiceRemotingClientFactory
:
public class AuditedFabricTransportServiceRemotingClientFactory : IServiceRemotingClientFactory, ICommunicationClientFactory<IServiceRemotingClient>
{
private readonly ICommunicationClientFactory<IServiceRemotingClient> _innerClientFactory;
public AuditedFabricTransportServiceRemotingClientFactory(ICommunicationClientFactory<IServiceRemotingClient> innerClientFactory)
{
_innerClientFactory = innerClientFactory;
_innerClientFactory.ClientConnected += OnClientConnected;
_innerClientFactory.ClientDisconnected += OnClientDisconnected;
}
private void OnClientConnected(object sender, CommunicationClientEventArgs<IServiceRemotingClient> e)
{
EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> clientConnected = this.ClientConnected;
if (clientConnected == null) return;
clientConnected((object)this, new CommunicationClientEventArgs<IServiceRemotingClient>()
{
Client = e.Client
});
}
private void OnClientDisconnected(object sender, CommunicationClientEventArgs<IServiceRemotingClient> e)
{
EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> clientDisconnected = this.ClientDisconnected;
if (clientDisconnected == null) return;
clientDisconnected((object)this, new CommunicationClientEventArgs<IServiceRemotingClient>()
{
Client = e.Client
});
}
public async Task<IServiceRemotingClient> GetClientAsync(
Uri serviceUri,
ServicePartitionKey partitionKey,
TargetReplicaSelector targetReplicaSelector,
string listenerName,
OperationRetrySettings retrySettings,
CancellationToken cancellationToken)
{
var client = await _innerClientFactory.GetClientAsync(
serviceUri,
partitionKey,
targetReplicaSelector,
listenerName,
retrySettings,
cancellationToken);
return new AuditedFabricTransportServiceRemotingClient(client);
}
public async Task<IServiceRemotingClient> GetClientAsync(
ResolvedServicePartition previousRsp,
TargetReplicaSelector targetReplicaSelector,
string listenerName,
OperationRetrySettings retrySettings,
CancellationToken cancellationToken)
{
var client = await _innerClientFactory.GetClientAsync(
previousRsp,
targetReplicaSelector,
listenerName,
retrySettings,
cancellationToken);
return new AuditedFabricTransportServiceRemotingClient(client);
}
public Task<OperationRetryControl> ReportOperationExceptionAsync(
IServiceRemotingClient client,
ExceptionInformation exceptionInformation,
OperationRetrySettings retrySettings,
CancellationToken cancellationToken)
{
return _innerClientFactory.ReportOperationExceptionAsync(
client,
exceptionInformation,
retrySettings,
cancellationToken);
}
public event EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> ClientConnected;
public event EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> ClientDisconnected;
}
此实现只是将任何繁重的工作传递给底层工厂,同时返回它自己的可审计客户端,该客户端类似地封装了 IServiceRemotingClient
:
This implementation simply passes along anything heavy lifting to the underlying factory, while returning it's own auditable client that similarily encapsulates a IServiceRemotingClient
:
public class AuditedFabricTransportServiceRemotingClient : IServiceRemotingClient, ICommunicationClient
{
private readonly IServiceRemotingClient _innerClient;
public AuditedFabricTransportServiceRemotingClient(IServiceRemotingClient innerClient)
{
_innerClient = innerClient;
}
~AuditedFabricTransportServiceRemotingClient()
{
if (this._innerClient == null) return;
var disposable = this._innerClient as IDisposable;
disposable?.Dispose();
}
Task<byte[]> IServiceRemotingClient.RequestResponseAsync(ServiceRemotingMessageHeaders messageHeaders, byte[] requestBody)
{
messageHeaders.SetUser(ServiceRequestContext.Current.User);
messageHeaders.SetCorrelationId(ServiceRequestContext.Current.CorrelationId);
return this._innerClient.RequestResponseAsync(messageHeaders, requestBody);
}
void IServiceRemotingClient.SendOneWay(ServiceRemotingMessageHeaders messageHeaders, byte[] requestBody)
{
messageHeaders.SetUser(ServiceRequestContext.Current.User);
messageHeaders.SetCorrelationId(ServiceRequestContext.Current.CorrelationId);
this._innerClient.SendOneWay(messageHeaders, requestBody);
}
public ResolvedServicePartition ResolvedServicePartition
{
get { return this._innerClient.ResolvedServicePartition; }
set { this._innerClient.ResolvedServicePartition = value; }
}
public string ListenerName
{
get { return this._innerClient.ListenerName; }
set { this._innerClient.ListenerName = value; }
}
public ResolvedServiceEndpoint Endpoint
{
get { return this._innerClient.Endpoint; }
set { this._innerClient.Endpoint = value; }
}
}
现在,在这里我们实际(并最终)设置了我们想要传递给服务的审核名称.
Now, in here is where we actually (and finally) set the audit name that we want to pass along to the service.
调用链和服务请求上下文
最后一块拼图是 ServiceRequestContext,它是一个自定义类,允许我们处理服务请求调用的环境上下文.这是相关的,因为它为我们提供了一种在调用链中传播上下文信息的简单方法,例如用户或相关 ID(或我们希望在客户端和服务之间传递的任何其他标头信息).实现 ServiceRequestContext
看起来像:
One final piece of the puzzle, the ServiceRequestContext, which is a custom class that allows us to handle an ambient context for a service request call. This is relevant because it gives us an easy way to propagate that context information, like the user or a correlation id (or any other header information we want to pass between client and service), in a chain of calls. The implementation ServiceRequestContext
looks like:
public sealed class ServiceRequestContext
{
private static readonly string ContextKey = Guid.NewGuid().ToString();
public ServiceRequestContext(Guid correlationId, string user)
{
this.CorrelationId = correlationId;
this.User = user;
}
public Guid CorrelationId { get; private set; }
public string User { get; private set; }
public static ServiceRequestContext Current
{
get { return (ServiceRequestContext)CallContext.LogicalGetData(ContextKey); }
internal set
{
if (value == null)
{
CallContext.FreeNamedDataSlot(ContextKey);
}
else
{
CallContext.LogicalSetData(ContextKey, value);
}
}
}
public static Task RunInRequestContext(Func<Task> action, Guid correlationId, string user)
{
Task<Task> task = null;
task = new Task<Task>(async () =>
{
Debug.Assert(ServiceRequestContext.Current == null);
ServiceRequestContext.Current = new ServiceRequestContext(correlationId, user);
try
{
await action();
}
finally
{
ServiceRequestContext.Current = null;
}
});
task.Start();
return task.Unwrap();
}
public static Task<TResult> RunInRequestContext<TResult>(Func<Task<TResult>> action, Guid correlationId, string user)
{
Task<Task<TResult>> task = null;
task = new Task<Task<TResult>>(async () =>
{
Debug.Assert(ServiceRequestContext.Current == null);
ServiceRequestContext.Current = new ServiceRequestContext(correlationId, user);
try
{
return await action();
}
finally
{
ServiceRequestContext.Current = null;
}
});
task.Start();
return task.Unwrap<TResult>();
}
}
这最后一部分深受 Stephen Cleary 的回答的影响.它为我们提供了一种简单的方法来处理调用层次结构下的环境信息,天气它们在任务上是同步的还是异步的.现在,有了这个,我们也可以在服务端的 Dispatcher 中设置该信息:
This last part was much influenced by the SO answer by Stephen Cleary. It gives us an easy way to handle the ambient information down a hierarcy of calls, weather they are synchronous or asyncronous over Tasks. Now, with this we have a way of setting that information also in the Dispatcher on the service side:
public override Task<byte[]> RequestResponseAsync(
IServiceRemotingRequestContext requestContext,
ServiceRemotingMessageHeaders messageHeaders,
byte[] requestBody)
{
var user = messageHeaders.GetUser();
var correlationId = messageHeaders.GetCorrelationId();
return ServiceRequestContext.RunInRequestContext(async () =>
await base.RequestResponseAsync(
requestContext,
messageHeaders,
requestBody),
correlationId, user);
}
(GetUser()
和 GetCorrelationId()
只是获取和解包客户端设置的标头的辅助方法)
(GetUser()
and GetCorrelationId()
are just helper methods that gets and unpacks the headers set by the client)
有了这个意味着服务为任何附加调用创建的任何新客户端也将设置 sam 标头,因此在场景 ServiceA -> ServiceB -> ServiceC 中,我们仍将在调用中设置相同的用户从 ServiceB 到 ServiceC.
Having this in place means that any new client created by the service for any aditional call will also have the sam headers set, so in the scenario ServiceA -> ServiceB -> ServiceC we will still have the same user set in the call from ServiceB to ServiceC.
什么?这么容易?是的;)
从服务内部,例如无状态 OWIN web api,您首先捕获用户信息,然后创建 ServiceProxyFactory
的实例并将该调用包装在 ServiceRequestContext
中>:
From inside a service, for instance a Stateless OWIN web api, where you first capture the user information, you create an instance of ServiceProxyFactory
and wrap that call in a ServiceRequestContext
:
var task = ServiceRequestContext.RunInRequestContext(async () =>
{
var serviceA = ServiceProxyFactory.CreateServiceProxy<IServiceA>(new Uri($"{FabricRuntime.GetActivationContext().ApplicationName}/ServiceA"));
await serviceA.DoStuffAsync(CancellationToken.None);
}, Guid.NewGuid(), user);
好的,总结一下 - 您可以连接到服务远程处理来设置您自己的标头.正如我们在上面看到的,需要做一些工作来获得一种机制,主要是创建你自己的底层基础设施的子类.好处是,一旦你有了这个,你就有了一个非常简单的方法来审核你的服务调用.
Ok, so to sum it up - you can hook into the service remoting to set your own headers. As we see above there is some work that needs to be done to get a mechanism for that in place, mainly creating your own subclasses of the underlying infrastructure. The upside is that once you have this in place, then you have a very easy way for auditing your service calls.
这篇关于在 Service Fabric 传输中调用 Reliable Services 时传递用户和审核信息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!