1、 为什么要用polly
前面的项目中,一个服务调用另一个(Zhengwei.Identity调用Zhengwei.Use.Api)服务时是直接调用的,在这个调用的过程中可能会发生各种瞬态故障,这里的说的瞬态故障包含了程序发生的异常和出现不符合开发者预期的结果。所谓瞬态故障,就是说故障不是必然会发生的,而是偶然可能会发生的,比如网络偶尔会突然出现不稳定或无法访问这种故障。Polly对于这些故障会有自己的处理策略
2、 Polly 的七种策略
- 重试:出现故障自动重试。
- 熔断:当系统遇到严重问题时,快速回馈失败比让用户/调用者等待要好,限制系统出错的体量,有助于系统恢复。比如,当我们去调一个第三方的 API,有很长一段时间 API 都没有响应,可能对方服务器瘫痪了。如果我们的系统还不停地重试,不仅会加重系统的负担,还会可能导致系统其它任务受影响。所以,当系统出错的次数超过了指定的阈值,就要中断当前线路,等待一段时间后再继续。
- 超时:当系统超过一定时间的等待,我们就几乎可以判断不可能会有成功的结果。比如平时一个网络请求瞬间就完成了,如果有一次网络请求超过了 30 秒还没完成,我们就知道这次大概率是不会返回成功的结果了。因此,我们需要设置系统的超时时间,避免系统长时间做无谓的等待。
- 隔离:当系统的一处出现故障时,可能促发多个失败的调用,很容易耗尽主机的资源(如 CPU)。下游系统出现故障可能导致上游的故障的调用,甚至可能蔓延到导致系统崩溃。所以要将可控的操作限制在一个固定大小的资源池中,以隔离有潜在可能相互影响的操作。
- 回退:有些错误无法避免,就要有备用的方案。这个就像浏览器不支持一些新的 CSS 特性就要额外引用一个 polyfill 一样。一般情况,当无法避免的错误发生时,我们要有一个合理的返回来代替失败,比如很常见的一个场景是,当用户没有上传头像时,我们就给他一个默认头像
- 缓存:一般我们会把频繁使用且不会怎么变化的资源缓存起来,以提高系统的响应速度。如果不对缓存资源的调用进行封装,那么我们调用的时候就要先判断缓存中有没有这个资源,有的话就从缓存返回,否则就从资源存储的地方(比如数据库)获取后缓存起来,再返回,而且有时还要考虑缓存过期和如何更新缓存的问题。Polly 提供了缓存策略的支持,使得问题变得简单
- 策略包:一种操作会有多种不同的故障,而不同的故障处理需要不同的策略。这些不同的策略必须包在一起,作为一个策略包,才能应用在同一种操作上。这就是文章开头说的 Polly 的弹性,即各种不同的策略能够灵活地组合起来。
3、 polly在项目中集成
polly主要做什么的了解后我们就开始在项目中进行集成吧。
a. 在解决方案中新建一个类库,名叫:Resilience,引用NuGet包Pollyt和Newtonsoft.Json包。
b. 在该类库中创建接口IHttpClient.cs,接口中有三个方法, 这个接口的主要目的是为了替换之前(Zhengwei.Identity服务中UserService.cs类CheckOrCreate方法中在调用Zhengwei.Use.Api服务)使用的System.Net.Http. HttpClient对象,代码如下:
public interface IHttpClient { Task<HttpResponseMessage> PostAsync<T>(string url, T item, string authorizationToken = null, string requestId = null, string authorizationMethod = "Beare"); Task<HttpResponseMessage> DoPostPutAsync(HttpMethod method, string url, Func<HttpRequestMessage> requestMessageAction, string authorizationToken = null, string requestId = null, string authorizationMethod = "Beare"); Task<HttpResponseMessage> PostAsync(string url, Dictionary<string, string> form, string authorizationToken = null, string requestId = null, string authorizationMethod = "Beare"); }
c. 在该类库中新建类ResilienceHttpClient.cs来实现IHttpClient.cs接口,代码如下
public class ResilienceHttpClient : IHttpClient { //根据url origin去创建policy private HttpClient _httpClient; //把policy打包成组合policy wraper,进行本地缓存。 private readonly Func<string, IEnumerable<Policy>> _policyCreator; private readonly ConcurrentDictionary<string, PolicyWrap> _policyWraps; private ILogger<ResilienceHttpClient> _logger; private IHttpContextAccessor _httpContextAccessor; public ResilienceHttpClient(Func<string, IEnumerable<Policy>> policyCreator, ILogger<ResilienceHttpClient> logger, IHttpContextAccessor httpContextAccessor) { _httpClient = new HttpClient(); _policyWraps = new ConcurrentDictionary<string, PolicyWrap>(); _policyCreator = policyCreator; _logger = logger; _httpContextAccessor = httpContextAccessor; } public async Task<HttpResponseMessage> PostAsync<T>(string url, T item, string authorizationToken=null, string requestId = null, string authorizationMethod = "Beare") { Func<HttpRequestMessage> func = () => CreateHttpRequestMessage(HttpMethod.Post, url, item); return await DoPostPutAsync(HttpMethod.Post, url, func, authorizationToken, requestId, authorizationMethod); } public async Task<HttpResponseMessage> PostAsync(string url, Dictionary<string, string> form, string authorizationToken=null, string requestId = null, string authorizationMethod = "Beare") { Func<HttpRequestMessage> func = () => CreateHttpRequestMessage(HttpMethod.Post, url, form); return await DoPostPutAsync(HttpMethod.Post, url,func, authorizationToken, requestId, authorizationMethod); } public Task<HttpResponseMessage> DoPostPutAsync(HttpMethod method,string url,Func<HttpRequestMessage> requestMessageAction,string authorizationToken=null, string requestId = null, string authorizationMethod = "Beare") { if(method != HttpMethod.Post && method != HttpMethod.Put) { throw new ArgumentException("Value must be either post or put", nameof(method)); } var origin = GetOriginFromUri(url); return HttpInvoker(origin,async () => { HttpRequestMessage requestMessage = requestMessageAction(); SetAuthorizationHeader(requestMessage); if (authorizationToken != null) { requestMessage.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue(authorizationMethod, authorizationToken); } if(requestId != null) { requestMessage.Headers.Add("x-requestid", requestId); } var response = await _httpClient.SendAsync(requestMessage); if(response.StatusCode == System.Net.HttpStatusCode.InternalServerError || response.StatusCode == System.Net.HttpStatusCode.BadRequest) { throw new HttpRequestException(); } return response; }); } private HttpRequestMessage CreateHttpRequestMessage<T>(HttpMethod method,string url,T item) { var requestMessage = new HttpRequestMessage(method, url); requestMessage.Content = new StringContent(JsonConvert.SerializeObject(item), System.Text.Encoding.UTF8, "application/json"); return requestMessage; } private HttpRequestMessage CreateHttpRequestMessage<T>(HttpMethod method, string url, Dictionary<string, string> form) { var requestMessage = new HttpRequestMessage(method, url); requestMessage.Content = new FormUrlEncodedContent(form); return requestMessage; } private async Task<T> HttpInvoker<T>(string origin,Func<Task<T>> action) { var normalizedOrigin = NormalizeOrigin(origin); if(!_policyWraps.TryGetValue(normalizedOrigin,out PolicyWrap policyWrap)) { policyWrap=Policy.WrapAsync(_policyCreator(normalizedOrigin).ToArray()); _policyWraps.TryAdd(normalizedOrigin, policyWrap); } return await policyWrap.ExecuteAsync(action, new Context(normalizedOrigin)); } private static string NormalizeOrigin(string origin) { return origin?.Trim()?.ToLower(); } private static string GetOriginFromUri(string uri) { var url = new Uri(uri); var origin = $"{url.Scheme}://{url.DnsSafeHost}:{url.Port}"; return origin; } private void SetAuthorizationHeader(HttpRequestMessage requestMessage) { var authorizationHeader = _httpContextAccessor.HttpContext.Request.Headers["Authorization"]; if(!string.IsNullOrEmpty(authorizationHeader)) { requestMessage.Headers.Add("Authorization", new List<string>() { authorizationHeader }); } } }
代码解析:
在这个类的HttpInvoker()方法中我们创建了一个policyWrap对象,通过这个对象来执行我的http请求,这里执行的http请求实际上还是使用的我们System.Net.Http. HttpClient对象中的SendAsync()方法,见代码:var response = await _httpClient.SendAsync(requestMessage);可以理解为我们只是加入了我们的polly机制,将请求包装了一下。
随后对两将请求异常进行了处理:InternalServerError、BadRequest,并抛出HttpRequestException异常。
在创建policyWrap对象使用了如下的代码policyWrap=Policy.WrapAsync(_policyCreator(normalizedOrigin).ToArray());
WrapAsync方法中实际上是要传入一个Policy的数组。这个policy数组是在调用的调用的时候创建的,请看类ResilienceClientFactory .cs
d. 在使用polly的项目中,也就是Zhengwei.Identity的项目中来新建类ResilienceClientFactory.cs,我将他放在了新建的code文件夹中。当然在这之前我们要将NuGet包Polly引入进来,ResilienceClientFactory.cs代码如下:
public class ResilienceClientFactory { private ILogger<ResilienceHttpClient> _logger; private IHttpContextAccessor _httpContextAccessor; private int _retryCount; private int _exceptionCountBreaking; public ResilienceClientFactory(int exceptionCountBreaking,int retryCount,ILogger<ResilienceHttpClient> logger, IHttpContextAccessor httpContextAccessor) { _exceptionCountBreaking = exceptionCountBreaking; _retryCount = retryCount; _logger = logger; _httpContextAccessor = httpContextAccessor; } public ResilienceHttpClient GetResilienceHttpClient() => new ResilienceHttpClient(origin =>CreatePolicy(origin), _logger,_httpContextAccessor); private Policy[] CreatePolicy(string origin) { return new Policy[] { Policy.Handle<HttpRequestException>() .WaitAndRetryAsync (_retryCount, retryAttempt=>TimeSpan.FromSeconds(Math.Pow(2,retryAttempt)), (exception, timeSpan, retryCount, context) => { var msg = $"第{retryCount} 次重试"+ $"of{context.PolicyKey} " +$"at {context.ExecutionKey}, " +$"due to: {exception}"; _logger.LogWarning(msg); _logger.LogDebug(msg); }), Policy.Handle<HttpRequestException>().CircuitBreakerAsync( _exceptionCountBreaking, TimeSpan.FromMinutes(1), (excption, duration) => { _logger.LogTrace("熔断器打开"); },()=>{ _logger.LogTrace("熔断器关闭"); }) }; } }
代码解析:
在这个类中我们看到了Policy对象数组的创建,一个对象其实就是一种策略,我们定义了两种策略,一是出错重试(WaitAndRetryAsync);二是超时熔断(CircuitBreakerAsync).在项目启动时我们会调用GetResilienceHttpClient()方法,也就是new ResilienceHttpClient(origin =>CreatePolicy(origin), _logger,_httpContextAccessor)段代码,我们将policy对象数组传入到ResilienceHttpClient对象中,在这个对象中又创建了policyWrap对象,在调用policyWrap对象的ExecuteAsync()方法。方法中需求传入我们的http请求,这样policy 所定义的几种策略就和http请求产生了关联,至于如何关联的那就是polly组件源码内可以看到的了,这里不深入解读。希望有机会给大家读下polly的源码。
E. 在项目启动时注册并初始化ResilienceClientFactory对象,并注册全局的IhttpClient,代码如下:
public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { services.AddIdentityServer() .AddExtensionGrantValidator<SmsAuthCodeValidator>() .AddDeveloperSigningCredential() .AddInMemoryClients(Config.GetClients()) .AddInMemoryIdentityResources(Config.GetIdentityResources()) .AddInMemoryApiResources(Config.GetApiResources()); services.Configure<ServiceDisvoveryOptions>(Configuration.GetSection("ServiceDiscovery")); services.AddSingleton<IDnsQuery>(p => { var s = p.GetRequiredService<IOptions<ServiceDisvoveryOptions>>().Value; return new LookupClient(s.Consul.DnsEndpoint.ToIpEndPoint()); }); //注册全局单例ResilienceClientFactory services.AddSingleton(typeof(ResilienceClientFactory), p => { var logger = p.GetRequiredService<ILogger<ResilienceHttpClient>>(); var httpcontextAccesser = p.GetRequiredService<IHttpContextAccessor>(); var retryCount = 5; var exceptionCountAlloweBeforeBreaking = 5; return new ResilienceClientFactory(exceptionCountAlloweBeforeBreaking,retryCount,logger, httpcontextAccesser); }); //services.AddSingleton(new HttpClient()); //注册全局的IHttpClient services.AddSingleton<IHttpClient>(p=> { return p.GetRequiredService<ResilienceClientFactory>().GetResilienceHttpClient(); } ); services.AddScoped<IAuthCodeService, AuthCodeService>() .AddScoped<IUserService, UserService>(); services.AddMvc(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseIdentityServer(); app.UseMvc(); } }
F.在项目中进行使用
在之前的Zhengwei.Identity项目中的UserService.cs类中CheckOrCreate方法中我们在进行http请求是使用的是System.Net.Http. HttpClient对象。现在将其注掉,用使我用定义的IhttpClient对象,并调用其中的方法PostAsync.代码如下:
public class UserService : IUserService { private ILogger<ResilienceHttpClient> _logger; //private string _userServiceUrl = "http://localhost:33545"; private string _userServiceUrl; //private HttpClient _httpClient; private IHttpClient _httpClient; public UserService(IHttpClient httpClient ,IOptions<Dtos.ServiceDisvoveryOptions> serOp ,IDnsQuery dnsQuery , ILogger<ResilienceHttpClient> logger) { _logger = logger; _httpClient = httpClient; var address = dnsQuery.ResolveService("service.consul",serOp.Value.ServiceName); var addressList = address.First().AddressList; var host = addressList.Any() ? addressList.First().ToString() : address.First().HostName; var port = address.First().Port; _userServiceUrl = $"http://{host}:{port}"; } public async Task<int> CheckOrCreate(string phone) { var from = new Dictionary<string, string> { { "phone", phone } }; // var content = new FormUrlEncodedContent(from); try { var response = await _httpClient.PostAsync(_userServiceUrl + "/api/users/check-or-create", from, null); if (response.StatusCode == System.Net.HttpStatusCode.OK) { var userId = await response.Content.ReadAsStringAsync(); int.TryParse(userId, out int intuserId); return intuserId; } } catch (Exception ex) { _logger.LogError("checkorcreate 重试失败" + ex.Message + ex.StackTrace); throw ex; } return 0; } }
g.代码全部完成了,我们将请求的连接故意改成错语的,然后开始测试我们的polly是否起作用了,再将打开我们的postman。请求连接http://localhost:4157/connect/token。
在VS的输出控制台会看到如下的日志信息说明我们的polly出错重试策略起了作用。