想用ZeroMq的发布订阅者模式,又不想写一大串switch case?
想用RPC函数代理机制,又想多对多进行通讯?
下面就结合二者的优点重新封装一套通讯模块
一、先写ZeroMq的发布订阅者模式


// Step 1: create the two proxy endpoints. In NetMQ address strings the '@' prefix
// binds the socket (the clients shown later use '>' to connect).
var frontend = new XSubscriberSocket("@tcp://127.0.0.1:61225");
var backend = new XPublisherSocket("@tcp://127.0.0.1:52216");

// The proxy forwards traffic between the XSUB side and the XPUB side.
_proxy = new Proxy(frontend, backend);

// Step 2: run the proxy loop on a thread-pool task so the caller is not blocked.
// The task handle is intentionally discarded (fire-and-forget).
_ = Task.Run(() => _proxy.Start());
- 封装客户端代码


using Communication.Nmq.Dto;
using System;
using System.Collections.Generic;

namespace Communication.Nmq
{
    /// <summary>
    /// Singleton entry point of the messaging module. It collects the topics to
    /// subscribe to and the handler methods to expose, owns the underlying
    /// <see cref="Transceiver"/>, and hands out interface proxies that publish
    /// method calls as messages.
    /// </summary>
    public class MqClientMain
    {
        public static readonly MqClientMain Instance = new MqClientMain();

        internal Transceiver _client;
        private List<MqType> _subscriberList = new();
        private List<MqType> _unSubscriberList = new();
        private readonly MethodManager _methodManager = new MethodManager();

        // NOTE(review): nothing in this class ever assigns this flag, so the
        // "do not listen to yourself" branch in Start is currently never taken.
        private bool _isListnerAll;
        private MqType _owner;

        /// <summary>
        /// Cache of generated proxies, keyed by the proxied interface type.
        /// </summary>
        private Dictionary<Type, object> _proxyList = new();

        protected MqClientMain() { }

        /// <summary>
        /// Registers every public instance method of <paramref name="targetInstance"/>
        /// as a message handler and adds the given topics to the subscription list.
        /// Can be called several times before <see cref="Start"/>.
        /// </summary>
        /// <param name="targetInstance">Object whose methods handle incoming calls.</param>
        /// <param name="targgetMonitorList">Topics (publishers) to listen to.</param>
        /// <returns>This instance, to allow call chaining.</returns>
        public MqClientMain ProxyAddInstanceMethods<InterfaceType>(InterfaceType targetInstance, params MqType[] targgetMonitorList) where InterfaceType : class
        {
            AddSubscriptions(targgetMonitorList);
            _methodManager.AddInstanceMethods(targetInstance);
            return this;
        }

        /// <summary>
        /// Registers only the named methods of <paramref name="targetInstance"/> as
        /// message handlers and adds the given topics to the subscription list.
        /// </summary>
        /// <param name="targetInstance">Object that declares the methods.</param>
        /// <param name="mathName">Names of the methods to register.</param>
        /// <param name="targgetMonitorList">Topics (publishers) to listen to.</param>
        /// <returns>This instance, to allow call chaining.</returns>
        public MqClientMain ProxyAddMethods(object targetInstance, string[] mathName, params MqType[] targgetMonitorList)
        {
            AddSubscriptions(targgetMonitorList);
            _methodManager.AddMethods(mathName, targetInstance);
            return this;
        }

        /// <summary>
        /// Starts communication by creating the transceiver. Does nothing when a
        /// transceiver already exists.
        /// </summary>
        /// <param name="owner">Identity of this process; used as the publish topic.</param>
        public virtual void Start(MqType owner)
        {
            if (_client != null)
            {
                return;
            }

            if (_isListnerAll)
            {
                // Listening to everything would include our own messages, so drop ourselves.
                _subscriberList.Remove(owner);
            }

            _owner = owner;
            _client = new Transceiver(owner, _subscriberList, _unSubscriberList, _methodManager);
        }

