FastSocket這個東西上次我已經說過,它使用簡單,功能強大,擴展靈活,目前在新浪的生產環境中已經被廣泛使用,所以它的性能,安全等各方面我們絕對可以信賴,今天我們來說一個話題,和上一講有關,這次我們制作一個基於FastSocket的傳輸協議,它的意義重大,當fastSocket提供的協議不能滿足項目要求時,我們就必須硬著頭皮去自己寫了,還好,fastsocket為我們鋪好了路,我們只要按著這條路走下去,就可以了。
首先,如果要想擴展一個自己的協議,要對 client和server端分別進行開發,下面我們來看一下client的開發
我們要添加的類有三個文件組成,分別是DSSBinaryProtocol,DSSBinaryResponse和一個使用這個協議的客戶端入口DSSBinarySocketClient
DSSBinaryProtocol
/// <summary> /// 異步二進制協議 /// 協議格式 /// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer] /// 其中參數TableName和VersonNumber長度為40,不夠自動在左側補空格 /// </summary> public sealed class DSSBinaryProtocol : IProtocol<DSSBinaryResponse> { #region IProtocol Members /// <summary> /// find response /// </summary> /// <param name="connection"></param> /// <param name="buffer"></param> /// <param name="readlength"></param> /// <returns></returns> /// <exception cref="BadProtocolException">bad async binary protocl</exception> public DSSBinaryResponse FindResponse(IConnection connection, ArraySegment<byte> buffer, out int readlength) { if (buffer.Count < 4) { readlength = 0; return null; } //獲取message length var messageLength = NetworkBitConverter.ToInt32(buffer.Array, buffer.Offset); if (messageLength < 7) throw new BadProtocolException("bad async binary protocl"); readlength = messageLength + 4; if (buffer.Count < readlength) { readlength = 0; return null; } var seqID = NetworkBitConverter.ToInt32(buffer.Array, buffer.Offset + 4); var projectID = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 8); var flagLength = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 10); var versonLength = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 12); var strName = Encoding.UTF8.GetString(buffer.Array, buffer.Offset + 14, flagLength); var versonNumber = Encoding.UTF8.GetString(buffer.Array, buffer.Offset + 14 + flagLength, versonLength); var dataLength = messageLength - 10 - flagLength - versonLength; byte[] data = null; if (dataLength > 0) { data = new byte[dataLength]; Buffer.BlockCopy(buffer.Array, buffer.Offset + 14 + flagLength + versonLength, data, 0, dataLength); } return new DSSBinaryResponse(seqID, projectID, strName, versonNumber, data); } #endregion } View CodeDSSBinaryResponse
/// <summary> /// 數據同步系統DSS使用的Socket協議,我們稱為DSSBinary協議 /// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer] /// </summary> public class DSSBinaryResponse : IResponse { /// <summary> /// 流水ID /// </summary> public int SeqID { get; private set; } /// <summary> /// 項目類型編號 /// </summary> public short ProjectID { get; set; } /// <summary> /// 本次傳輸的版本號,所有客戶端唯一[項目名稱(4字節)+guid(36字節)] /// </summary> public string VersonNumber { get; private set; } /// <summary> /// 命令名稱 /// </summary> public string Flag { get; private set; } /// <summary> /// 要操作的表對象,以字節數組形式進行傳輸 /// </summary> public readonly byte[] Buffer = null; public DSSBinaryResponse(int seqID, short projectID, string flag, string versonNumber, byte[] buffer) { this.SeqID = seqID; this.ProjectID = projectID; this.VersonNumber = versonNumber; this.Flag = flag; this.Buffer = buffer; } } View CodeDSSBinarySocketClient
/// <summary> /// 異步socket客戶端 /// </summary> public class DSSBinarySocketClient : PooledSocketClient<DSSBinaryResponse> { #region Constructors /// <summary> /// new /// </summary> public DSSBinarySocketClient() : base(new DSSBinaryProtocol()) { } /// <summary> /// new /// </summary> /// <param name="socketBufferSize"></param> /// <param name="messageBufferSize"></param> public DSSBinarySocketClient(int socketBufferSize, int messageBufferSize) : base(new DSSBinaryProtocol(), socketBufferSize, messageBufferSize, 3000, 3000) { } /// <summary> /// new /// </summary> /// <param name="socketBufferSize"></param> /// <param name="messageBufferSize"></param> /// <param name="millisecondsSendTimeout"></param> /// <param name="millisecondsReceiveTimeout"></param> public DSSBinarySocketClient(int socketBufferSize, int messageBufferSize, int millisecondsSendTimeout, int millisecondsReceiveTimeout) : base(new DSSBinaryProtocol(), socketBufferSize, messageBufferSize, millisecondsSendTimeout, millisecondsReceiveTimeout) { } #endregion #region Public Methods public Task<TResult> Send<TResult>(string cmdName, short projectID, string versonNumber, byte[] payload, Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null) { return this.Send(null, cmdName, projectID, versonNumber, payload, funcResultFactory, asyncState); } public Task<TResult> Send<TResult>(byte[] consistentKey, string cmdName, short projectID, string versonNumber, byte[] payload, Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null) { if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName"); if (funcResultFactory == null) throw new ArgumentNullException("funcResultFactory"); var seqID = base.NextRequestSeqID(); var cmdLength = cmdName.Length; var versonNumberLength = versonNumber.Length; var messageLength = (payload == null ? 0 : payload.Length) + cmdLength + versonNumberLength + 10; var sendBuffer = new byte[messageLength + 4]; //write message length Buffer.BlockCopy(NetworkBitConverter.GetBytes(messageLength), 0, sendBuffer, 0, 4); //write seqID. Buffer.BlockCopy(NetworkBitConverter.GetBytes(seqID), 0, sendBuffer, 4, 4); //write proejctID Buffer.BlockCopy(NetworkBitConverter.GetBytes(projectID), 0, sendBuffer, 8, 2); //write response flag length. Buffer.BlockCopy(NetworkBitConverter.GetBytes((short)cmdLength), 0, sendBuffer, 10, 2); //write verson length Buffer.BlockCopy(NetworkBitConverter.GetBytes((short)versonNumberLength), 0, sendBuffer, 12, 2); //write response cmd Buffer.BlockCopy(Encoding.ASCII.GetBytes(cmdName), 0, sendBuffer, 14, cmdLength); //write response versonNumber Buffer.BlockCopy(Encoding.ASCII.GetBytes(versonNumber), 0, sendBuffer, 14 + cmdLength, versonNumberLength); //write body buffer if (payload != null && payload.Length > 0) Buffer.BlockCopy(payload, 0, sendBuffer, 14 + cmdLength + versonNumberLength, payload.Length); var source = new TaskCompletionSource<TResult>(asyncState); base.Send(new Request<DSSBinaryResponse>(consistentKey, seqID, cmdName, sendBuffer, ex => source.TrySetException(ex), response => { TResult result; try { result = funcResultFactory(response); } catch (Exception ex) { source.TrySetException(ex); return; } source.TrySetResult(result); })); return source.Task; } #endregion } View Code然後,我們再來說一下server端的開發,它有兩個文件組成,分別是DSSBinaryCommandInfo,DSSBinaryProtocol
DSSBinaryCommandInfo
/// <summary> /// async binary command info. /// </summary> public class DSSBinaryCommandInfo : ICommandInfo { #region Constructors /// <summary> /// new /// </summary> /// <param name="cmdName"></param> /// <param name="seqID"></param> /// <param name="buffer"></param> /// <exception cref="ArgumentNullException">cmdName is null or empty.</exception> public DSSBinaryCommandInfo(int seqID, short projectID, string cmdName, string versonNumber, byte[] buffer) { if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName"); if (string.IsNullOrEmpty(versonNumber)) throw new ArgumentNullException("versonNumber"); this.VersonNumber = versonNumber; this.CmdName = cmdName; this.SeqID = seqID; this.ProjectID = projectID; this.Buffer = buffer; } #endregion #region Public Properties /// <summary> /// 版本號 /// </summary> public string VersonNumber { get; private set; } public short ProjectID { get; private set; } /// <summary> /// get the current command name. /// </summary> public string CmdName { get; private set; } /// <summary> /// seq id. /// </summary> public int SeqID { get; private set; } /// <summary> /// 主體內容 /// </summary> public byte[] Buffer { get; private set; } #endregion #region Public Methods /// <summary> /// reply /// </summary> /// <param name="connection"></param> /// <param name="payload"></param> public void Reply(IConnection connection, byte[] payload) { var packet = PacketBuilder.ToDSSBinary(this.SeqID, this.ProjectID, this.CmdName, this.VersonNumber, payload); connection.BeginSend(packet); } #endregion } View CodeDSSBinaryProtocol
/// <summary> /// 數據中心二進制協議 /// 協議格式 /// [Message Length(int32)][SeqID(int32)][Request|Response Flag Length(int16)][VersonNumber Length(int16)][Request|Response Flag + VersonNumber + Body Buffer] /// </summary> public sealed class DSSBinaryProtocol : IProtocol<DSSBinaryCommandInfo> { #region IProtocol Members /// <summary> /// find command /// </summary> /// <param name="connection"></param> /// <param name="buffer"></param> /// <param name="maxMessageSize"></param> /// <param name="readlength"></param> /// <returns></returns> /// <exception cref="BadProtocolException">bad async binary protocl</exception> public DSSBinaryCommandInfo FindCommandInfo(IConnection connection, ArraySegment<byte> buffer, int maxMessageSize, out int readlength) { if (buffer.Count < 4) { readlength = 0; return null; } var payload = buffer.Array; //獲取message length var messageLength = NetworkBitConverter.ToInt32(payload, buffer.Offset); if (messageLength < 7) throw new BadProtocolException("bad async binary protocl"); if (messageLength > maxMessageSize) throw new BadProtocolException("message is too long"); readlength = messageLength + 4; if (buffer.Count < readlength) { readlength = 0; return null; } var seqID = NetworkBitConverter.ToInt32(payload, buffer.Offset + 4); var projectID = NetworkBitConverter.ToInt16(payload, buffer.Offset + 8); var cmdNameLength = NetworkBitConverter.ToInt16(payload, buffer.Offset + 10); var versonNumberLength = NetworkBitConverter.ToInt16(payload, buffer.Offset + 12); var strName = Encoding.UTF8.GetString(payload, buffer.Offset + 14, cmdNameLength); var versonNumber = Encoding.UTF8.GetString(payload, buffer.Offset + 14 + cmdNameLength, versonNumberLength); var dataLength = messageLength - 8 - cmdNameLength; byte[] data = null; if (dataLength > 0) { data = new byte[dataLength]; Buffer.BlockCopy(payload, buffer.Offset + 14 + cmdNameLength + versonNumberLength, data, 0, dataLength); } return new DSSBinaryCommandInfo(seqID, projectID, strName, versonNumber, data); } #endregion } View Code除了上面兩個文件外,我們還要修改服務端的管理類
/// <summary> /// Socket server manager. /// </summary> public class SocketServerManager { #region Private Members static private readonly List<SocketBase.IHost> _listHosts = new List<SocketBase.IHost>(); #endregion #region Static Methods /// <summary> /// 初始化Socket Server /// </summary> static public void Init() { Init("socketServer"); } /// <summary> /// 初始化Socket Server /// </summary> /// <param name="sectionName"></param> static public void Init(string sectionName) { if (string.IsNullOrEmpty(sectionName)) throw new ArgumentNullException("sectionName"); Init(ConfigurationManager.GetSection(sectionName) as Config.SocketServerConfig); } /// <summary> /// 初始化Socket Server /// </summary> /// <param name="config"></param> static public void Init(Config.SocketServerConfig config) { if (config == null) throw new ArgumentNullException("config"); if (config.Servers == null) return; foreach (Config.Server serverConfig in config.Servers) { //inti protocol var objProtocol = GetProtocol(serverConfig.Protocol); if (objProtocol == null) throw new InvalidOperationException("protocol"); //init custom service var tService = Type.GetType(serverConfig.ServiceType, false); if (tService == null) throw new InvalidOperationException("serviceType"); var serviceFace = tService.GetInterface(typeof(ISocketService<>).Name); if (serviceFace == null) throw new InvalidOperationException("serviceType"); var objService = Activator.CreateInstance(tService); if (objService == null) throw new InvalidOperationException("serviceType"); //init host. var host = Activator.CreateInstance(typeof(SocketServer<>).MakeGenericType( serviceFace.GetGenericArguments()), objService, objProtocol, serverConfig.SocketBufferSize, serverConfig.MessageBufferSize, serverConfig.MaxMessageSize, serverConfig.MaxConnections) as BaseSocketServer; host.AddListener(serverConfig.Name, new IPEndPoint(IPAddress.Any, serverConfig.Port)); _listHosts.Add(host); } } /// <summary> /// get protocol. /// </summary> /// <param name="protocol"></param> /// <returns></returns> static public object GetProtocol(string protocol) { switch (protocol) { case Protocol.ProtocolNames.AsyncBinary: return new Protocol.AsyncBinaryProtocol(); case Protocol.ProtocolNames.Thrift: return new Protocol.ThriftProtocol(); case Protocol.ProtocolNames.CommandLine: return new Protocol.CommandLineProtocol(); case Protocol.ProtocolNames.DSSBinary: return new Protocol.DSSBinaryProtocol(); } return Activator.CreateInstance(Type.GetType(protocol, false)); } /// <summary> /// 啟動服務 /// </summary> static public void Start() { foreach (var server in _listHosts) server.Start(); } /// <summary> /// 停止服務 /// </summary> static public void Stop() { foreach (var server in _listHosts) server.Stop(); } #endregion }
從上面的代碼中,我們看到了自己新加的協議DSSBinary,我們可以在配置文件中對它進行配置,方法和之前說的一樣,在這裡就不再重復了。
感謝各位的閱讀!