分布式事务之消息补偿解决方案

小说:经典原创禅诗:【禅.十字回文诗】作者:陵纯更新时间:2019-03-21字数:35028

刘皓也在伟大航道纵横了一段时间,去的地方见识的也不少,以罗杰那种身体素质的话就算是艾滋病也没办法影响他,因为这种程度的病毒以他的身体素质,以他的抵抗力绝对会将这些病毒压迫住最后排除体外。

罗李华谈:属猪的人2016年运程

这可是生平第一次撞见名头响当当的红人,可得留个纪念,邓亮急忙掏出了手机:“合个影好不好,我是你们的粉丝啊。”
田开疆将手搭在仇天恨肩上,细声却口气坚定地说:「我会设法救你的!」

刘皓双手交叉合十直接夹住了这一道拳斧,不过斧头擅劈,卡普这一招更是势大力沉,刘皓双手夹住这一击之后手臂都出现了伤痕,不过却没有鲜血溢出,显然卡普这一击就算刘皓双手有锋利坚硬无比的庚金之气保护也被他强悍的力量给轰破了。

一、数据库本地事务

先看看数据库事务的定义:单个逻辑工作单元执行的一系列操作,要么完全地执行,要么完全地不执行

这个比较容易理解,操作过数据库的一般都懂,既是业务需求涉及到多个数据表操作的时候,需要用到事务

要么一起更新,要么一起不更新,不会出现只更新了部分数据表的情况,下边看看数据库事务的使用

1 begin tran
2     begin try 
3         update Table1 set Field = 1 where ID = 1
4         update Table2 set Field = 2 where ID = 1
5     end try
6     begin catch
7         rollback tran
8     end catch
9 commit tran

上实例在小型项目中一般是问题不大的,因为小型项目一般是单机系统,数据库、Web服务大都在一台服务器上,甚至可能只有一个数据库文件,

这种情况下使用本地事务没有一点问题;

但是本地事务有很大的缺陷,因为开启事务一般是锁表的,事务执行期间会一直锁着,其他的操作一般都要排队等待,对性能要求比较高的系统是不能忍受的。

特别是涉及改动不同数据库的操作,这会造成跨库事务,性能更加低

如果还涉及到不在同一台服务器、甚至不同网段部署的数据库,那本地事务简直是系统运行的灾难,是首先需要丢弃的解决方案。

 

那如果遇到上述情况,该怎么做呢,这就涉及到分布式事务了

 

二、分段式事务的补偿机制

如果有海量数据需要处理、或者要求高并发请求的话,同步的事务机制已经是不现实的了,这种情况下必须采用异步事务机制,既分段式的事务

分段式事务一般做法就是把需求任务分段式地完成,通过事务补偿机制来保证业务最终执行成功,补偿机制一般可以归类为2种:

1 )定时任务补偿:

  通过定时任务去跟进后续任务,根据不同的状态表确定下一步的操作,从而保证业务最终执行成功,

  这种办法可能会涉及到很多的后台服务,维护起来也会比较麻烦,这是应该是早期比较流行的做法

2) 消息补偿:

  通过消息中间件触发下一段任务,既通过实时消息通知下一段任务开始执行,执行完毕后的消息回发通知来保证业务最终完成;

  当然这也是异步进行的,但是能保证数据最终的完整性、一致性,也是近几年比较热门的做法

 

定时任务补偿就不说了,这篇文章我们来讨论一下通过消息补偿来完成分布式事务的一般做法

 

三、分布式事务之消息补偿

0)我们以简单的产品下单场景来说明,(不要较真哈)

1)先来看看分布式异步事务处理流程示意图,APP1与APP2需要互相订阅对方消息

2)首先看数据库,2个,一个库存库,一个已下单成功的库

 1 -- 下单通知,主要作用保留已下单操作,消息发送失败可以根据此表重新发送
 2 CREATE TABLE [dbo].[ProductMessage](
 3     [ID] [int] IDENTITY(1,1) NOT NULL,
 4     [Product] [varchar](50) NULL,
 5     [Amount] [int] NULL,
 6     [UpdateTime] [datetime] NULL
 7 ) 
 8 -- 库存
 9 CREATE TABLE [dbo].[ProductStock](
10     [ID] [int] IDENTITY(1,1) NOT NULL,
11     [Product] [varchar](50) NULL,
12     [Amount] [int] NULL
13 )
14 -- 下单成功
15 CREATE TABLE [dbo].[ProductSell](
16     [ID] [int] IDENTITY(1,1) NOT NULL,
17     [Product] [varchar](50) NULL,
18     [Customer] [int] NULL,
19     [Amount] [int] NULL
20 )
21 -- 下单成功消息,主要作用防止重复消费
22 CREATE TABLE [dbo].[ProductMessageApply](
23     [ID] [int] IDENTITY(1,1) NOT NULL,
24     [MesageID] [int] NULL,
25     [CreateTime] [datetime] NULL
26 )

