Helios是一套高性能的Socket通信中間件,使用C#編寫。Helios的開發受到Netty的啟發,使用非阻塞的事件驅動模型架構來實現高並發高吞吐量。Helios為我們大大的簡化了Socket編程,它已經為我們處理好了高並發情況下的解包,粘包,buffer管理等等。
GitHub:https://github.com/helios-io/helios/
為避免誤會特別提示:helios不是本人作品,小弟還在努力的路上。
Takes the complexity out of socket programming with intelligent I/O, concurrency, buffer management, and pipelining APIs.
使用socket編程不再復雜。提供智能的I/O,並發,buffer管理,管道形式的API。
Helios is Reactive - it uses a event-driven architecture to simplify development and build responsive systems that scale.
Helios是反應式的,它使用事件驅動的架構來簡化開發和構建易伸縮的系統。
Performance is a cross-cutting concern we factor in at every level in the design of the framework in order to eliminate overhead for your apps and clients.
這個系統在開發和設計的時候都充分考慮到了性能,構建你的app和client的時候請消除這方面的顧慮。
Helios powers the clustering and remoting capbilities built into Akka.NET and more.
Akka.net的集群,遠程功能構建在Helios之上。
要用來演示Socket通信那麼最好的示例無非就是聊天程序了。
整個解決方案包含3個項目:
這個項目裡是一些公共的類型,新建完之後使用nuget添加helios的庫
Message 類:所有發送的消息都是通過Message包裝的,每一個消息都有一個Command跟Content來構成。
public class Message { public Command Command { get; set; } public string Content { get; set; } }
Command枚舉:用來描述消息的命令
public enum Command { Join, Send, }
MessageConverter靜態類:這個類用來轉換Message對象為Byte[],或者把Byte[]轉換成Message對象。Message對象在通過Helios傳輸的時候需要先轉成Byte[],所以我們需要自己定義包的格式。我們用Byte[]的前四位來存放Command,Content轉成Byte後從第5位開始存放。
public class MessageConverter { public static Message ToMessage(NetworkData data) { try { var commandData = data.Buffer.Take(4).ToArray(); var contentData = data.Buffer.Skip(4).Take(data.Buffer.Length - 4).ToArray(); var command = BitConverter.ToInt32(commandData,0); var content = Encoding.UTF8.GetString(contentData); return new Message() { Command = (Command)command, Content = content }; } catch (Exception exc) { Console.WriteLine("Cant convert NetworkData to Message : {0}", exc.Message); } return null; } public static byte[] ToBytes(Message message) { try { var commandBytes = BitConverter.GetBytes((int)message.Command); var messageBytes = Encoding.UTF8.GetBytes(message.Content); var bytes = new byte[commandBytes.Length + messageBytes.Length]; commandBytes.CopyTo(bytes, 0); messageBytes.CopyTo(bytes, commandBytes.Length); return bytes; } catch (Exception exc) { Console.WriteLine("Cant convert message to bytes : {0}", exc.Message); } return null; } }
不用說也知道,這是聊天室的服務端,負責連接用戶及轉發消息。
internal class Program { private static readonly ConcurrentDictionary<string, IConnection> Clients = new ConcurrentDictionary<string, IConnection>(); private static void Main(string[] args) { var host = IPAddress.Any; var port = 9991; Console.Title = "Server"; Console.WriteLine("Starting server on {0}:{1}", host, port); var serverFactory = new ServerBootstrap() .SetTransport(TransportType.Tcp) .Build(); var server = serverFactory.NewReactor(NodeBuilder.BuildNode().Host(host).WithPort(port)); server.OnConnection += (address, connection) => { Console.WriteLine("Connected: {0}", address); connection.BeginReceive(Receive); }; server.OnDisconnection += (reason, address) => Console.WriteLine("Disconnected: {0}; Reason: {1}", address.RemoteHost, reason.Type); server.Start(); Console.WriteLine("Running, press any key to exit"); Console.ReadKey(); } /// <summary> /// 處理接受到的消息 /// </summary> /// <param name="data"></param> /// <param name="channel"></param> public static void Receive(NetworkData data, IConnection channel) { var message = MessageConverter.ToMessage(data); switch (message.Command) { case Command.Join: JoinGroup(message.Content, channel); break; case Command.Send: Broadcast(message.Content); break; } } public static void JoinGroup(string clientName, IConnection channel) { if (Clients.TryAdd(clientName, channel)) { Broadcast(string.Format("{0} join group successful .", clientName)); } else { var errMsg = new Message() { Command = Command.Send, Content = "client name is used." }; SendMessage(channel, errMsg); } } /// <summary> /// 廣播消息 /// </summary> /// <param name="clientMessage"></param> public static void Broadcast(string clientMessage) { Console.WriteLine(clientMessage); var clientName = clientMessage.Split(':')[0]; var message = new Message { Command = Command.Send, Content = clientMessage }; foreach (var client in Clients) { if (client.Key != clientName) { SendMessage(client.Value, message); } } } public static void SendMessage(IConnection connection, Message message) { var messageBytes = MessageConverter.ToBytes(message); connection.Send(new NetworkData { Buffer = messageBytes, Length = messageBytes.Length }); } }
聊天服務的客戶端
internal class Program { public static IConnection Client; public static string ClientName; private static void Main(string[] args) { var host = IPAddress.Loopback; var port = 9991; var connectionFactory = new ClientBootstrap() .SetTransport(TransportType.Tcp).Build(); //New一個Client Client = connectionFactory.NewConnection(Node.Empty(), NodeBuilder.BuildNode().Host(host).WithPort(port)); Client.OnConnection += (address, connection) => { Console.WriteLine("Connect server successful."); connection.BeginReceive(Received); }; Client.OnDisconnection += (address, reason) => Console.WriteLine("Disconnected."); Console.WriteLine("Input ClientName "); ClientName = Console.ReadLine(); Console.Title = string.Format("Client {0}", ClientName); //建立連接 Client.Open(); //加入聊天組 Join(); //等待輸入 WaitInput(); } public static void WaitInput() { while (true) { var input = Console.ReadLine(); if (!string.IsNullOrEmpty(input)) { var message = MakeSendMessage(input); SendMessage(Client, message); } } } /// <summary> /// Jion chat group /// </summary> public static void Join() { var message = MakeJoinMessage(); SendMessage(Client,message); } /// <summary> /// 處理接受到的消息 /// </summary> /// <param name="data"></param> /// <param name="responseChannel"></param> public static void Received(NetworkData data, IConnection responseChannel) { var message = MessageConverter.ToMessage(data); if (message.Command == Command.Send) { Console.WriteLine(message.Content); } } /// <summary> /// 構造聊天消息 /// </summary> /// <param name="input"></param> /// <returns></returns> public static Message MakeSendMessage(string input) { return new Message { Command = Command.Send, Content = string.Format("{0}:{1}", ClientName, input) }; } /// <summary> /// 構造加入組的消息 /// </summary> /// <returns></returns> public static Message MakeJoinMessage() { var message = new Message(); message.Command = Command.Join; message.Content = ClientName; return message; } public static void SendMessage(IConnection connection, Message message) { var messageBytes = MessageConverter.ToBytes(message); connection.Send(new NetworkData { Buffer = messageBytes, Length = messageBytes.Length }); } }
這樣一個簡單的聊天室程序就完成了。
helios 1.0的異步編程模型是基於APM的,從helios 2.0開始會改成SocketAsyncEventArgs方式來實現異步。SocketAsyncEventArgs底層封裝了IOCP,IOCP是Windows server上Socket通訊性能最高的技術,使用了IOCP的helios 2.0勢必具有更高的性能,所以對於helios 2.0還是非常期待的。
示例下載:http://files.cnblogs.com/files/kklldog/HeliosChat.7z