123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using WebSocketSharp;
- using System.Threading;
- using System.Configuration;
- using System.IO;
- using System.Runtime.Serialization;
- using System.Runtime.Serialization.Formatters.Binary;
- namespace Bowin.Common.Mobile
- {
/// <summary>
/// Payload pushed to registered mobile endpoints.
/// </summary>
/// <remarks>
/// The type is marked <see cref="SerializableAttribute"/> because it is written to the wire
/// with <c>BinaryFormatter</c>, which throws a <c>SerializationException</c> for any type
/// that does not carry the attribute.
/// </remarks>
[Serializable]
public class MessageBody
{
    /// <summary>Message type, as a free-form string chosen by the sender.</summary>
    public string Type { get; set; }

    /// <summary>Key associated with the message.</summary>
    public string Key { get; set; }

    /// <summary>Message text.</summary>
    public string Message { get; set; }
}
/// <summary>
/// Process-wide registry of mobile WebSocket endpoints, with helpers to push
/// <see cref="MessageBody"/> payloads to them and a periodic heartbeat that
/// drops endpoints which no longer answer a ping.
/// </summary>
/// <remarks>
/// All access to the endpoint table is serialized through a private lock; network I/O is
/// always performed outside that lock. The heartbeat runs on a single background thread
/// that is started by the first registration and stops once the table is empty.
/// </remarks>
public static class MobileMQ
{
    // appSettings key that holds the heartbeat interval in milliseconds.
    private const string CONFIG_HEARTBEAT_INTERVAL_NAME = "MobileMQ_Heartbeat_Interval";

    // Fallback interval: ping every two minutes.
    private const int DEFAULT_HEARTBEAT_INTERVAL = 120000;

    // Address of one registered endpoint. Instances are replaced, never mutated,
    // so a snapshot taken under the lock can be read safely afterwards.
    private class MQAddress
    {
        internal string IPAddress { get; set; }
        internal int Port { get; set; }
    }

    private static readonly object _gate = new object();
    private static readonly Dictionary<string, MQAddress> _ipPool = new Dictionary<string, MQAddress>();

    // True while the heartbeat thread is alive; guarded by _gate.
    private static bool _heartbeatRunning;

    /// <summary>
    /// Registers an endpoint under <paramref name="key"/>, or updates the address of an
    /// already registered key. Returns immediately; the heartbeat is run in the background.
    /// </summary>
    /// <param name="key">Identifier of the endpoint.</param>
    /// <param name="ip">Host or IP address used to build the <c>ws://</c> URL.</param>
    /// <param name="port">TCP port used to build the <c>ws://</c> URL.</param>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
    public static void Regist(string key, string ip, int port)
    {
        bool startHeartbeat = false;
        lock (_gate)
        {
            _ipPool[key] = new MQAddress { IPAddress = ip, Port = port };
            if (!_heartbeatRunning)
            {
                _heartbeatRunning = true;
                startHeartbeat = true;
            }
        }

        if (startHeartbeat)
        {
            // Background thread: must never keep the process alive or block the caller.
            var worker = new Thread(HeartBeatLoop);
            worker.IsBackground = true;
            worker.Name = "MobileMQ heartbeat";
            worker.Start();
        }
    }

    /// <summary>Removes the endpoint registered under <paramref name="key"/>, if any.</summary>
    /// <param name="key">Identifier of the endpoint.</param>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
    public static void Remove(string key)
    {
        lock (_gate)
        {
            _ipPool.Remove(key);
        }
    }

    /// <summary>Sends <paramref name="message"/> to the registered endpoints whose key is in <paramref name="keys"/>.</summary>
    /// <param name="keys">Keys of the endpoints to address.</param>
    /// <param name="message">Payload to send; a null payload sends nothing.</param>
    /// <exception cref="ArgumentNullException"><paramref name="keys"/> is null.</exception>
    public static void SendMessage(string[] keys, MessageBody message)
    {
        if (keys == null)
        {
            throw new ArgumentNullException("keys");
        }

        var wanted = new HashSet<string>(keys);
        Broadcast(message, entry => wanted.Contains(entry.Key));
    }

    /// <summary>Sends <paramref name="message"/> to every registered endpoint.</summary>
    /// <param name="message">Payload to send; a null payload sends nothing.</param>
    public static void SendMessage(MessageBody message)
    {
        Broadcast(message, null);
    }

    // Serializes the message once and sends it to every endpoint selected by the predicate
    // (all endpoints when the predicate is null).
    private static void Broadcast(MessageBody message, Func<KeyValuePair<string, MQAddress>, bool> predicate)
    {
        if (message == null)
        {
            // Nothing to send, so do not open any connection.
            return;
        }

        List<KeyValuePair<string, MQAddress>> targets = Snapshot(predicate);
        if (targets.Count == 0)
        {
            return;
        }

        byte[] payload = Serialize(message);
        foreach (KeyValuePair<string, MQAddress> target in targets)
        {
            Deliver(target.Value, payload);
        }
    }

    // Copies the selected part of the endpoint table so callers can iterate without the lock.
    private static List<KeyValuePair<string, MQAddress>> Snapshot(Func<KeyValuePair<string, MQAddress>, bool> predicate)
    {
        lock (_gate)
        {
            return predicate == null
                ? _ipPool.ToList()
                : _ipPool.Where(predicate).ToList();
        }
    }

    // Produces the wire representation of a message.
    private static byte[] Serialize(MessageBody message)
    {
        using (MemoryStream ms = new MemoryStream())
        {
            // NOTE(security): BinaryFormatter is unsafe for untrusted data and obsolete in
            // current .NET. It is kept here only because receivers expect this wire format;
            // it also requires the message type to be marked [Serializable].
            IFormatter formatter = new BinaryFormatter();
            formatter.Serialize(ms, message);

            // ToArray returns exactly the bytes written; GetBuffer would also include the
            // unused capacity of the underlying buffer.
            return ms.ToArray();
        }
    }

    // Opens a dedicated connection to one endpoint and sends the payload asynchronously.
    private static void Deliver(MQAddress address, byte[] payload)
    {
        // A local (not shared) socket, so the completion callback closes the connection
        // that actually carried this send.
        var socket = new WebSocket(BuildUrl(address));
        try
        {
            socket.Connect();
            socket.SendAsync(payload, completed => socket.Close());
        }
        catch
        {
            // The completion callback will not run; release the connection before rethrowing.
            socket.Close();
            throw;
        }
    }

    // Body of the heartbeat thread: sleep, probe, repeat until no endpoint is registered.
    private static void HeartBeatLoop()
    {
        int interval = ReadHeartbeatInterval();
        while (true)
        {
            lock (_gate)
            {
                if (_ipPool.Count == 0)
                {
                    // Clearing the flag under the lock lets the next Regist start a new thread.
                    _heartbeatRunning = false;
                    return;
                }
            }

            Thread.Sleep(interval);

            // Check heartbeats and clear abandoned endpoints.
            HeartBeat();
        }
    }

    // Reads the heartbeat interval from configuration, falling back to the default when the
    // setting is missing, malformed or not positive.
    private static int ReadHeartbeatInterval()
    {
        try
        {
            int interval;
            string raw = ConfigurationManager.AppSettings[CONFIG_HEARTBEAT_INTERVAL_NAME];
            if (int.TryParse(raw, out interval) && interval > 0)
            {
                return interval;
            }
        }
        catch (ConfigurationErrorsException)
        {
            // Unreadable configuration: use the default below.
        }

        return DEFAULT_HEARTBEAT_INTERVAL;
    }

    // Pings every registered endpoint once and removes those that do not answer.
    private static void HeartBeat()
    {
        foreach (KeyValuePair<string, MQAddress> entry in Snapshot(null))
        {
            if (!IsAlive(entry.Value))
            {
                RemoveIfUnchanged(entry.Key, entry.Value);
            }
        }
    }

    // Connects to the endpoint, pings it and always closes the probe connection.
    private static bool IsAlive(MQAddress address)
    {
        try
        {
            var socket = new WebSocket(BuildUrl(address));
            try
            {
                socket.Connect();
                return socket.Ping();
            }
            finally
            {
                socket.Close();
            }
        }
        catch (Exception)
        {
            // An endpoint that cannot even be probed (for example an address that does not
            // form a valid URL) is treated as dead. Letting the exception escape would
            // terminate the process, because this runs on the heartbeat thread.
            return false;
        }
    }

    // Removes the key only if it still maps to the address that was probed, so that an
    // endpoint re-registered while the probe was in flight is kept.
    private static void RemoveIfUnchanged(string key, MQAddress probed)
    {
        lock (_gate)
        {
            MQAddress current;
            if (_ipPool.TryGetValue(key, out current) && ReferenceEquals(current, probed))
            {
                _ipPool.Remove(key);
            }
        }
    }

    // Builds the ws:// URL of an endpoint.
    private static string BuildUrl(MQAddress address)
    {
        return string.Format("ws://{0}:{1}", address.IPAddress, address.Port);
    }
}
- }
|