        /// <summary>
        /// Stops communication and releases the transceiver. Safe to call when
        /// <see cref="Start"/> was never called or when already stopped; afterwards
        /// <see cref="Start"/> may be called again.
        /// </summary>
        public void Stop()
        {
            Transceiver client = _client;
            if (client == null)
            {
                return;
            }

            // Clear the reference first so MqSendMessage/GetInterfaceProxy stop using
            // the instance that is about to be disposed, and so Start can run again.
            _client = null;

            // Cached proxies were created for the old transceiver; drop them so the
            // next GetInterfaceProxy call builds proxies for the new one.
            _proxyList.Clear();

            client.Dispose();
        }

        /// <summary>
        /// Publishes a raw message under this client's owner topic. Silently ignored
        /// when communication has not been started.
        /// </summary>
        /// <param name="msg">Message payload.</param>
        public void MqSendMessage(string msg)
        {
            _client?.SendMessage(msg);
        }

        /// <summary>
        /// Returns a proxy implementing <typeparamref name="T"/> whose method calls are
        /// published as messages. Proxies are created once per interface type and cached.
        /// </summary>
        /// <typeparam name="T">Interface to proxy.</typeparam>
        /// <returns>The proxy, or <c>null</c> when communication has not been started.</returns>
        public T GetInterfaceProxy<T>() where T : class
        {
            if (_client == null)
            {
                return null;
            }

            if (!_proxyList.TryGetValue(typeof(T), out object proxy))
            {
                proxy = InterfaceProxy<T>.Create(_client);
                _proxyList.Add(typeof(T), proxy);
            }

            return (T)proxy;
        }

        /// <summary>
        /// Adds each topic to the subscription list unless it is already present.
        /// A null array (possible when a caller passes <c>null</c> for the params
        /// argument) is treated as empty.
        /// </summary>
        private void AddSubscriptions(MqType[] topics)
        {
            if (topics == null)
            {
                return;
            }

            foreach (MqType topic in topics)
            {
                if (!_subscriberList.Contains(topic))
                {
                    _subscriberList.Add(topic);
                }
            }
        }
    }
}
二、封装一下RPC的函数代理
- 封装一个接口代理类


/// <summary>
/// Dynamic proxy for <typeparamref name="TInterface"/>. Every call made on the proxy
/// is serialized to a <see cref="ProxyMessage"/> (interface name, method name,
/// arguments) and published through the transceiver. Calls are fire-and-forget:
/// no reply is awaited, so the caller only ever receives a default return value.
/// </summary>
internal class InterfaceProxy<TInterface> : DispatchProxy where TInterface : class
{
    private static readonly JsonSerializerOptions _options = new JsonSerializerOptions
    {
        WriteIndented = true, // human-readable JSON on the wire
        Converters = { new JsonStringEnumConverter() } // enums travel as their names
    };

    // Per-proxy transceiver. DispatchProxy needs a parameterless constructor, so the
    // value is assigned by Create right after the proxy object is generated.
    private Transceiver _client;

    /// <summary>
    /// Creates a proxy that publishes its calls through <paramref name="client"/>.
    /// </summary>
    /// <param name="client">Transceiver used to send the serialized calls.</param>
    /// <returns>An object implementing <typeparamref name="TInterface"/>.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="client"/> is null.</exception>
    internal static TInterface Create(Transceiver client)
    {
        if (client == null)
        {
            throw new System.ArgumentNullException(nameof(client));
        }

        TInterface proxy = Create<TInterface, InterfaceProxy<TInterface>>();

        // The generated object derives from this class, so the cast is always valid.
        ((InterfaceProxy<TInterface>)(object)proxy)._client = client;
        return proxy;
    }

    /// <summary>
    /// Serializes the call and publishes it, then returns a default value that is
    /// valid for the method's declared return type.
    /// </summary>
    protected override object Invoke(MethodInfo targetMethod, object[] args)
    {
        var message = new ProxyMessage
        {
            InterfaceType = typeof(TInterface).FullName,
            Method = targetMethod.Name,
            // Never send null: the receiving side enumerates the parameter list.
            Parameters = args ?? System.Array.Empty<object>(),
        };
        _client.SendMessage(System.Text.Json.JsonSerializer.Serialize(message, _options));
        return DefaultReturnValue(targetMethod.ReturnType);
    }

