using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using System.Threading;
using WebSocketSharp;

namespace Bowin.Common.Mobile
{
    /// <summary>
    /// Payload pushed to registered mobile endpoints.
    /// </summary>
    /// <remarks>
    /// Marked <see cref="SerializableAttribute"/> because <see cref="MobileMQ"/> serializes it with
    /// <see cref="BinaryFormatter"/>, which rejects types that are not marked serializable.
    /// </remarks>
    [Serializable]
    public class MessageBody
    {
        public string Type { get; set; }

        public string Key { get; set; }

        public string Message { get; set; }
    }

    /// <summary>
    /// Keeps a pool of WebSocket endpoints (key -> ip/port), pushes messages to them and
    /// periodically pings them, dropping endpoints whose ping fails.
    /// </summary>
    public static class MobileMQ
    {
        // AppSettings key holding the heartbeat interval, in milliseconds.
        private const string CONFIG_HEARTBEAT_INTERVAL_NAME = "MobileMQ_Heartbeat_Interval";

        // Default: ping once every two minutes.
        private const int DefaultHeartbeatIntervalMs = 120000;

        private enum SendType
        {
            HeartBeat,
            Message
        }

        private class MQAddress
        {
            internal string IPAddress { get; set; }

            internal int Port { get; set; }
        }

        // Guards _ipPool and _heartbeatRunning; the pool is touched by callers and by the heartbeat thread.
        private static readonly object _sync = new object();

        private static readonly Dictionary<string, MQAddress> _ipPool;

        // True while a heartbeat thread is alive, so that at most one is ever running.
        private static bool _heartbeatRunning;

        static MobileMQ()
        {
            _ipPool = new Dictionary<string, MQAddress>();
        }

        /// <summary>
        /// Reads the heartbeat interval from configuration, falling back to the default when the
        /// setting is missing, unreadable, not an integer, or not positive.
        /// </summary>
        private static int ReadHeartbeatInterval()
        {
            string configured;
            try
            {
                configured = ConfigurationManager.AppSettings[CONFIG_HEARTBEAT_INTERVAL_NAME];
            }
            catch (ConfigurationErrorsException)
            {
                return DefaultHeartbeatIntervalMs;
            }

            // A missing key yields null; Convert.ToInt32(null) would silently give 0 and turn the
            // heartbeat loop into a busy loop, so parse explicitly and insist on a positive value.
            int interval;
            if (int.TryParse(configured, out interval) && interval > 0)
            {
                return interval;
            }

            return DefaultHeartbeatIntervalMs;
        }

        /// <summary>
        /// Heartbeat loop body: while the pool is non-empty, sleep for the configured interval and
        /// then ping every endpoint. Runs on the background thread started by <see cref="Regist"/>.
        /// </summary>
        private static void OnInit()
        {
            int interval = ReadHeartbeatInterval();

            while (true)
            {
                lock (_sync)
                {
                    if (_ipPool.Count == 0)
                    {
                        // Clear the flag under the same lock that Regist uses to set it, so a
                        // registration arriving right now starts a fresh thread.
                        _heartbeatRunning = false;
                        return;
                    }
                }

                Thread.Sleep(interval);

                // Check heartbeats and clean up dead endpoints.
                HeartBeat();
            }
        }

        /// <summary>
        /// Registers an endpoint under <paramref name="key"/>, or updates its address when the key
        /// is already registered. Starts the heartbeat thread if none is running.
        /// </summary>
        /// <param name="key">Identifier of the endpoint.</param>
        /// <param name="ip">Host or IP address of the endpoint's WebSocket server.</param>
        /// <param name="port">Port of the endpoint's WebSocket server.</param>
        public static void Regist(string key, string ip, int port)
        {
            bool startHeartbeat = false;

            lock (_sync)
            {
                MQAddress existing;
                if (_ipPool.TryGetValue(key, out existing))
                {
                    existing.IPAddress = ip;
                    existing.Port = port;
                }
                else
                {
                    _ipPool.Add(key, new MQAddress { IPAddress = ip, Port = port });
                    if (!_heartbeatRunning)
                    {
                        _heartbeatRunning = true;
                        startHeartbeat = true;
                    }
                }
            }

            if (startHeartbeat)
            {
                // Run the loop on a background thread: calling OnInit() inline would block the
                // first caller of Regist until the pool became empty again.
                Thread worker = new Thread(OnInit);
                worker.IsBackground = true;
                worker.Name = "MobileMQ heartbeat";
                worker.Start();
            }
        }

