(给DotNet加星标,提升.Net技能)

转自:码农阿宇

CAP是什么?

CAP是一套分布式事务的解决方案,是.NET Core Community中的第一个千星项目(目前已经1656 Star),具有轻量级、易使用、高性能等特点。

本文主要针对易用性这一点,展开叙述,一起看看CAP如何结合EF Core和RabbitMQ带领小白轻松走入分布式消息队列的世界。

首先,你需要搭建一套RabbitMQ系统,搭建过程在此不再叙述,如果大家觉得麻烦,可以用我搭好的。

HostName: coderayu.cn  UserName:guest Password:guest  (仅仅可用作实验,数据丢失不负责)

创建ASP.NET Core 项目,并引入Nuget包

你可以运行以下下命令在你的项目中安装 CAP。

PM> Install-Package DotNetCore.CAP

如果你的消息队列使用的是 Kafka 的话,你可以:

PM> Install-Package DotNetCore.CAP.Kafka

如果你的消息队列使用的是 RabbitMQ 的话,你可以:

PM> Install-Package DotNetCore.CAP.RabbitMQ

CAP 提供了 Sql Server, MySql, PostgreSQL 的扩展作为数据库存储:

// 按需选择安装你正在使用的数据库

PM> Install-Package DotNetCore.CAP.SqlServer

PM> Install-Package DotNetCore.CAP.MySql

PM> Install-Package DotNetCore.CAP.PostgreSql

创建DbContext

因为我采用的是EF Core,所以首先要创建一个DbContext上下文,代码如下:

public class CapDbContext:DbContext{    public CapDbContext(DbContextOptions options) : base(options)    {    }}

Startup配置

首先需要在ConfigureServices函数中进行相关服务的注入,对应的操作和功能解释如下:

public void ConfigureServices(IServiceCollection services){    //注入DbContext上下文,如果用的是Mysql可能还需要添加Pomelo.EntityFrameworkCore.MySql这个Nuget包    services.AddDbContext<CapDbContext>(options =>    options.UseMySql("Server=127.0.0.1;Database=testcap;UserId=root;Password=;"));    //配置CAP    services.AddCap(x =>    {        x.UseEntityFramework<CapDbContext>();        //启用操作面板        x.UseDashboard();        //使用RabbitMQ        x.UseRabbitMQ(rb =>        {            //rabbitmq服务器地址            rb.HostName = "coderayu.cn";            rb.UserName = "guest";            rb.Password = "guest";            //指定Topic exchange名称,不指定的话会用默认的            rb.ExchangeName = "cap.text.exchange";        });        //设置处理成功的数据在数据库中保存的时间(秒),为保证系统新能,数据会定期清理。        x.SucceedMessageExpiredAfter = 24 * 3600;        //设置失败重试次数        x.FailedRetryCount = 5;    });services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);}

最后还要再Congiure中启用CAP中间件

public void Configure(IApplicationBuilder app, IHostingEnvironment env){    if (env.IsDevelopment())    {        app.UseDeveloperExceptionPage();    }    //启用cap中间件    app.UseCap();    app.UseMvc();}

利用EF Core生成CAP数据库

再程序包管理控制台中依此输入以下命令行

PM> Add-Migration Init

PM> update-database

如果成成功执行,那么打开数据库,就可以看到用来存储CAP发送和接收数据的表格了。

表格中每列的含义如下:

消息的发送和订阅

我们直接在ValuesController的基础上进行改造。

在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 进行消息发送

private readonly ICapPublisher _publisher;public ValuesController(ICapPublisher publisher){    _publisher = publisher;}

发送消息

[HttpGet]public string Get(string message){    //"cap.test.queue"是发送的消息RouteKey,可以理解为消息管道的名称    _publisher.Publish("cap.test.queue",message);    return "发送成功";}

订阅消息

//"cap.test.queue"为发送消息时的RauteKey,也可以模糊匹配//详情-four-dotnet.html[CapSubscribe("cap.test.queue")]public void HandleMessage(string message){    Console.Write(DateTime.Now.ToString()+"收到消息:"+message);}

启动程序后,首先看到CAP启动成功

紧随其后,消费者也就是我们的订阅方法在RabbitMQ服务器上注册成功。

发送消息,发送成功,如下

发送后,立即在控制台看到了订阅方法输出的结果。

消息的失败重试

在订阅方法中,如果抛出异常,那么CAP就会认为该条消息处理失败,会自动进行重试,重试次数在前方已经进行了配置。

我们把订阅方法做一个改动,打印接收的信息到控制台中,并抛出异常

//"cap.test.queue"为发送消息时的RauteKey,也可以模糊匹配//详情-four-dotnet.html[CapSubscribe("cap.test.queue")]public void HandleMessage(string message){    Console.WriteLine(DateTime.Now.ToString()+"收到消息:"+message);    throw new Exception("测试失败重试");}

可以看到,立即进行了三次重试

可是在前面,我们设置的失败重试次数是5次,为什么这里只重试三次吗?是不是要叫晓东过来改BUG了呢转动眼睛?当然不是。

观察发现,CAP重试的前三次是立即进行的,而后面的重试,是每隔一段时间进行的,当在分布式通讯的过程中,可能出现了问题确实不会立即修复解决,可能过了一定时间,系统就自动恢复了,如网络抖动。

CAP仪表盘

发送成功了五条消息,成功接收处理了三条,两条处理失败,处理失败的任务,我们可以直接在面板中进行重新消费,可谓非常方便。

同时,处理失败的消息,点击消息的编号后,可以查看到消息的内容和异常原因。

CAP如此强大,让消息队列这种高大上产品操作So Easy,学会了CAP,也可以吹牛说,我也懂分布式任务处理啦大声笑。

感谢晓东开发出如此强大的项目,同时感谢.NET Core Community。

参考 CAP Github wiki:

Demo代码:

推荐阅读

看完本文有收获?请转发分享给更多人

关注「DotNet」加星标,提升.Net技能

喜欢就点一下「好看」呗~