3)项目架构Demo

数据底层访问使用的是Dapper、使用redis作为消息中间件

4)实体层代码

 1     public class ProductMessage
 2     {
 3         [Key]
 4         [IgnoreProperty(true)]
 5         public int ID { get; set; }
 6         public string Product { get; set; }
 7         public int Amount { get; set; }
 8         public DateTime UpdateTime { get; set; }
 9     }
10     public class ProductMessageApply
11     {
12         [Key]
13         [IgnoreProperty(true)]
14         public int ID { get; set; }
15         public int MesageID { get; set; }
16         public DateTime CreateTime { get; set; }
17     }
18     public class ProductSell
19     {
20         [Key]
21         [IgnoreProperty(true)]
22         public int ID { get; set; }
23         public string Product { get; set; }
24         public int Customer { get; set; }
25         public int Amount { get; set; }
26     }
27     public class ProductStock
28     {
29         [Key]
30         [IgnoreProperty(true)]
31         public int ID { get; set; }
32         public string Product { get; set; }
33         public int Amount { get; set; }
34     }

5)服务接口层代码

 1     public interface IProductMessageApplyService
 2     {
 3         void Add(ProductMessageApply entity);
 4         ProductMessageApply Get(int id);
 5     }
 6     public interface IProductMessageService
 7     {
 8         void Add(ProductMessage entity);
 9         IEnumerable<ProductMessage> Gets(object paramPairs = null);
10         void Delete(int id);
11     }
12     public interface IProductSellService
13     {
14         void Add(ProductSell entity);
15     }
16     public interface IProductStockService
17     {
18         void ReduceReserve(int id, int amount);
19     }

6)库存、消息通知

 1     public class ProductMessageService : IProductMessageService
 2     {
 3         private IRepository<ProductMessage> repository;
 4 
 5         public ProductMessageService(IRepository<ProductMessage> repository)
 6         {
 7             this.repository = repository;
 8         }
 9 
10         public void Add(ProductMessage entity)
11         {
12             this.repository.Add(entity);
13         }
14 
15         public IEnumerable<ProductMessage> Gets(object paramPairs = null)
16         {
17             return this.repository.Gets(paramPairs);
18         }
19 
20         public void Delete(int id)
21         {
22             this.repository.Delete(id);
23         }
24     }
25 
26     public class ProductStockService : IProductStockService
27     {
28         private IRepository<ProductStock> repository;
29 
30         public ProductStockService(IRepository<ProductStock> repository)
31         {
32             this.repository = repository;
33         }
34 
35         public void ReduceReserve(int id, int amount)
36         {
37             var entity = this.repository.Get(id);
38             if (entity == null) return;
39 
40             entity.Amount = entity.Amount - amount;
41             this.repository.Update(entity);
42         }
43     }

7)下单、下单成功消息

 1     public class ProductMessageApplyService : IProductMessageApplyService
 2     {
 3         private IRepository<ProductMessageApply> repository;
 4 
 5         public ProductMessageApplyService(IRepository<ProductMessageApply> repository)
 6         {
 7             this.repository = repository;
 8         }
 9 
10         public void Add(ProductMessageApply entity)
11         {
12             this.repository.Add(entity);
13         }
14 
15         public ProductMessageApply Get(int id)
16         {
17             return this.repository.Get(id);
18         }
19     }
20 
21     public class ProductSellService : IProductSellService
22     {
23         private IRepository<ProductSell> repository;
24 
25         public ProductSellService(IRepository<ProductSell> repository)
26         {
27             this.repository = repository;
28         }
29 
30         public void Add(ProductSell entity)
31         {
32             this.repository.Add(entity);
33         }
34     }