        /// <summary>
        /// Removes the endpoint registered under <paramref name="key"/>, if any.
        /// </summary>
        public static void Remove(string key)
        {
            lock (_sync)
            {
                _ipPool.Remove(key);
            }
        }

        /// <summary>
        /// Sends <paramref name="message"/> to the registered endpoints whose key is in <paramref name="keys"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="keys"/> is null.</exception>
        public static void SendMessage(string[] keys, MessageBody message)
        {
            if (keys == null)
            {
                throw new ArgumentNullException("keys");
            }

            HashSet<string> wanted = new HashSet<string>(keys);
            sendMessage(SendType.Message, message, x => wanted.Contains(x.Key));
        }

        /// <summary>
        /// Sends <paramref name="message"/> to every registered endpoint.
        /// </summary>
        public static void SendMessage(MessageBody message)
        {
            sendMessage(SendType.Message, message);
        }

        /// <summary>
        /// Serializes <paramref name="message"/> with <see cref="BinaryFormatter"/>.
        /// </summary>
        /// <remarks>
        /// NOTE(review): BinaryFormatter is obsolete and unsafe to deserialize from untrusted peers.
        /// It is kept here only because the receiving side presumably expects this wire format;
        /// migrate both ends to a safe serializer (e.g. JSON) together.
        /// </remarks>
        private static byte[] Serialize(MessageBody message)
        {
            using (MemoryStream ms = new MemoryStream())
            {
                IFormatter formatter = new BinaryFormatter();
                formatter.Serialize(ms, message);

                // ToArray copies only the bytes written; GetBuffer would also expose the unused
                // tail of the stream's internal buffer.
                return ms.ToArray();
            }
        }

        /// <summary>
        /// Opens one WebSocket per selected endpoint and either pings it (heartbeat) or sends the
        /// serialized message. Two kinds of signal are sent: heartbeat and message.
        /// </summary>
        /// <param name="sendType">Which signal to send.</param>
        /// <param name="message">Message to send; ignored for heartbeats. A null message sends nothing.</param>
        /// <param name="predicate">Optional filter over the pool; null selects every endpoint.</param>
        private static void sendMessage(SendType sendType, MessageBody message = null, Func<KeyValuePair<string, MQAddress>, bool> predicate = null)
        {
            byte[] payload = null;
            if (sendType == SendType.Message)
            {
                if (message == null)
                {
                    // Nothing to send, so do not open any connections.
                    return;
                }

                // The payload is identical for every endpoint: serialize once.
                payload = Serialize(message);
            }

            // Snapshot key -> URL under the lock so the network I/O below runs without holding it
            // and is unaffected by concurrent Regist/Remove calls.
            List<KeyValuePair<string, string>> targets;
            lock (_sync)
            {
                IEnumerable<KeyValuePair<string, MQAddress>> selected = _ipPool;
                if (predicate != null)
                {
                    selected = selected.Where(predicate);
                }

                targets = selected
                    .Select(x => new KeyValuePair<string, string>(
                        x.Key,
                        string.Format("ws://{0}:{1}", x.Value.IPAddress, x.Value.Port)))
                    .ToList();
            }

            foreach (KeyValuePair<string, string> target in targets)
            {
                // One socket per endpoint, held in a local: a shared static field would be
                // overwritten by the next iteration before the async send callback closes it.
                WebSocket client = new WebSocket(target.Value);
                bool closeNow = true;
                try
                {
                    client.Connect();

                    if (sendType == SendType.HeartBeat)
                    {
                        if (!client.Ping())
                        {
                            Remove(target.Key);
                        }
                    }
                    else
                    {
                        // The completion callback owns the close from here on.
                        client.SendAsync(payload, delegate(bool completed) { client.Close(); });
                        closeNow = false;
                    }
                }
                finally
                {
                    if (closeNow)
                    {
                        client.Close();
                    }
                }
            }
        }

        /// <summary>
        /// Pings every registered endpoint once; endpoints that fail the ping are removed.
        /// </summary>
        private static void HeartBeat()
        {
            sendMessage(SendType.HeartBeat);
        }
    }
}