最近在捣鼓RabbitMQ,为了方便使用,自己基于EasyNetQ封装了一个类,现在贴出来还望各路大佬神明指点,共同学习。
1 ///2 /// RabbitMQ客户端封装类,基于EasyNetQ,使用时需要从nuget安装EasyNetQ。 3 /// 14 public class RabbitMqClient : IDisposable 15 { 16 private readonly IBus bus; 17 18 ///4 /// 13 ///5 /// 使用方法: 6 /// 12 ///7 /// using(var mq = new RabbitMqClient('rabbitmq连接字符串')) 8 /// { ... 9 /// } 10 ///
11 ///19 /// 构造函数 20 /// 21 /// rabbitmq连接字符串 22 public RabbitMqClient(string connectionString) 23 { 24 if (string.IsNullOrEmpty(connectionString)) 25 throw new ArgumentNullException(nameof(connectionString)); 26 bus = RabbitHutch.CreateBus(connectionString); 27 } 28 ///29 /// 发布一条消息(广播) 30 /// 31 /// 32 public void Publish(TMessage message) where TMessage:class 33 { 34 bus.PublishAsync(message); 35 } 36 37 /// 38 /// 指定Topic,发布一条消息 39 /// 40 /// 41 /// 42 public void PublishWithTopic(TMessage message, string topic) where TMessage : class 43 { 44 if(string.IsNullOrEmpty(topic)) 45 Publish(message); 46 else 47 bus.PublishAsync(message, x=>x.WithTopic(topic)); 48 } 49 50 /// 51 /// 发布消息。一次性发布多条 52 /// 53 /// 54 public void PublishMany(List messages) where TMessage : class 55 { 56 foreach (var message in messages) 57 { 58 Publish(message); 59 Thread.Sleep(50);//必须加上,以防消息阻塞 60 } 61 } 62 63 /// 64 /// 发布消息。一次性发布多条 65 /// 66 /// 67 /// 68 public void PublishManyWithTopic(List messages, string topic) where TMessage : class 69 { 70 foreach (var message in messages) 71 { 72 PublishWithTopic(message, topic); 73 Thread.Sleep(50);//必须加上,以防消息阻塞 74 } 75 } 76 77 /// 78 /// 给指定队列发送一条信息 79 /// 80 /// 队列名称 81 /// 消息 82 public void Send(string queue, TMessage message) where TMessage : class 83 { 84 bus.Send(queue, message); 85 } 86 87 /// 88 /// 给指定队列批量发送信息 89 /// 90 /// 队列名称 91 /// 消息 92 public void SendMany(string queue, IList messages) where TMessage : class 93 { 94 foreach (var message in messages) 95 { 96 SendAsync(queue, message); 97 Thread.Sleep(50);//必须加上,以防消息阻塞 98 } 99 }100 101 /// 102 /// 给指定队列发送一条信息(异步)103 /// 104 /// 队列名称105 /// 消息106 ///107 public async void SendAsync (string queue, TMessage message) where TMessage:class 108 {109 await bus.SendAsync(queue, message);110 }111 112 /// 113 /// 从指定队列接收一天信息,并做相关处理。114 /// 115 /// 队列名称116 /// 117 /// 消息处理委托方法118 ///119 /// 128 /// 129 public void Receive120 /// 例如:121 /// 127 ///122 /// message=>Task.Factory.StartNew(()=>{123 /// Console.WriteLine(message);124 /// })125 ///
126 ///(string queue, Func process) where TMessage:class 130 {131 bus.Receive(queue, process);132 }133 134 /// 135 /// 消息订阅136 /// 137 /// 消息订阅标识138 /// 139 /// 消息处理委托方法140 ///141 /// 150 /// 151 public void Subscribe142 /// 例如:143 /// 149 ///144 /// message=>Task.Factory.StartNew(()=>{145 /// Console.WriteLine(message);146 /// })147 ///
148 ///(string subscriptionId, Func process) where TMessage:class 152 {153 bus.Subscribe (subscriptionId, message => process(message));154 }155 156 /// 157 /// 消息订阅158 /// 159 /// 消息订阅标识160 /// 161 /// 消息处理委托方法162 ///163 /// 172 /// 173 /// topic174 public void SubscribeWithTopic164 /// 例如:165 /// 171 ///166 /// message=>Task.Factory.StartNew(()=>{167 /// Console.WriteLine(message);168 /// })169 ///
170 ///(string subscriptionId, Func process, string topic) where TMessage:class 175 {176 bus.Subscribe (subscriptionId, message => process(message), x=>x.WithTopic(topic));177 }178 179 /// 180 /// 自动订阅181 /// 182 /// 183 /// 184 /// 185 public void AutoSubscribe(string assemblyName, string subscriptionIdPrefix, string topic)186 {187 var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);188 if (!string.IsNullOrEmpty(topic))189 subscriber.ConfigureSubscriptionConfiguration = x => x.WithTopic(topic);190 subscriber.Subscribe(Assembly.Load(assemblyName));191 }192 193 ///194 /// 资源释放195 /// 196 public void Dispose()197 {198 if (bus != null) bus.Dispose();199 }200 }