8)下单减库存测试

 1 namespace Demo.Reserve.App
 2 {
 3     class Program
 4     {
 5         static void Main(string[] args)
 6         {
 7             Console.WriteLine(string.Format("{0} 程序已启动", DateTime.Now.ToString()));
 8 
 9             Send();
10             Subscribe();
11            
12             Console.ReadKey();
13         }
14 
15         private static void Send()
16         {
17             var unitOfWork = new UnitOfWork(Enums.Reserve);
18 
19             try
20             {
21                 var productStockRepository = new BaseRepository<ProductStock>(unitOfWork);
22                 var productStockServic = new ProductStockService(productStockRepository);
23                 var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork);
24                 var productMessageService = new ProductMessageService(productMessageRepository);
25 
26                 var id = 1;
27                 var amount = 2;
28                 var productMessage = new ProductMessage()
29                 {
30                     Product = "ProductCode",
31                     Amount = amount,
32                     UpdateTime = DateTime.Now
33                 };
34 
35                 productStockServic.ReduceReserve(id, amount);
36                 productMessageService.Add(productMessage);
37                 unitOfWork.Commit();
38                 Console.WriteLine(string.Format("{0} 减库存完成", DateTime.Now.ToString()));
39                 Thread.Sleep(1000);
40 
41                 var message = JsonConvert.SerializeObject(productMessage);
42                 RedisConfig.Instrace.Publish("channel.Send", message);
43                 Console.WriteLine(string.Format("{0} 发送减库存消息: {1}", DateTime.Now.ToString(), message));
44             }
45             catch (Exception ex)
46             {
47                 //Logger.Error(ex);
48                 unitOfWork.Rollback();
49             }
50         }
51 
52         private static void Subscribe()
53         {
54             var client = RedisConfig.Instrace.NewClient();
55             var subscriber = client.GetSubscriber();
56 
57             subscriber.Subscribe("channel.Success", (chl, message) =>
58             {
59                 try
60                 {
61                     var unitOfWork = new UnitOfWork(Enums.Reserve);
62                     var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork);
63                     var productMessageService = new ProductMessageService(productMessageRepository);
64 
65                     var messageID = message.ToString().ToInt();
66                     if (messageID > 0)
67                     {
68                         productMessageService.Delete(messageID);
69                         Console.WriteLine(string.Format("{0} 收到消费成功消息:{1}", DateTime.Now.ToString(), message));
70                     }
71                 }
72                 catch (Exception ex)
73                 {
74                     //Logger.Error(ex);
75                 }
76             });
77         }
78     }
79 }

9)下单成功及消息回发测试

 1 namespace Demo.Sell.App
 2 {
 3     class Program
 4     {
 5         static void Main(string[] args)
 6         {
 7             Subscribe();
 8 
 9             Console.WriteLine(string.Format("{0} 程序已启动", DateTime.Now.ToString()));
10             Console.ReadKey();
11         }
12 
13         private static void Subscribe()
14         {
15             var client = RedisConfig.Instrace.NewClient();
16             var subscriber = client.GetSubscriber();
17 
18             subscriber.Subscribe("channel.Send", (chl, message) =>
19             {
20                 Consume(message);
21             });
22         }
23 
24         private static void Consume(string message)
25         {
26             var unitOfWork = new UnitOfWork(Enums.Sell);
27 
28             try
29             {
30                 Console.WriteLine(string.Format("{0} 收到减库存消息: {1}", DateTime.Now.ToString(), message));
31 
32                 var productMessage = JsonConvert.DeserializeObject<ProductMessage>(message);
33 
34                 var productSellRepository = new BaseRepository<ProductSell>(unitOfWork);
35                 var productSellService = new ProductSellService(productSellRepository);
36 
37                 var productMessageApplyRepository = new BaseRepository<ProductMessageApply>(unitOfWork);
38                 var productMessageApplyService = new ProductMessageApplyService(productMessageApplyRepository);
39 
40                 var noExists = productMessageApplyService.Get(productMessage.ID) == null;
41                 if (noExists)
42                 {
43                     productSellService.Add(new ProductSell()
44                     {
45                         Product = productMessage.Product,
46                         Amount = productMessage.Amount,
47                         Customer = 123
48                     });
49 
50                     productMessageApplyService.Add(new ProductMessageApply()
51                     {
52                         MesageID = productMessage.ID,
53                         CreateTime = DateTime.Now
54                     });
55 
56                     unitOfWork.Commit();
57                     Console.WriteLine(string.Format("{0} 消息消费完成", DateTime.Now.ToString()));
58                     Thread.Sleep(1000);
59                 }
60 
61                 RedisConfig.Instrace.Publish("channel.Success", productMessage.ID.ToString());
62                 Console.WriteLine(string.Format("{0} 发送消费完成通知:{1}", DateTime.Now.ToString(), productMessage.ID.ToString()));
63             }
64             catch (Exception ex)
65             {
66                 //Logger.Error(ex);
67                 unitOfWork.Rollback();
68             }
69         }
70     }
71 }

 10)好了,到了最后检验成果的时候了

先打开Demo.Sell.App.exe、然后打开Demo.Reserve.App.exe

 

大功告成!

 

编辑:董密龙

发布:2019-03-21 02:52:10

当前文章:http://leetaemin.cn/2018ircimepusl/index.html

小学新生:要做哪些准备适应新环境 环境磁场能量时时影响您的心情与运气 罗李华:天秤座2016年运势 如何修炼一个吸引妹子的领袖型人格? 泡妞的一个中心和两个基点 董卿为什么没有被狐狸狗催眠? 怎样才能找到一位好的整形医生——保证安全和效果 罗李华语录

47415 24287 87158 53179 16354 36310 50216 81120 35295 33494 81177 79037 24530 30509 19599 72855 22972 78464 61245 59043

我要说两句: (0人参与)

发布