    /// <summary>
    /// Produces the value handed back to the caller: null for void and reference
    /// types, a completed task for <c>Task</c>, and default(T) for value types.
    /// Returning anything else (such as the Type object itself) makes the generated
    /// proxy fail when it casts the result to the declared return type.
    /// </summary>
    private static object DefaultReturnValue(System.Type returnType)
    {
        if (returnType == typeof(void))
        {
            return null;
        }

        if (returnType == typeof(System.Threading.Tasks.Task))
        {
            return System.Threading.Tasks.Task.CompletedTask;
        }

        return returnType.IsValueType ? System.Activator.CreateInstance(returnType) : null;
    }
}
- 从RPC封装中复制一份方法管理类(用于获取并登记类里面的所有方法)


/// <summary>
/// Registry of invokable methods, indexed first by (case-insensitive) name and then
/// by the number of arguments a caller may supply.
/// </summary>
public class MethodManager
{
    // Methods every object inherits from System.Object; AddInstanceMethods skips them.
    private static readonly string[] instanceMethodsOnObject = { "Equals", "GetHashCode", "GetType", "ToString" };

    /// <summary>
    /// Registered methods. Outer key: method name (case-insensitive).
    /// Inner key: accepted argument count. Value: the method to invoke.
    /// </summary>
    public ConcurrentDictionary<string, ConcurrentDictionary<int, Method>> Methods { get; } = new ConcurrentDictionary<string, ConcurrentDictionary<int, Method>>(StringComparer.OrdinalIgnoreCase);

    /// <summary>
    /// Looks up a method by name and argument count.
    /// </summary>
    /// <param name="name">Method name.</param>
    /// <param name="paramCount">Number of arguments the caller supplies.</param>
    /// <returns>
    /// The matching method; otherwise the method registered as "*" with two
    /// arguments (a catch-all); otherwise <c>null</c>.
    /// </returns>
    public Method Get(string name, int paramCount)
    {
        if (Methods.TryGetValue(name, out var overloads) && overloads.TryGetValue(paramCount, out var match))
        {
            return match;
        }

        // Fall back to the catch-all entry, guarding against infinite recursion.
        return name != "*" ? Get("*", 2) : null;
    }

    /// <summary>
    /// Registers a method under its name. The method is stored under its full
    /// argument count and, for every optional parameter that has a default value,
    /// also under the count that omits that parameter and the ones after it.
    /// Once a parameter assignable to <c>Context</c> has been seen, the counts are
    /// reduced by one so that parameter is not counted as caller-supplied.
    /// </summary>
    /// <param name="method">Method to register.</param>
    public void Add(Method method)
    {
        // GetOrAdd is atomic; ContainsKey + TryAdd + indexer is three separate lookups.
        ConcurrentDictionary<int, Method> overloads = Methods.GetOrAdd(method.Name, _ => new ConcurrentDictionary<int, Method>());

        ParameterInfo[] parameters = method.Parameters;
        int contextOffset = 0;
        for (int i = 0; i < parameters.Length; i++)
        {
            ParameterInfo parameter = parameters[i];
            if (typeof(Context).IsAssignableFrom(parameter.ParameterType))
            {
                contextOffset = 1;
            }
            else if (parameter.IsOptional && parameter.HasDefaultValue)
            {
                // Callable with only the parameters before this one.
                overloads[i - contextOffset] = method;
            }
        }

        overloads[parameters.Length - contextOffset] = method;
    }

    /// <summary>
    /// Wraps the reflection info in a <c>Method</c> and registers it.
    /// </summary>
    /// <param name="methodInfo">Reflection info of the method.</param>
    /// <param name="name">Name to register the method under.</param>
    /// <param name="target">Object the method is invoked on; defaults to null.</param>
    public void Add(MethodInfo methodInfo, string name, object target = null)
    {
        Add(new Method(methodInfo, name, target));
    }

