使用MSMQ进行可靠的消息通讯

一、为什么要使用MSMQ

在一个分布式的环境中,我们往往需要根据具体的情况采用不同的方式进行数据的传输。比如在一个Intranet内,我们一般通过TCP进行高效的数据通信;而在一个Internet的环境中,我们则通常使用Http进行跨平台的数据交换。而这些通信方式具有一个显著的特点,那就是他们是基于Connection的,也就是说,交互双方在进行通信的时候必须保证有一个可用的Connection存在于他们之间。而在某些时候,比如那些使用拨号连接的用户、以及使用便携式计算机的用户,我们不能保证在他们和需要访问的Server之间有一个的可靠的连接,在这种情况下,基于Messaging Queue的连接就显得尤为重要了。我们今天就来谈谈在WCF中如何使用MSMQ。

MSMQ不仅仅是作为支持客户端连接工具而存在,合理的使用MSMQ可以在很大程度上提升系统的Performance和Scalability。我们先来看看MSMQ能给我们带来怎样的好处:

1.MSMQ是基于Disconnection

MSMQ通过Message Queue进行通信,这种通信方式为离线工作成为了可能。比如在介绍MSMQ时都会提到的Order Delivery的例子:在一个基于B2C的系统中,订单从各种各样的客户传来,由于 客户的各异性,不能保证每个客户在每时每刻都和用于接收订单的Server保持一个可靠的连接,我们有时候甚至允许客户即使在离线的情况下也可以递交订单(虽然订单不能发送到订单的接收方,但是我们可以通过某种机制保证先在本地保存该订单,一旦连接建立,则马上向接收方递交订单),而MSMQ则有效地提供了这样的机制:Server端建立一个Message Queue来接收来个客户的订单,客户端通过向该Message Queue发送承载了订单数据的Message实现订单的递交。如果在客户离线的情况下,他仍然可以通过客户端程序进行订单递交的操作,存储着订单数据的Message会被暂时保存在本地的Message Queue中,一旦客户联机,MSMQ将Message从中取出,发送到真正的接收方,而这个动作对于用户的透明的。

2.MSMQ天生是One-way、异步的

在MSMQ中,Message始终以One-way的方式进行发送,所以MSMQ具有天生的异步特性。所以MSMQ使用于那些对于用户的请求,Server端无需立即响应的场景。也就是说Server对数据的处理无需和Client的数据的发送进行同步,它可以独自地按照自己的Schedule进行工作。这可以避免峰值负载。比如Server端可以在一个相对低负载的时段(比如深夜)来对接收到的Order进行批处理,而无需一天24小时一直进行Order的监听、接收和处理。

3.MSMQ能够提供高质量的Reliable Messaging

我们知道,在一般的情况下,如果Client端以异步的方式对Service进行调用就意味着:Client无法获知Message是否成功抵达Service端;也不会获得Service端执行的结果和出错信息。但是我们仍然说MSMQ为我们提供了可靠的传输(Reliable Messaging),这主要是因为MSMQ为我们提供一些列Reliable Messaging的机制:

  • 超时机制(Timeout):可以设置发送和接收的时间,超出该时间则被认为操作失败。

  • 确认机制(Acknowledgement):当Message成功抵达Destination Queue,或者被成功接收,向发送端发送一个Acknowledgement message用以确认操作的状态。

  • 日志机制(Journaling):当Message被发送或接收后,被Copy一份存放在Journal Queue中。

此外,MSMQ还提供了死信队列(Dead letter Queue)用以保存发送失败的message。这一切保证了保证了Reliable Messaging。

二、 MSMQ在WCF的运用

在WCF中,MSMQ提供的数据传输功能被封装在一个Binding中,提供WCF Endpoint之间、以及Endpoint和现有的基于MSMQ的Application进行通信的实现。为此WCF为我们提供了两种不同的built-in binding:

  • NetMsmqBinding:从提供的功能和使用 方式上看,NetMsmqBinding和一般使用的binding,比如basicHttpBinding,netTcpBinding没有什么区别:在两个Endpoint之间实现了数据的通信,所不同的是,它提供的是基于MSMQ的Reliable Messaging。从变成模式上看,和一般的binding完全一样。

  • MsmqIntegrationBinding:从命名上我们可以看出,MsmqIntegrationBinding主要用于需要将我们的WCF Application和现有的基于MSMQ的Application集成的情况。MsmqIntegrationBinding实现了WCF Endpoint和某个Message Queue进行数据的通信,具体来说,就是实现了单一的向某个Message Queue 发送Message,和从某个Message Queue中接收Message的功能。从编程模式上看,也有所不同,比如Operation只接收一个MsmqMessage<T>的参数。

