c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

    正在检查是否收录...
一言准备中...

想用ZeroMq的发布订阅者模式,又不想写一大串switch case?

想用RPC函数代理机制,又想多对多进行通讯?

下面就结合二者的优点重新封装一套通讯模块

一、先写ZeroMq的发布订阅这模式

  •  

    先做个代理,负责分发事件,代码如下:
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯
1 // 1. 初始化代理(Proxy) 2 var xSubSocket = new XSubscriberSocket("@tcp://127.0.0.1:61225"); 3 var xPubSocket = new XPublisherSocket("@tcp://127.0.0.1:52216"); 4  { 5 _proxy = new Proxy(xSubSocket, xPubSocket); 6 // 2. 启动代理(异步运行) 7 var proxyTask = Task.Run(() => _proxy.Start()); 8 }
View Code
  • 封装客户端代码
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯
 1 using Communication.Nmq.Dto;  2 using System;  3 using System.Collections.Generic;  4  5 namespace Communication.Nmq  6 {  7 public class MqClientMain  8  {  9 public readonly static MqClientMain Instance = new MqClientMain();  10 internal Transceiver _client;  11 private List<MqType> _subscriberList = new();  12 private List<MqType> _unSubscriberList = new();  13 private readonly MethodManager _methodManager = new MethodManager();  14 private bool _isListnerAll;  15 private MqType _owner;  16 protected MqClientMain() { }  17  18  19 /// <summary>  20 /// 函数反射监听(可监听多个)  21 /// </summary>  22 /// <param name="targetInstance"></param>  23 /// <param name="targgetMonitorList"></param>  24 /// <returns></returns>  25 public MqClientMain ProxyAddInstanceMethods<InterfaceType>(InterfaceType targetInstance, params MqType[] targgetMonitorList) where InterfaceType : class  26  {  27 foreach (MqType targgetMonitor in targgetMonitorList)  28  {  29 if (!_subscriberList.Contains(targgetMonitor))  30  _subscriberList.Add(targgetMonitor);  31  }  32  33  _methodManager.AddInstanceMethods(targetInstance);  34 return this;  35  }  36  37  38 /// <summary>  39 ///额外增加事件(可监听多个)  40 /// </summary>  41 /// <param name="targetInstance"></param>  42 /// <param name="mathName"></param>  43 /// <param name="targgetMonitor"></param>  44 /// <returns></returns>  45 public MqClientMain ProxyAddMethods(object targetInstance, string[] mathName, params MqType[] targgetMonitorList)  46  {  47 foreach (MqType targgetMonitor in targgetMonitorList)  48  {  49 if (!_subscriberList.Contains(targgetMonitor))  50  _subscriberList.Add(targgetMonitor);  51  }  52  _methodManager.AddMethods(mathName, targetInstance);  53 return this;  54  }  55  56 /// <summary>  57 /// 开始通讯  58 /// </summary>  59 /// <param name="owner">注册者类型(你是谁)</param>  60 public virtual void Start(MqType owner)  61  {  62 if (_client == null)  63  {  64 if (_isListnerAll)  65  {  66 //监听所有会监听到自己,所以不监听自己  67  _subscriberList.Remove(owner);  68  }  69 _owner = owner;  70 _client = new Transceiver(owner, _subscriberList, _unSubscriberList, _methodManager);  71  72  73  }  74  }  75  76 public void Stop()  77  {  78  _client.Dispose();  79  }  80  81 /// <summary>  82 /// 发布事件  83 /// </summary>  84 /// <param name="msg"></param>  85 public void MqSendMessage(string msg)  86  {  87 if (_client != null)  88  {  89  _client.SendMessage(msg);  90  }  91  }  92  93 /// <summary>  94 /// 代理列表  95 /// </summary>  96 private Dictionary<Type, object> _proxyList = new();  97 /// <summary>  98 /// 获取代理  99 /// </summary> 100 /// <typeparam name="T"></typeparam> 101 /// <returns></returns> 102 public T GetInterfaceProxy<T>() where T : class 103  { 104 if (_client == null) 105 return null; 106 if (!_proxyList.ContainsKey(typeof(T))) 107 _proxyList.Add(typeof(T), InterfaceProxy<T>.Create(_client)); 108 return (T)_proxyList[typeof(T)]; 109  } 110  } 111 }
View Code

二、封装一下RPC的函数代理

  • 封装一个接口代理类
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯
 1 internal class InterfaceProxy<TInterface> : DispatchProxy where TInterface : class  2  {  3 private static Transceiver _client;  4  5 private static JsonSerializerOptions _options = new JsonSerializerOptions  6  {  7 WriteIndented = true, // 让 JSON 格式更加可读  8 Converters = { new JsonStringEnumConverter() } // 使用字符串枚举转换器  9  }; 10 internal static TInterface Create(Transceiver client) 11  { 12 object proxy = Create<TInterface, InterfaceProxy<TInterface>>(); 13 _client = client; 14 return (TInterface)proxy; 15  } 16 protected override object Invoke(MethodInfo targetMethod, object[] args) 17  { 18 var message = new ProxyMessage 19  { 20 InterfaceType = typeof(TInterface).FullName, 21 Method = targetMethod.Name, 22 Parameters = args, 23  }; 24  _client.SendMessage(System.Text.Json.JsonSerializer.Serialize(message, _options)); 25 return targetMethod.ReturnType; 26  } 27 }
View Code
  • 复制一份RPC封装的获取类里面的所有方法
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯
 1 public class MethodManager  2  {  3 private readonly string[] instanceMethodsOnObject = new string[4] { "Equals", "GetHashCode", "GetType", "ToString" };  4  5 /// <summary>  6 /// 获取一个线程安全的字典,其中键为字符串(不区分大小写),值为另一个线程安全的字典。  7 /// 内部字典的键为整数,值为 Method 对象。  8 /// </summary>  9 public ConcurrentDictionary<string, ConcurrentDictionary<int, Method>> Methods { get; } = new ConcurrentDictionary<string, ConcurrentDictionary<int, Method>>(StringComparer.OrdinalIgnoreCase);  10  11 /// <summary>  12 /// 根据方法名和参数数量获取方法  13 /// </summary>  14 /// <param name="name">方法名</param>  15 /// <param name="paramCount">参数数量</param>  16 /// <returns>找到的方法对象,若未找到则返回null</returns>  17 public Method Get(string name, int paramCount)  18  {  19 if (Methods.TryGetValue(name, out var value) && value.TryGetValue(paramCount, out var value2))  20  {  21 return value2;  22  }  23  24 if (name != "*")  25  {  26 return Get("*", 2);  27  }  28  29 return null;  30  }  31  32 /// <summary>  33 /// 向方法集合中添加一个方法。  34 /// 如果指定方法名称不存在于集合中,则创建一个新的ConcurrentDictionary来存储该方法。  35 /// 根据方法的参数信息,特别是参数类型是否为Context以及是否为可选参数,默认值等信息,  36 /// 将方法添加到对应的ConcurrentDictionary中,键为参数的索引(不包括Context类型的参数)。  37 /// </summary>  38 public void Add(Method method)  39  {  40 if (!Methods.ContainsKey(method.Name))  41  {  42 Methods.TryAdd(method.Name, new ConcurrentDictionary<int, Method>());  43  }  44  45 ConcurrentDictionary<int, Method> concurrentDictionary = Methods[method.Name];  46 ParameterInfo[] parameters = method.Parameters;  47 int num = parameters.Length;  48 int num2 = 0;  49 for (int i = 0; i < num; i++)  50  {  51 ParameterInfo parameterInfo = parameters[i];  52 if (typeof(Context).IsAssignableFrom(parameterInfo.ParameterType))  53  {  54 num2 = 1;  55  }  56 else if (parameterInfo.IsOptional && parameterInfo.HasDefaultValue)  57  {  58 concurrentDictionary.AddOrUpdate(i - num2, method, (int key, Method value) => method);  59  }  60  }  61  62 concurrentDictionary.AddOrUpdate(num - num2, method, (int key, Method value) => method);  63  }  64  65 /// <summary>  66 /// 添加一个方法到集合中,使用指定的方法信息、名称和目标对象。  67 /// </summary>  68 /// <param name="methodInfo">方法的信息。</param>  69 /// <param name="name">方法的名称。</param>  70 /// <param name="target">方法的目标对象,默认为null。</param>  71 public void Add(MethodInfo methodInfo, string name, object target = null)  72  {  73 Add(new Method(methodInfo, name, target));  74  }  75  76 /// <summary>  77 /// 添加一个方法到集合中,使用给定的名称、目标对象和别名。  78 /// </summary>  79 /// <param name="name">方法的名称。</param>  80 /// <param name="target">包含方法的对象。</param>  81 /// <param name="alias">方法的别名,如果为空则使用方法名称。</param>  82 public void AddMethod(string name, object target, string alias = "")  83  {  84 MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public);  85 if (string.IsNullOrEmpty(alias))  86  {  87 alias = name;  88  }  89  90 MethodInfo[] array = methods;  91 foreach (MethodInfo methodInfo in array)  92  {  93 if (methodInfo.Name.Equals(name, StringComparison.OrdinalIgnoreCase))  94  {  95  Add(methodInfo, alias, target);  96  }  97  }  98  }  99 100 /// <summary> 101 /// 向目标对象添加方法。 102 /// </summary> 103 /// <param name="names">方法名称数组。</param> 104 /// <param name="target">目标对象,方法将添加到该对象上。</param> 105 /// <param name="ns">可选的命名空间前缀,用于区分不同来源的方法。</param> 106 public void AddMethods(string[] names, object target, string ns = "") 107  { 108 foreach (string text in names) 109  { 110 if (string.IsNullOrEmpty(ns)) 111  { 112  AddMethod(text, target, text); 113  } 114 else 115  { 116 AddMethod(text, target, ns + "_" + text); 117  } 118  } 119  } 120 121 /// <summary> 122 /// 向目标对象添加实例方法。 123 /// </summary> 124 /// <param name="target">目标对象,其实例方法将被添加。</param> 125 /// <param name="ns">可选的命名空间前缀,用于区分方法名。</param> 126 public void AddInstanceMethods(object target, string ns = "") 127  { 128 MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public); 129 foreach (MethodInfo methodInfo in methods) 130  { 131 if (Array.IndexOf(instanceMethodsOnObject, methodInfo.Name) == -1) 132  { 133 string text = methodInfo.Name; 134 if (!string.IsNullOrEmpty(ns)) 135  { 136 text = ns + "_" + text; 137  } 138 139  Add(methodInfo, text, target); 140  } 141  } 142  } 143 }
View Code
  • 通过反射执行方法
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯
 1 using Communication.Nmq.Dto;  2 using NetMQ;  3 using NetMQ.Sockets;  4 using System;  5 using System.Collections.Generic;  6 using System.Linq;  7 using Communication.Utils;  8 using Newtonsoft.Json;  9  10 namespace Communication.Nmq  11 {  12 internal class Transceiver : IDisposable  13  {  14 private List<MqType> SubscribeTypes;  15 private List<MqType> UnSubscribleTypes;  16 private MethodManager FunListeners;  17 private string Owner;  18 private PublisherSocket ClientPub;  19 private SubscriberSocket ClientSub;  20 private NetMQPoller Poller;  21 private static readonly object SendLock = new();  22  23  24 internal Transceiver(MqType owner, List<MqType> subscribeType, List<MqType> unSubscribleType, MethodManager funListener)  25  {  26 SubscribeTypes = subscribeType;  27 UnSubscribleTypes = unSubscribleType;  28 FunListeners = funListener;  29 Owner = owner.ToString();  30 ClientPub = new PublisherSocket(">tcp://127.0.0.1:61225");  31 ClientSub = new SubscriberSocket(">tcp://127.0.0.1:52216");  32 Poller = new NetMQPoller { ClientSub };  33  SubTopic();  34 ClientSub.ReceiveReady += ClientSub_ReceiveReady;  35  Poller.RunAsync();  36  }  37  38 private void ClientSub_ReceiveReady(object sender, NetMQSocketEventArgs e)  39  {  40 try  41  {  42 List<string> frames = new();  43 if (!e.Socket.TryReceiveMultipartStrings(TimeSpan.FromSeconds(3), ref frames) || frames == null)  44  {  45 Log.Error($"NetMQ接收异常!frames {frames}", LoggerNames.MqStr);  46 return;  47  }  48 if (frames.Count == 2)  49  {  50 string topic = frames[0];  51 string msg = frames[1];  52 if (Enum.TryParse(topic, out MqType topicType))  53  {  54 if (TryDeserializeProxyMessage(msg, out var controlRequest) && !string.IsNullOrWhiteSpace(controlRequest.Method))  55  {  56 if (FunListeners.Methods.TryGetValue(controlRequest.Method, out var methods))  57  {  58 foreach (var methodInfo in methods.Select(m => m.Value))  59  {  60 try  61  {  62 var parameters = controlRequest.Parameters.Select((p, i) => SafeChangeType(p, methodInfo.Parameters[i].ParameterType)).ToArray();  63 methodInfo.MethodInfo?.Invoke(methodInfo.Target, parameters);  64  }  65 catch (Exception ex)  66  {  67 Log.Error($"Failed to convert parameter for method {controlRequest.Method}: {ex.Message}", LoggerNames.MqStr);  68 return;  69  }  70  }  71  }  72 else  73  {  74 throw new InvalidOperationException("找不到对应的函数");  75  }  76  }  77 else  78  {  79 throw new InvalidOperationException("无法转换格式");  80  }  81  }  82 else  83  {  84 Log.Error($"NetMQ收到不正常数据,请检测!MqType:{topic}", LoggerNames.MqStr);  85  }  86  }  87 else  88  {  89 Log.Error($"NetMQ收到不正常数据,请检测!frames 长度为{frames.Count}", LoggerNames.MqStr);  90  }  91  }  92 catch (Exception ex)  93  {  94 Log.Error($"NetMQ收到消息报错:{ex.ToString()}", LoggerNames.MqStr);  95  }  96  }  97  98 public object SafeChangeType(object value, Type targetType)  99  { 100 if (targetType.IsEnum && value is string strValue) 101  { 102 return Enum.Parse(targetType, strValue); 103  } 104 return Convert.ChangeType(value, targetType); 105  } 106 107 private bool TryDeserializeProxyMessage(string json, out ProxyMessage message) 108  { 109 message = null; 110 try 111  { 112 message = JsonConvert.DeserializeObject<ProxyMessage>(json); 113 return message != null; 114  } 115 catch 116  { 117 return false; 118  } 119  } 120 121 private void SubTopic() 122  { 123 if (SubscribeTypes?.Any() == true) 124  { 125 foreach (var item in SubscribeTypes) 126  { 127  ClientSub.Subscribe(item.ToString()); 128  } 129  } 130 if (UnSubscribleTypes?.Any() == true) 131  { 132 foreach (var item in UnSubscribleTypes) 133  { 134  ClientSub.Unsubscribe(item.ToString()); 135  } 136  } 137  } 138 139 internal void SendMessage(string msg) 140  { 141 try 142  { 143 lock (SendLock) 144  { 145 var success = ClientPub.SendMoreFrame(Owner).TrySendFrame(TimeSpan.FromSeconds(3), msg); 146  } 147  } 148 catch (Exception ex) 149  { 150 Log.Error($"发送_消息 失败 {msg},:{ex}", LoggerNames.MqStr); 151  } 152  } 153 154 public void Dispose() 155  { 156  Poller.Stop(); 157 ClientPub?.Dispose(); 158 ClientSub?.Dispose(); 159  Poller.Dispose(); 160  } 161  } 162 }
View Code

 



  • 本文作者:WAP站长网
  • 本文链接: https://wapzz.net/post-26920.html
  • 版权声明:本博客所有文章除特别声明外,均默认采用 CC BY-NC-SA 4.0 许可协议。
本站部分内容来源于网络转载,仅供学习交流使用。如涉及版权问题,请及时联系我们,我们将第一时间处理。
文章很赞!支持一下吧 还没有人为TA充电
为TA充电
还没有人为TA充电
0
  • 支付宝打赏
    支付宝扫一扫
  • 微信打赏
    微信扫一扫
感谢支持
文章很赞!支持一下吧
关于作者
2.7W+
8
1
1
WAP站长官方

扣子Coze智能体实战:自动化拆解抖音对标账号,输出完整分析报告(喂饭级教程)

上一篇

【VMware vSphere】借助 Live Patch 无停机修补 vSphere 9 集群。

下一篇
评论区
内容为空

这一切,似未曾拥有

  • 复制图片
按住ctrl可打开默认菜单