.net 微服务实践
l? 前言
本文記錄了我的一次.net core 微服務架構實踐經驗,以及所用到的技術
l? 優點
每個服務聚焦于一塊業務,無論在開發階段或是部署階段都是獨立的,更適合被各個小團隊開發維護,團隊對服務的整個生命周期負責,工作在獨立的上下文之中。
如果某一項服務的性能達到瓶頸,我們只需要增加該服務負載節點,能夠針對系統的瓶頸服務更有效的使用資源。
服務A可以使用.net實現 ,服務B可以使用java實現,技術選型靈活,系統不會長期限制在某個技術棧上。
松耦合、高內聚,代碼容易理解,開發效率高,更好維護。
高可用,每個服務可以啟動多個實例負載,單個實例掛了有足夠的響應時間來修復
l? 缺點
系統規模龐大,運維要求高,需要devops技巧(Jenkins,Kubernetes等等)
跨服務需求需要團隊之間的協作
跨服務的調用(http/rpc)增加了系統的延遲
l? Docker
docker是目前普遍使用的容器化技術,在此架構中我們的應用程序將部署在docker容器里面,通過docker發布應用 需要先編寫一個dockerfile,如下
#引入鏡像 .net core 3.1 FROM mcr.microsoft.com/dotnet/core/aspnet:3.1-buster-slim AS base #設定工作目錄 WORKDIR /app #在容器中程序使用的端口,一定要和程序啟動使用的端口對應上 EXPOSE 80 #復制文件到工作目錄 COPY . . #環境變量 此變量會覆蓋appsetting.json 內的同名變量 ENV Ip "" ENV Port "" #啟動程序 ENTRYPOINT ["dotnet", "Union.UserCenter.dll"]?docker build 命令 將我們的發布目錄打包一個docker鏡像,例如? ? docker build -t test .? ? ,test是鏡像名稱
docker run 命令啟動我們打包的鏡像,例如?docker run -d -p 5002:80 --name="test1" -e Ip="192.168.0.164" -e Port="5002"? test ,-e 表示傳遞環境變量
更多docker命令 請查閱:https://www.runoob.com/docker/docker-command-manual.html
docker官網:https://www.docker.com
部署方便:只需要一個簡單的 docker run命令,就可以啟動一個應用實例了
部署安全:打包鏡像的時候已經打包了應用所需環境,運行環境不會出現任何問題
隔離性好:同一臺機器我可以部署java的應用和.net的應用,互不影響
快速回滾:只要鏡像存在可以快速回滾到任一版本
成本低:一臺機器可以運行很多實例,很容易就可以實現高可用和橫向擴展
?
?
? 經測試docker for windows不適合部署生產環境,還是得在liunx系統上跑, .net framework 無法在docker上部署
Docker compose :Docker官方提供的管理工具,可以簡單的配置一組容器啟動參數、啟動順序、依賴關系
Kubernetes :容器數量很多之后會變得難以管理,可以引入Kubernetes對容器進行自動管理,熟練運用有一定難度,中文社區:https://www.kubernetes.org.cn/k8s
l? RPC 遠程過程調用
為什么要有RPC
按照微服務設計思想,服務A只專注于服務A的業務,但是需求上肯定會有服務A需要調用服務B來完成一個業務處理的情況,使用http調用其他服務效率相對較低,所以引入了RPC。
gRPC vs thrift ?評測:https://www.cnblogs.com/softidea/p/7232035.html
這里使用thrift,thrift 官網:http://thrift.apache.org
Thrift 采用IDL(Interface Definition Language)來定義通用的服務接口,然后通過Thrift提供的編譯器,可以將服務接口編譯成不同語言編寫的代碼,通過這個方式來實現跨語言的功能,語法請自行百度
下載thrift 代碼生成器??http://thrift.apache.org/download?,thrift-0.13.0.exe 這個文件
執行命令 thrift.exe --gen netcore xxxxxxx.thrift ,生成C# 服務接口代碼
?
引用官方提供的.net 庫,可以去官網下載,找不到的可以直接 nuget引用?Examda.Thrift,這是我為了方便使用上傳的
添加生成的代碼到我們的服務端里,然后自己實現 thrift文件定義的接口
using System.Threading; using System.Threading.Tasks; using Union.UnionInfo.Service.Interface; using static Examda.Contract.UnionInfo.UnionInfoService;namespace Union.UnionInfo.Service {public class UnionInfoServiceImpl : IAsync{private readonly ILmMembersInfoService _lmMembersInfoService;public UnionInfoServiceImpl(ILmMembersInfoService lmMembersInfoService){_lmMembersInfoService = lmMembersInfoService;}//實現接口public async Task<string> GetUnionIdAsync(string DozDomain, CancellationToken cancellationToken){return (await _lmMembersInfoService.GetMembersInfoByDozDomain(DozDomain)).UnionId;}} }添加一個類繼承 IHostedService?
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading; using System.Threading.Tasks; using Thrift; using Thrift.Protocols; using Thrift.Server; using Thrift.Transports; using Thrift.Transports.Server;namespace Examda.Core.Rpc {public class RpcServiceHost : IHostedService{public IConfiguration Configuration { get; }public ITAsyncProcessor Processor { get; }public ILoggerFactory LoggerFactory { get; }public RpcServiceHost(IConfiguration configuration, ITAsyncProcessor processor,ILoggerFactory loggerFactory){Configuration = configuration;Processor = processor;LoggerFactory = loggerFactory;}//public virtual Task StartAsync(CancellationToken cancellationToken){TServerTransport serverTransport = new TServerSocketTransport(Configuration.GetValue<int>("RpcPort"));TBinaryProtocol.Factory factory1 = new TBinaryProtocol.Factory();TBinaryProtocol.Factory factory2 = new TBinaryProtocol.Factory();//UnionInfoService.AsyncProcessor processor = new AsyncProcessor(new UnionInfoServiceImpl());實現的服務這里采用.net core 自帶 DI注入,也可以直接實例化TBaseServer server = new AsyncBaseServer(Processor, serverTransport, factory1, factory2, LoggerFactory);return server.ServeAsync(cancellationToken);}public virtual Task StopAsync(CancellationToken cancellationToken){return Task.CompletedTask;}} }修改ConfigureServices添加如下代碼
//注入rpc服務實現實例services.AddSingleton<ITAsyncProcessor>(provider =>{var lmMembersInfoService = provider.GetService<ILmMembersInfoService>();return new AsyncProcessor(new UnionInfoServiceImpl(lmMembersInfoService));});//監聽rpc端口services.AddHostedService<RpcServiceHost>();服務端就完成了,接下來編寫客戶端調用,修改客戶端ConfigureServices添加如下代碼
//test rpc服務services.AddScoped(provider =>{var examdaConsul = provider.GetService<ExamdaConsul>();Address address = examdaConsul.GetAddress("UnionInfo");//獲取服務地址,這里我封裝了,測試可以先直接寫死var tClientTransport = new TSocketClientTransport(IPAddress.Parse(address.Ip), address.Port);var tProtocol = new TBinaryProtocol(tClientTransport);return new UnionInfoService.Client(tProtocol);}); 控制器內調用示例using System.Threading; using System.Threading.Tasks; using Examda.Contract.UnionInfo; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging;namespace RPCCLIENT.Controllers {[ApiController][Route("[controller]")]public class WeatherForecastController : ControllerBase{private static readonly string[] Summaries = new[]{"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"};private readonly UnionInfoService.Client _rpcClient;private readonly ILogger<WeatherForecastController> _logger;public WeatherForecastController(ILogger<WeatherForecastController> logger, UnionInfoService.Client rpcClient){_logger = logger;_rpcClient = rpcClient;}[HttpGet]public async Task<IActionResult> Get(){await _rpcClient.OpenTransportAsync(CancellationToken.None);var order = await _rpcClient.GetUnionIdAsync("wx.hdgk.cn", CancellationToken.None);//rpc調用return Ok(order);}} }l? 服務注冊與發現
為什么要有服務注冊與發現
例如:服務A一開始只有一個實例,此時又啟動了一個服務A的實例,但是調用服務A的服務B并不知道 服務A多了一個實例(或者少了),此時引入服務注冊與發現可以讓服務B得知服務A的變更情況,服務B就知道自己要調用的服務IP:端口 是多少,不需要人工干預
常見的注冊中心
?
?
?
這里使用consul
健康檢查:consul自帶健康檢查,檢查服務是否可用,不可用的服務將從注冊中心剔除,自帶的就是隔一段時間檢測一下端口通不通,并且支持自行擴展健康檢查,可用自己在服務內實現是否健康的邏輯,比如雖然接口是通的,但是我發現自己宿主機cpu過80%了,就返回不健康的狀態
服務注冊:nuget安裝consul,寫一個擴展方法
/// <summary>/// 如果服務同時包含http,rpc調用此方法/// </summary>/// <param name="services"></param>/// <param name="Configuration"></param>/// <param name="ServiceName"></param>/// <param name="Remark"></param>public static void AddExamdaServiceRpc(this IServiceCollection services, IConfiguration Configuration, string ServiceName, string Remark){var Ip = Configuration.GetValue<string>("Ip");var RpcPort = Configuration.GetValue<int>("RpcPort");var RpcAddress = $"{Ip}:{RpcPort}";var consulClient = new ConsulClient(x => x.Address = new Uri(Configuration.GetValue<string>("ConsulUrl")));//請求注冊的 Consul 地址var httpCheck = new AgentServiceCheck(){DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服務啟動多久后注冊Interval = TimeSpan.FromSeconds(20),//健康檢查時間間隔,或者稱為心跳間隔Timeout = TimeSpan.FromSeconds(5),TCP = RpcAddress};var registration = new AgentServiceRegistration(){Checks = new[] { httpCheck },ID = RpcAddress,Name = ServiceName,Address = Ip,Port = RpcPort,Tags = new[] { Remark }};consulClient.Agent.ServiceRegister(registration).Wait();//應用程序退出時AppDomain.CurrentDomain.ProcessExit += (sender, e) =>{consulClient.Agent.ServiceDeregister(registration.ID).Wait();//consul取消注冊服務};}修改ConfigureServices添加如下代碼,啟動
services.AddExamdaServiceRpc(Configuration, "UnionInfo", "聯盟機構信息服務");
?
?
? 安裝consul請自行百度
?服務發現與變更:調用方配置好自己需要調用的服務名稱集合,然后去consul獲取地址列表,然后根據需要調用的服務數量啟動N個線程來輪詢服務最新的地址信息,不用擔心輪詢造成的消耗過大,因為consul提供了Blocking Queries 阻塞查詢的方式,請求發送到consul之后會在consul阻塞(30)秒,期間有變更或者到達30秒了之后才會返回地址列表,然后每一次變更之后的地址列表都會有一個新的版本號。
using Consul; using Microsoft.Extensions.Configuration; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks;namespace Examda.Core.Consul {public class Address{public string Ip { get; set; }public int Port { get; set; }}/// <summary>/// 未實現服務負載均衡,這里隨機選一個/// </summary>public class ExamdaConsul{private object locker = new object();private readonly ConsulClient _consulClient;private IDictionary<string, List<Address>> RpcServices { get; set; }public ExamdaConsul(IConfiguration configuration){RpcServices = new Dictionary<string, List<Address>>();_consulClient = new ConsulClient(c =>{c.Address = new Uri(configuration.GetValue<string>("ConsulUrl"));});foreach (var item in configuration.GetSection("RpcServiceClient").GetChildren().Select(x => x.Value).ToList())//遍歷所需要調用的服務名稱集合{RpcServices.Add(item, null);var res = _consulClient.Catalog.Service(item).Result;RpcServices[item] = res.Response.Select(x => new Address() { Ip = x.ServiceAddress, Port = x.ServicePort }).ToList();Task.Factory.StartNew(() =>{var queryOptions = new QueryOptions { WaitTime = TimeSpan.FromSeconds(30) };//阻塞時間queryOptions.WaitIndex = res.LastIndex;while (true){GetAgentServices(queryOptions, item);}});}}private void GetAgentServices(QueryOptions queryOptions, string serviceName){var res = _consulClient.Catalog.Service(serviceName, null, queryOptions).Result;if (queryOptions.WaitIndex != res.LastIndex){lock (locker){queryOptions.WaitIndex = res.LastIndex;var currentServices = RpcServices[serviceName];RpcServices[serviceName] = res.Response.Select(x => new Address() { Ip = x.ServiceAddress, Port = x.ServicePort }).ToList();}}}/// <summary>/// 獲取服務可用地址/// </summary>/// <param name="serviceName"></param>/// <returns></returns>public Address GetAddress(string serviceName){for (int i = 0; i < 3; i++){Random r = new Random();int index = r.Next(RpcServices.Count);try{return RpcServices[serviceName][index];}catch{Thread.Sleep(10);continue;}}return null;}} } 然后注入一個ExamdaConsul類的單例,講寫死的服務地址改成從consul獲取 //注入consul客戶端 單例services.AddSingleton<ExamdaConsul>();//注入UnionInfo rpc客戶端 線程單例services.AddScoped(provider =>{var examdaConsul = provider.GetService<ExamdaConsul>();Address address = examdaConsul.GetAddress("UnionInfo");//從consul獲取服務地址var tClientTransport = new TSocketClientTransport(IPAddress.Parse(address.Ip), address.Port);var tProtocol = new TBinaryProtocol(tClientTransport);return new UnionInfoService.Client(tProtocol);});consul 官網:https://www.consul.io
l? API網關
所有的請求都先經過網關,由轉發到對應的服務,對比了 ocelot 和 Bumblebee 兩個c#寫的網關。選擇使用了Bumblebee。
Ocelot性能比較低,吞吐比直接訪問降低四倍,但是文檔很全面,功能集成很多,不需要自己擴展什么。
Bumblebee?我做測試發現Bumblebee 性能很優秀,尷尬的是這個幾乎沒什么人用,很多功能需要自己擴展,作者官網http://beetlex.io/?Bumblebee 文檔:http://doc.beetlex.io/#29322e3796694434894fc2e6e8747626
這里使用Bumblebee ,使用方法可以看作者的文檔
健康檢查:不健康的節點將不會被轉發請求
限流:例如限制某個節點最多300rps,如果此節點并發了1000個請求,大概會有700個左右請求網關會直接返回錯誤,不會轉發到具體的服務,可以起到擋洪作用,避免節點直接掛了。
路由:我是這么設置的 例如 http://192.168.0.164/Course/Tool/GetUserInfo ,Course一級是服務名稱 tool 是服務的控制器名稱 getuserinfo是方法名稱
負載均衡:服務多個節點負載,網關可以設置負載均衡策略
?
注冊到網關:redis發布訂閱實現,添加一個擴展方法
public static void AddExamdaService(this IServiceCollection services, IConfiguration Configuration, string ServiceName, string Remark){var Ip = Configuration.GetValue<string>("Ip");var Port = Configuration.GetValue<int>("Port");var Address = $"http://{Ip}:{Port}";services.AddSingleton(new Redis(Configuration.GetValue<string>("Redis")));ServiceProvider serviceProvider = services.BuildServiceProvider();Redis redis = serviceProvider.GetService<Redis>();redis.Publish("ApiGetewap", JsonConvert.SerializeObject(new { Address, ServiceName, Remark }));AppDomain.CurrentDomain.ProcessExit += (sender, e) =>{redis.Publish("ApiGetewapExit", JsonConvert.SerializeObject(new { Address, ServiceName, Remark }));};}網關訂閱這個頻道
g = new OverrideApiGetewap();g.HttpOptions(o =>{o.Port = 80;o.LogToConsole = true;o.LogLevel = BeetleX.EventArgs.LogType.Error;});g.Open();var sub = Program.redis.GetSubscriber();//注冊服務sub.Subscribe("ApiGetewap",(chanel,message)=> {var service = JsonConvert.DeserializeObject<Service>(message);var route = g.Routes.NewOrGet(string.Format("^/{0}.*", service.ServiceName), service.Remark);route.AddServer(service.Address, 0);});//服務退出sub.Subscribe("ApiGetewapExit", (chanel, message) => {var service = JsonConvert.DeserializeObject<Service>(message);var route = g.Routes.NewOrGet(string.Format("^/{0}.*", service.ServiceName), service.Remark);route.RemoveServer(service.Address);});修改ConfigureServices添加如下代碼,啟動。這樣網關也能動態的發現我們的服務了
//注冊此服務到網關services.AddExamdaService(Configuration, "Course", "聯盟我的課程服務");
異常流量拉黑:例如某個ip 10s內請求數量超過300 將他拉黑 30 分鐘,這里使用redis實現計數器
自己寫的簡陋版本
//請求完成觸發的事件,不會阻塞請求g.RequestIncrement += (sender, e) =>{Task.Factory.StartNew(() =>{var db = Program.redis.GetDatabase();var counter = db.KeyExists(e.Request.RemoteIPAddress);//判斷該ip是否存在計數器if (counter){var count = db.StringIncrement(e.Request.RemoteIPAddress);//計數器加1if (count > 300){db.StringSet("BlackList_" + e.Request.RemoteIPAddress, "", new TimeSpan(0, 1, 0), flags: StackExchange.Redis.CommandFlags.FireAndForget);//拉黑半個小時,不等待返回值}}else{db.StringIncrement(e.Request.RemoteIPAddress, flags: StackExchange.Redis.CommandFlags.FireAndForget);//創建計數器db.KeyExpire(e.Request.RemoteIPAddress, new TimeSpan(0, 0, 10), flags: StackExchange.Redis.CommandFlags.FireAndForget);//設置10s過期}});}; class OverrideApiGetewap : Bumblebee.Gateway{//請求管道的第一個事件protected override void OnHttpRequest(object sender, EventHttpRequestArgs e){if (!e.Request.Path.Contains("/__system/bumblebee") && e.Request.Path != "/")//排除掉訪問網關ui的{var db = Program.redis.GetDatabase();var isBlack = db.KeyExists("BlackList_" + e.Request.RemoteIPAddress);if (isBlack){e.Response.Result(new JsonResult("你被拉黑了"));e.Cancel = true;//取消請求}else{base.OnHttpRequest(sender, e);}//base.OnHttpRequest(sender, e);}else{base.OnHttpRequest(sender, e);}}}熔斷器:當某個請求轉發下游服務返回錯誤次數或者超時次數達到閥值時自動熔斷該節點,暫未實現
接口驗簽:客戶端請求都帶上用 url時間戳 參數加密的簽名,網關進行驗證,確保是合法的客戶端
網關自帶UI
l? 鏈路追蹤 性能監控
Skywalking 官網:http://skywalking.apache.org/?
每個請求的鏈路,每一個步驟的耗時都可以查到,如下圖的一個請求執行了很多次sql,每個步驟的sql語句都可以看到,集成很簡單,使用官方提供的.net探針集成到各個服務就好了,無代碼入侵。
?
?
?
?
有一個很強大的ui界面,也可以提供報警等功能,ui可以查看到響應很慢的接口,平均響應時間,以及每個服務的關聯關系,但是有個問題我沒有解決,RPC鏈路追蹤不到。
? ? ?可以自行去官方查閱使用文檔
l? 分布式日志收集框架
實例太多了,不可能使用單機日志,需要一個分布式日志收集框架把所有日志收集到一起,可以考慮使用java的elk 或者 .net core 的Exceptionless
l? 分布式事務
跨服務之間調用并且涉及到事務的處理方式,還在想怎么弄
l? 配置中心
各個實例逐個配置太麻煩了,特別是如果更改了數據庫地址,每一個服務的所有實例都要改,改死去,并且重啟實例也不現實,一定要支持配置熱更新,試了下攜程的Apollo有點消耗資源
l? CI/CD
? 將源碼管理做一個開發分支,一個測試分支,一個發布分支,開發只動開發分支,開發完成后提交代碼,由測試合并到測試分支,并通知Jenkins生成鏡像并發布到測試站點,測試通過之后由運維合并到發布分支,或手動或自動通過Jenkins發布,應該保證 測試分支與發布分支的版本能對應docker鏡像倉庫的每一個版本,個人見解。
l ?例:XXXX服務的項目源碼結構
?記錄與分享自己的一次微服務實踐,以上均為個人見解,不對的地方或者好的建議歡迎來信 289501868@qq.com
總結
以上是生活随笔為你收集整理的.net 微服务实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ASP.NET Core Web API
- 下一篇: 全局思维