这是Client和Service通信的图示:

三、MSMQ和Transaction(事务)

MSMQ提供对Transaction的支持。在一般的情况下,MSMQ通过Message Queue Transaction实现对Transaction的原生的支持,借助Message Queue Transaction,可以把基于一个或多个Message Queue的相关操作纳入同一个Transaction中。

Message Queue Transaction仅仅限于基于Message Queue的操作,倘若操作涉及到另外一些资源,比如SQL Server, 则可以使用基于DTC的分布式Transaction

对于WCF中MSMQ,由于Client和Service的相对独立(可能Client发送Message到Service处理Message会相隔很长一段时间),所以Client和Service的操作只能纳入不同的Transaction中,如下图。

四、Sample1:NetMsmqBinding

我们首先做一个基于NetMsmqBinding Sample,实现的功能就是我们开篇所提出的Order Delivery。我们说过,NetMsmqBinding和一般的binding在实现的功能和变成模式上完全一样。下面是我们熟悉的4层结构:



1.Contract

DataContract:Order & OrderItem

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;

namespace Artech.QueuedService.Contract
{
	[DataContract]
	[KnownType(typeof(OrderItem))]
	public class Order
	{
		private Guid _orderNo;
		private DateTime _orderDate;
		private Guid _supplierID;
		private string _supplierName;
		private IList<OrderItem> _orderItems;

		public Order()
		{
			this._orderItems = new List<OrderItem>();
		}

		public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName)
		{
			this._orderNo = orderNo;
			this._orderDate = orderDate;
			this._supplierID = supplierID;
			this._supplierName = supplierName;
			this._orderItems = new List<OrderItem>();
		}


		[DataMember]
		public Guid OrderNo
		{
			get { return _orderNo; }
			set { _orderNo = value; }
		}
		[DataMember]
		public DateTime OrderDate
		{
			get { return _orderDate; }
			set { _orderDate = value; }
		}
		[DataMember]
		public Guid SupplierID
		{
			get { return _supplierID; }
			set { _supplierID = value; }
		}

		[DataMember]
		public string SupplierName
		{
			get { return _supplierName; }
			set { _supplierName = value; }
		}

		[DataMember]
		public IList<OrderItem> OrderItems
		{
			get { return _orderItems; }
			set { _orderItems = value; }
		}

		public override string ToString()
		{
			string description = string.Format("General Informaion:\n\tOrder No.\t: {0}\n\tOrder Date\t: {1}\n\tSupplier No.\t: {2}\n\tSupplier Name\t: {3}", this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName);
			StringBuilder productList = new StringBuilder();
			productList.AppendLine("\nProducts:");
			int index = 0;
			foreach (OrderItem item in this._orderItems)
			{
				productList.AppendLine(string.Format("\n{4}. \tNo.\t\t: {0}\n\tName\t\t: {1}\n\tPrice\t\t: {2}\n\tQuantity\t: {3}", item.ProductID, item.ProductName, item.UnitPrice, item.Quantity, ++index));
			}
			return description + productList.ToString();
		}
	}
}


using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
namespace Artech.QueuedService.Contract 
{ 
	[DataContract] 
	public class OrderItem 
	{ 
		
		private Guid _productID;        
		private string _productName;        
		private decimal _unitPrice;        
		private int _quantity;      
		public OrderItem()        
		{ 
		}        
		public OrderItem(Guid productID, string productName, decimal unitPrice, int quantity)        
		{            
			this._productID = productID;            
			this._productName = productName;            
			this._unitPrice = unitPrice;            
			this._quantity = quantity;        
		}      
		[DataMember]        
		public Guid ProductID        
		{            
			get { return _productID; }            
			set { _productID = value; }        
		}        
		[DataMember]        
		public string ProductName        
		{            
			get { return _productName; }            
			set { _productName = value; }       
		}        
		[DataMember]        
		public decimal UnitPrice        
		{            
			get { return _unitPrice; }            
			set { _unitPrice = value; }        
		}        
		[DataMember]        
		public int Quantity        
		{            
			get { return _quantity; }            
			set { _quantity = value; }        
		}       
	
	}
}

ServiceContract: IOrderProcessor

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;

namespace Artech.QueuedService.Contract 
{ 
	[ServiceContract]
	[ServiceKnownType(typeof(Order))] 
	public interface IOrderProcessor 
	{ 
		[OperationContract(IsOneWay = true)] 
		void Submit(Order order); 
	} 
}

2.Service:IOrderProcessor

using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;
using System.ServiceModel;