    /// <summary>
    /// Registers every public instance method of <paramref name="target"/> whose name
    /// equals <paramref name="name"/> (ignoring case), i.e. all of its overloads.
    /// </summary>
    /// <param name="name">Name of the method to look for on the target.</param>
    /// <param name="target">Object that declares the method.</param>
    /// <param name="alias">Name to register under; the method name when empty.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="target"/> is null.</exception>
    public void AddMethod(string name, object target, string alias = "")
    {
        if (target == null)
        {
            throw new System.ArgumentNullException(nameof(target));
        }

        if (string.IsNullOrEmpty(alias))
        {
            alias = name;
        }

        foreach (MethodInfo candidate in target.GetType().GetMethods(BindingFlags.Instance | BindingFlags.Public))
        {
            if (candidate.Name.Equals(name, StringComparison.OrdinalIgnoreCase))
            {
                Add(candidate, alias, target);
            }
        }
    }

    /// <summary>
    /// Registers several named methods of <paramref name="target"/>.
    /// </summary>
    /// <param name="names">Names of the methods to register.</param>
    /// <param name="target">Object that declares the methods.</param>
    /// <param name="ns">Optional prefix; when set, methods are registered as "ns_name".</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="names"/> is null.</exception>
    public void AddMethods(string[] names, object target, string ns = "")
    {
        if (names == null)
        {
            throw new System.ArgumentNullException(nameof(names));
        }

        bool hasPrefix = !string.IsNullOrEmpty(ns);
        foreach (string name in names)
        {
            AddMethod(name, target, hasPrefix ? ns + "_" + name : name);
        }
    }

    /// <summary>
    /// Registers all public instance methods of <paramref name="target"/> except the
    /// four inherited from <see cref="object"/> (Equals, GetHashCode, GetType, ToString).
    /// </summary>
    /// <param name="target">Object whose methods are registered.</param>
    /// <param name="ns">Optional prefix; when set, methods are registered as "ns_name".</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="target"/> is null.</exception>
    public void AddInstanceMethods(object target, string ns = "")
    {
        if (target == null)
        {
            throw new System.ArgumentNullException(nameof(target));
        }

        bool hasPrefix = !string.IsNullOrEmpty(ns);
        foreach (MethodInfo candidate in target.GetType().GetMethods(BindingFlags.Instance | BindingFlags.Public))
        {
            if (Array.IndexOf(instanceMethodsOnObject, candidate.Name) != -1)
            {
                continue;
            }

            string registeredName = hasPrefix ? ns + "_" + candidate.Name : candidate.Name;
            Add(candidate, registeredName, target);
        }
    }
}
- 通过反射执行方法


using Communication.Nmq.Dto;
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using Communication.Utils;
using Newtonsoft.Json;

namespace Communication.Nmq
{
    /// <summary>
    /// One publish socket plus one subscribe socket. Outgoing messages are published
    /// under the owner's name as topic; incoming messages are decoded as
    /// <see cref="ProxyMessage"/> and dispatched by reflection to the methods
    /// registered in the <see cref="MethodManager"/>.
    /// </summary>
    internal class Transceiver : IDisposable
    {
        private List<MqType> SubscribeTypes;
        private List<MqType> UnSubscribleTypes;
        private MethodManager FunListeners;
        private string Owner;
        private PublisherSocket ClientPub;
        private SubscriberSocket ClientSub;
        private NetMQPoller Poller;

        // Serializes the two-frame send so frames from concurrent senders cannot interleave.
        private readonly object SendLock = new();
        private bool _disposed;

        /// <summary>
        /// Connects both sockets ('>' prefix = connect), applies the topic
        /// subscriptions and starts polling for incoming messages.
        /// </summary>
        /// <param name="owner">Identity of this endpoint; its name is the publish topic.</param>
        /// <param name="subscribeType">Topics to subscribe to.</param>
        /// <param name="unSubscribleType">Topics to unsubscribe from.</param>
        /// <param name="funListener">Registry of methods that handle incoming calls.</param>
        internal Transceiver(MqType owner, List<MqType> subscribeType, List<MqType> unSubscribleType, MethodManager funListener)
        {
            SubscribeTypes = subscribeType;
            UnSubscribleTypes = unSubscribleType;
            FunListeners = funListener;
            Owner = owner.ToString();
            ClientPub = new PublisherSocket(">tcp://127.0.0.1:61225");
            ClientSub = new SubscriberSocket(">tcp://127.0.0.1:52216");
            Poller = new NetMQPoller { ClientSub };
            SubTopic();
            ClientSub.ReceiveReady += ClientSub_ReceiveReady;
            Poller.RunAsync();
        }

        /// <summary>
        /// Handles one incoming message. A valid message has exactly two frames: a
        /// topic that parses as <see cref="MqType"/> and a JSON-encoded
        /// <see cref="ProxyMessage"/>. Every failure is logged; nothing is rethrown.
        /// </summary>
        private void ClientSub_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            try
            {
                List<string> frames = new();
                if (!e.Socket.TryReceiveMultipartStrings(TimeSpan.FromSeconds(3), ref frames) || frames == null)
                {
                    Log.Error($"NetMQ接收异常!frames {frames}", LoggerNames.MqStr);
                    return;
                }

                if (frames.Count != 2)
                {
                    Log.Error($"NetMQ收到不正常数据,请检测!frames 长度为{frames.Count}", LoggerNames.MqStr);
                    return;
                }

                string topic = frames[0];
                string msg = frames[1];
                if (!Enum.TryParse(topic, out MqType _))
                {
                    Log.Error($"NetMQ收到不正常数据,请检测!MqType:{topic}", LoggerNames.MqStr);
                    return;
                }

                if (!TryDeserializeProxyMessage(msg, out var controlRequest) || string.IsNullOrWhiteSpace(controlRequest.Method))
                {
                    throw new InvalidOperationException("无法转换格式");
                }

                Dispatch(controlRequest);
            }
            catch (Exception ex)
            {
                Log.Error($"NetMQ收到消息报错:{ex.ToString()}", LoggerNames.MqStr);
            }
        }

        /// <summary>
        /// Invokes the handler registered for the request's method name and argument
        /// count. Exactly one handler runs per message: the registry stores a method
        /// with optional parameters under several argument counts, so iterating over
        /// all entries would call it more than once. Trailing parameters the sender
        /// omitted are filled from their declared default values.
        /// </summary>
        /// <exception cref="InvalidOperationException">No method with that name is registered.</exception>
        private void Dispatch(ProxyMessage request)
        {
            if (!FunListeners.Methods.TryGetValue(request.Method, out var overloads))
            {
                throw new InvalidOperationException("找不到对应的函数");
            }

            object[] supplied = request.Parameters?.ToArray() ?? Array.Empty<object>();
            if (!overloads.TryGetValue(supplied.Length, out var handler) || handler.MethodInfo == null)
            {
                Log.Error($"Failed to convert parameter for method {request.Method}: no overload accepts {supplied.Length} argument(s)", LoggerNames.MqStr);
                return;
            }

            // NOTE(review): arguments are mapped by position. A handler that declares a
            // Context parameter is registered under a count one lower than its
            // parameter list, and this code has no Context value to inject for it.
            var declared = handler.Parameters;
            var arguments = new object[declared.Length];
            try
            {
                for (int i = 0; i < declared.Length; i++)
                {
                    if (i < supplied.Length)
                    {
                        arguments[i] = SafeChangeType(supplied[i], declared[i].ParameterType);
                    }
                    else if (declared[i].HasDefaultValue)
                    {
                        arguments[i] = declared[i].DefaultValue;
                    }
                    else
                    {
                        throw new ArgumentException($"missing argument '{declared[i].Name}'");
                    }
                }
            }
            catch (Exception ex)
            {
                Log.Error($"Failed to convert parameter for method {request.Method}: {ex.Message}", LoggerNames.MqStr);
                return;
            }

            try
            {
                handler.MethodInfo.Invoke(handler.Target, arguments);
            }
            catch (System.Reflection.TargetInvocationException ex)
            {
                // Report what the handler itself threw, not the reflection wrapper.
                Log.Error($"Method {request.Method} threw: {ex.InnerException ?? ex}", LoggerNames.MqStr);
            }
        }