namespace Artech.QueuedService.Service
{
	public class OrderProcessorService : IOrderProcessor
	{
		[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
		public void Submit(Order order)
		{
			Orders.Add(order);
			Console.WriteLine("Receive an order.");
			Console.WriteLine(order.ToString());
		}
	}
}

using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;

namespace Artech.QueuedService.Service 
{ 
	public static class Orders 
	{ 
		private static IDictionary<Guid, Order> _orderList = new Dictionary<Guid, Order>(); 
		public static void Add(Order order) 
		{ 
			_orderList.Add(order.OrderNo, order); 
		} 
		
		public static Order GetOrder(Guid orderNo) 
		{ 
			return _orderList[orderNo]; 
		} 
	} 
}

3.Hosting

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
	<system.serviceModel>
		<bindings>
			<netMsmqBinding>
				<binding name="msmqBinding">
					<security>
						<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
						<message clientCredentialType="None" />
					</security>
				</binding>
			</netMsmqBinding>
		</bindings>
		<services>
			<service name="Artech.QueuedService.Service. OrderProcessorService">
				<endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"  
						  bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor" />
			</service>
		</services>
	</system.serviceModel>
</configuration>

在默认的情况下,netMsmqBinding 的msmqAuthenticationModeWindowsDomain,由于基于WindowsDomain必须安装AD,利于在本机模拟,我把msmqAuthenticationMode改为None,相应的ProtectionLevel和clientCredentialType改为None。

Program:

using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.ServiceModel;
using Artech.QueuedService.Service;

namespace Artech.QueuedService.Hosting
{
	class Program
	{
		static void Main(string[] args)
		{
			string path = @".\private$\orders";
			if (!MessageQueue.Exists(path))
			{
				MessageQueue.Create(path, true);
			}
			using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
			{
				host.Opened += delegate
				{
					Console.WriteLine("Service has begun to listen\n\n");
				};
				host.Open();
				Console.Read();
			}
		}
	}
}

在Host Service之前,通过MessageQueue.Create创建一个Message Queue,第二个参数为表明Queue是否支持Transaction的indicator,这里支持Transaction。

4.Client:

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
	<system.serviceModel>
		<bindings>
			<netMsmqBinding>
				<binding name="msmqBinding">
					<security>
						<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
						<message clientCredentialType="None" />
					</security>
				</binding>
			</netMsmqBinding>
		</bindings>
		<client>
			<endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"                
					  bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor"                
					  name="defaultEndpoint" />
		</client>
	</system.serviceModel>
</configuration>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;
using System.ServiceModel;
using System.Transactions;

namespace Artech.QueuedService.Client 
{ 
	class Program 
	{ 
		static void Main(string[] args) 
		{ 
			ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint"); 
			IOrderProcessor channel = channelFactory.CreateChannel(); 
			Order order = new Order(Guid.NewGuid(), DateTime.Today, Guid.NewGuid(), "A Company"); 
			order.OrderItems.Add(new OrderItem(Guid.NewGuid(), "PC", 5000, 20)); 
			order.OrderItems.Add(new OrderItem(Guid.NewGuid(), "Printer", 7000, 2)); 
			Console.WriteLine("Submit order to server"); 
			using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required)) 
			{ 
				channel.Submit(order); scope.Complete(); } 
			Console.Read(); 
		} 
	} 
}


先后运行Host和Client,Host端有下面的输出:

image.png




链接:https://pan.baidu.com/s/1dLIBPHD1lSi_P-kEMyM7Fw 

提取码:azuh 

--来自百度网盘超级会员V6勇哥的分享





本文出自勇哥的网站《少有人走的路》wwww.skcircle.com,转载请注明出处!讨论可扫码加群:
本帖最后由 勇哥,很想停止 于 2024-06-26 22:10:00 编辑

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

会员中心
搜索
«    2025年4月    »
123456
78910111213
14151617181920
21222324252627
282930
网站分类
标签列表
最新留言
    热门文章 | 热评文章 | 随机文章
文章归档
友情链接
  • 订阅本站的 RSS 2.0 新闻聚合
  • 扫描加本站机器视觉QQ群,验证答案为:halcon勇哥的机器视觉
  • 点击查阅微信群二维码
  • 扫描加勇哥的非标自动化群,验证答案:C#/C++/VB勇哥的非标自动化群
  • 扫描加站长微信:站长微信:abc496103864
  • 扫描加站长QQ:
  • 扫描赞赏本站:
  • 留言板:

Powered By Z-BlogPHP 1.7.2

Copyright Your skcircle.com Rights Reserved.

鄂ICP备18008319号


站长QQ:496103864 微信:abc496103864