        /// <summary>
        /// Converts a JSON-deserialized value to the parameter type of a handler.
        /// Handles null, values that already have the right type, nullable targets,
        /// enums given by name or by number, and JSON objects/arrays (JToken);
        /// everything else goes through <see cref="Convert.ChangeType(object, Type, IFormatProvider)"/>
        /// with the invariant culture so parsing does not depend on the machine locale.
        /// </summary>
        /// <param name="value">Value as produced by the JSON deserializer.</param>
        /// <param name="targetType">Declared parameter type.</param>
        /// <returns>The converted value.</returns>
        public object SafeChangeType(object value, Type targetType)
        {
            Type nullableInner = Nullable.GetUnderlyingType(targetType);
            if (value == null)
            {
                // Nullable<T> accepts null directly. Otherwise keep the framework
                // behaviour: null for reference types, InvalidCastException for value types.
                return nullableInner != null ? null : Convert.ChangeType(value, targetType);
            }

            Type effectiveType = nullableInner ?? targetType;
            if (effectiveType.IsInstanceOfType(value))
            {
                return value;
            }

            if (effectiveType.IsEnum)
            {
                return value is string strValue
                    ? Enum.Parse(effectiveType, strValue)
                    : Enum.ToObject(effectiveType, value);
            }

            if (value is Newtonsoft.Json.Linq.JToken token)
            {
                // Complex arguments arrive as JObject/JArray; let Json.NET materialize them.
                return token.ToObject(effectiveType);
            }

            return Convert.ChangeType(value, effectiveType, System.Globalization.CultureInfo.InvariantCulture);
        }

        /// <summary>
        /// Tries to parse <paramref name="json"/> as a <see cref="ProxyMessage"/>.
        /// </summary>
        /// <returns><c>true</c> when parsing succeeded and produced a non-null message.</returns>
        private bool TryDeserializeProxyMessage(string json, out ProxyMessage message)
        {
            message = null;
            try
            {
                message = JsonConvert.DeserializeObject<ProxyMessage>(json);
                return message != null;
            }
            catch (JsonException)
            {
                // Malformed payload: report "not a proxy message" instead of throwing.
                return false;
            }
        }

        /// <summary>
        /// Applies the configured subscriptions first, then the unsubscriptions.
        /// Null or empty lists are skipped.
        /// </summary>
        private void SubTopic()
        {
            if (SubscribeTypes != null)
            {
                foreach (var item in SubscribeTypes)
                {
                    ClientSub.Subscribe(item.ToString());
                }
            }

            if (UnSubscribleTypes != null)
            {
                foreach (var item in UnSubscribleTypes)
                {
                    ClientSub.Unsubscribe(item.ToString());
                }
            }
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> as a two-frame message: owner name, then payload.
        /// Failures (exception or timeout) are logged, never thrown.
        /// </summary>
        internal void SendMessage(string msg)
        {
            try
            {
                lock (SendLock)
                {
                    bool sent = ClientPub.SendMoreFrame(Owner).TrySendFrame(TimeSpan.FromSeconds(3), msg);
                    if (!sent)
                    {
                        Log.Error($"发送_消息 失败 {msg},:发送超时", LoggerNames.MqStr);
                    }
                }
            }
            catch (Exception ex)
            {
                Log.Error($"发送_消息 失败 {msg},:{ex}", LoggerNames.MqStr);
            }
        }

        /// <summary>
        /// Stops polling and releases both sockets and the poller. Calling it more
        /// than once is harmless.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            // Detach the handler so the socket no longer references this instance.
            ClientSub.ReceiveReady -= ClientSub_ReceiveReady;
            Poller.Stop();
            ClientPub?.Dispose();
            ClientSub?.Dispose();
            Poller.Dispose();
        }
    }
}
这一切,似未曾拥有