在做c#中面向無連接的傳輸時用到了UDP,雖然沒有TCP穩定可靠。但是效率是要高些,優勢也有,缺點也有
就是有的時候要丟包,有的時候不得不用UDP,但是如何才能比較穩定的實現可靠傳輸呢,這是一個問題。
TCP傳輸數據的時候沒有大小限制,但是UDP傳輸的時候是有大小限制的,我們怎麼才能夠實現大數據的穩定傳輸呢。我們想到了,把數據包分包。
把一個大數據分割為一系列的小數據包然後分開發送,然後服務端收到了就拼湊起完整數據。
如果遇到中途丟包就重發。
UDP線程類,實現數據的分包發送和重發。具體的接收操作需要實現其中的事件
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Net.Sockets; using Model; using System.Net; using Tool; using System.Threading; namespace ZZUdp.Core { //udp的類 public class UDPThread { #region 私有變量 UdpClient client;//UDP客戶端 Listsendlist;// 用於輪詢是否發送成功的記錄 Dictionary RecListDic = new Dictionary ();//數據接收列表,每一個sequence對應一個 IPEndPoint remotIpEnd = null;//用來在接收數據的時候對遠程主機的信息存放 int port=6666;//定義服務器的端口號 #endregion #region 屬性 public int CheckQueueTimeInterval { get; set; }//檢查發送隊列間隔 public int MaxResendTimes { get; set; }//沒有收到確認包時,最大重新發送的數目,超過此數目會丟棄並觸發PackageSendFailture事件 #endregion #region 事件 /// /// 當數據包收到時觸發 /// public event EventHandlerPackageReceived; /// /// 當數據包收到事件觸發時,被調用 /// /// 包含事件的參數 protected virtual void OnPackageReceived(PackageEventArgs e) { if (PackageReceived != null) PackageReceived(this, e); } ////// 數據包發送失敗 /// public event EventHandlerPackageSendFailure; /// /// 當數據發送失敗時調用 /// /// 包含事件的參數 protected virtual void OnPackageSendFailure(PackageEventArgs e) { if (PackageSendFailure != null) PackageSendFailure(this, e); } ////// 數據包未接收到確認,重新發送 /// public event EventHandlerPackageResend; /// /// 觸發重新發送事件 /// /// 包含事件的參數 protected virtual void OnPackageResend(PackageEventArgs e) { if (PackageResend != null) PackageResend(this, e); } #endregion //無參構造函數 public UDPThread() { } //構造函數 public UDPThread(string ipaddress, int port) { IPAddress ipA = IPAddress.Parse(ipaddress);//構造遠程連接的參數 IPEndPoint ipEnd = new IPEndPoint(ipA, port); client = new UdpClient();// client = new UdpClient(ipEnd)這樣的話就沒有創建遠程連接 client.Connect(ipEnd);//使用指定的遠程主機信息建立默認遠程主機連接 sendlist = new List(); CheckQueueTimeInterval = 2000;//輪詢間隔時間 MaxResendTimes = 5;//最大發送次數 new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();//啟動輪詢線程 //開始監聽數據 AsyncReceiveData(); } /// /// 同步數據接收方法 /// public void ReceiveData() { while (true) { IPEndPoint retip = null; UdpPacket udpp = null; try { byte[] data = client.Receive(ref retip);//接收數據,當Client端連接主機的時候,retip就變成Cilent端的IP了 udpp = (UdpPacket)SerializationUnit.DeserializeObject(data); } catch (Exception ex) { //異常處理操作 } if (udpp != null) { PackageEventArgs arg = new PackageEventArgs(udpp, retip); OnPackageReceived(arg);//數據包收到觸發事件 } } } //異步接受數據 public void AsyncReceiveData() { try { client.BeginReceive(new AsyncCallback(ReceiveCallback), null); } catch (SocketException ex) { throw ex; } } //接收數據的回調函數 public void ReceiveCallback(IAsyncResult param) { if (param.IsCompleted) { UdpPacket udpp = null; try { byte[] data = client.EndReceive(param, ref remotIpEnd);//接收數據,當Client端連接主機的時候,test就變成Cilent端的IP了 udpp = (UdpPacket)SerializationUnit.DeserializeObject(data); } catch (Exception ex) { //異常處理操作 } finally { AsyncReceiveData(); } if (udpp != null)//觸發數據包收到事件 { PackageEventArgs arg = new PackageEventArgs(udpp, null); OnPackageReceived(arg); } } } ////// 同步發送分包數據 /// /// public void SendData(Msg message) { ICollectionudpPackets = UdpPacketSplitter.Split(message); foreach (UdpPacket udpPacket in udpPackets) { byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket); //使用同步發送 client.Send(udpPacketDatagram, udpPacketDatagram.Length,udpPacket.remoteip); if (udpPacket.IsRequireReceiveCheck) PushSendItemToList(udpPacket);//將該消息壓入列表 } } /// /// 異步分包發送數組的方法 /// /// public void AsyncSendData(Msg message) { ICollectionudpPackets = UdpPacketSplitter.Split(message); foreach (UdpPacket udpPacket in udpPackets) { byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket); //使用同步發送 //client.Send(udpPacketDatagram, udpPacketDatagram.Length); //使用異步的方法發送數據 this.client.BeginSend(udpPacketDatagram, udpPacketDatagram.Length, new AsyncCallback(SendCallback), null); } } //發送完成後的回調方法 public void SendCallback(IAsyncResult param) { if (param.IsCompleted) { try { client.EndSend(param);//這句話必須得寫,BeginSend()和EndSend()是成對出現的 } catch (Exception e) { //其他處理異常的操作 } } } static object lockObj = new object(); /// /// 自由線程,檢測未發送的數據並發出,存在其中的就是沒有收到確認包的數據包 /// void CheckUnConfirmedQueue() { do { if (sendlist.Count > 0) { UdpPacket[] array = null; lock (sendlist) { array = sendlist.ToArray(); } //挨個重新發送並計數 Array.ForEach(array, s => { s.sendtimes++; if (s.sendtimes >= MaxResendTimes) { //sOnPackageSendFailure//出發發送失敗事件 sendlist.Remove(s);//移除該包 } else { //重新發送 byte[] udpPacketDatagram = SerializationUnit.SerializeObject(s); client.Send(udpPacketDatagram, udpPacketDatagram.Length, s.remoteip); } }); } Thread.Sleep(CheckQueueTimeInterval);//間隔一定時間重發數據 } while (true); } ////// 將數據信息壓入列表 /// /// void PushSendItemToList(UdpPacket item) { sendlist.Add(item); } ////// 將數據包從列表中移除 /// /// 數據包編號 /// 數據包分包索引 public void PopSendItemFromList(long packageNo, int packageIndex) { lock (lockObj) { Array.ForEach(sendlist.Where(s => s.sequence == packageNo && s.index == packageIndex).ToArray(), s => sendlist.Remove(s)); } } ////// 關閉客戶端並釋放資源 /// public void Dispose() { if (client != null) { client.Close(); client = null; } } } }
首先是數據信息實體類
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Net; namespace Model { //封裝消息類 [Serializable] public class Msg { //所屬用戶的用戶名 public string name { get; set; } //所屬用戶的ip public string host { get; set; } //命令的名稱 public string command { get; set; } //收信人的姓名 public string desname { get; set; } //你所發送的消息的目的地ip,應該是對應在服務器的列表裡的主鍵值 public string destinationIP { get; set; } //端口號 public int port { get; set; } //文本消息 public string msg { get; set; } //二進制消息 public byte[] byte_msg { get; set; } //附加數據 public string extend_msg { get; set; } //時間戳 public DateTime time { get; set; } //構造函數 public Msg(string command,string desip,string msg,string host) { this.command = command; this.destinationIP = desip; this.msg = msg; this.time = DateTime.Now; this.host = host; } override public string ToString() { return name + "說:" + msg; } } }
分包實體類
using System; using System.Collections.Generic; using System.Linq; using System.Text; using Tool; using System.Net; namespace Model { [Serializable] public class UdpPacket { public long sequence{get;set;}//所屬組的唯一序列號 包編號 public int total { get; set; }//分包總數 public int index { get; set; }//消息包的索引 public byte[] data { get; set; }//包的內容數組 public int dataLength { get; set; }//分割的數組包大小 public int remainder { get; set; }//最後剩余的數組的數據長度 public int sendtimes { get; set; }//發送次數 public IPEndPoint remoteip { get; set; }//接受該包的遠程地址 public bool IsRequireReceiveCheck { get; set; }//獲得或設置包收到時是否需要返回確認包 public static int HeaderSize = 30000; public UdpPacket(long sequence, int total, int index, byte[] data, int dataLength, int remainder,string desip,int port) { this.sequence = sequence; this.total = total; this.index = index; this.data = data; this.dataLength = dataLength; this.remainder = remainder; this.IsRequireReceiveCheck = true;//默認都需要確認包 //構造遠程地址 IPAddress ipA = IPAddress.Parse(desip); this.remoteip = new IPEndPoint(ipA, port); } //把這個對象生成byte[] public byte[] ToArray() { return SerializationUnit.SerializeObject(this); } } }
using System; using System.Collections.Generic; using System.Linq; using System.Text; using Tool; namespace Model { ////// UDP數據包分割器 /// public static class UdpPacketSplitter { public static ICollectionSplit(Msg message) { byte[] datagram = null; try { datagram = SerializationUnit.SerializeObject(message); } catch (Exception e) { //AddTalkMessage("數據轉型異常"); } //產生一個序列號,用來標識包數據屬於哪一組 Random Rd = new Random(); long SequenceNumber = Rd.Next(88888, 999999); ICollection udpPackets = UdpPacketSplitter.Split(SequenceNumber, datagram, 10240, message.destinationIP, message.port); return udpPackets; } /// /// 分割UDP數據包 /// /// UDP數據包所持有的序號 /// 被分割的UDP數據包 /// 分割塊的長度 ////// 分割後的UDP數據包列表 /// public static ICollectionSplit(long sequence, byte[] datagram, int chunkLength,string desip,int port) { if (datagram == null) throw new ArgumentNullException("datagram"); List packets = new List (); int chunks = datagram.Length / chunkLength; int remainder = datagram.Length % chunkLength; int total = chunks; if (remainder > 0) total++; for (int i = 1; i <= chunks; i++) { byte[] chunk = new byte[chunkLength]; Buffer.BlockCopy(datagram, (i - 1) * chunkLength, chunk, 0, chunkLength); packets.Add(new UdpPacket(sequence, total, i, chunk, chunkLength, remainder, desip, port)); } if (remainder > 0) { int length = datagram.Length - (chunkLength * chunks); byte[] chunk = new byte[length]; Buffer.BlockCopy(datagram, chunkLength * chunks, chunk, 0, length); packets.Add(new UdpPacket(sequence, total, total, chunk, chunkLength, remainder, desip, port)); } return packets; } } }
using System; using System.Collections.Generic; using System.Linq; using System.Text; using Tool; using Model; namespace Model { //一個sequence對應一組的數據包的數據結構 public class RecDataList { public long sequence { get; set; }//序列號 //對應的存儲包的List ListRecudpPackets = new List (); public int total { get; set; } public int dataLength { get; set; } public int remainder { get; set; } public byte[] DataBuffer = null; public RecDataList(UdpPacket udp) { this.sequence = udp.sequence; this.total = udp.total; this.dataLength = udp.dataLength; this.remainder = udp.remainder; if (DataBuffer == null) { DataBuffer = new byte[dataLength * (total - 1) + remainder]; } } public RecDataList(long sequence, int total, int chunkLength, int remainder) { this.sequence = sequence; this.total = total; this.dataLength = chunkLength; this.remainder = remainder; if (DataBuffer == null) { DataBuffer = new byte[this.dataLength * (this.total - 1) + this.remainder]; } } public void addPacket(UdpPacket p) { RecudpPackets.Add(p); } public Msg show() { if (RecudpPackets.Count == total)//表示已經收集滿了 { //重組數據 foreach (UdpPacket udpPacket in RecudpPackets) { //偏移量 int offset = (udpPacket.index - 1) * udpPacket.dataLength; Buffer.BlockCopy(udpPacket.data, 0, DataBuffer, offset, udpPacket.data.Length); } Msg rmsg = (Msg)SerializationUnit.DeserializeObject(DataBuffer); DataBuffer = null; RecudpPackets.Clear(); return rmsg; } else { return null; } } public bool containskey(UdpPacket udp) { foreach (UdpPacket udpPacket in RecudpPackets) { if (udpPacket.index == udp.index) return true; } return false; } } }
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.Serialization.Formatters.Binary; using System.IO; namespace Tool { public class EncodingTool { //編碼 public static byte[] EncodingASCII(string buf) { byte[] data = Encoding.Unicode.GetBytes(buf); return data; } //解碼 public static string DecodingASCII(byte[] bt) { string st = Encoding.Unicode.GetString(bt); return st; } //編碼 public static byte[] EncodingUTF_8(string buf) { byte[] data = Encoding.UTF8.GetBytes(buf); return data; } //編碼 public static string DecodingUTF_8(byte[] bt) { string st = Encoding.UTF8.GetString(bt); return st; } } }
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.Serialization.Formatters.Binary; using System.IO; namespace Tool { public class SerializationUnit { ////// 把對象序列化為字節數組 /// public static byte[] SerializeObject(object obj) { if (obj == null) return null; //內存實例 MemoryStream ms = new MemoryStream(); //創建序列化的實例 BinaryFormatter formatter = new BinaryFormatter(); formatter.Serialize(ms, obj);//序列化對象,寫入ms流中 ms.Position = 0; //byte[] bytes = new byte[ms.Length];//這個有錯誤 byte[] bytes = ms.GetBuffer(); ms.Read(bytes, 0, bytes.Length); ms.Close(); return bytes; } ////// 把字節數組反序列化成對象 /// public static object DeserializeObject(byte[] bytes) { object obj = null; if (bytes == null) return obj; //利用傳來的byte[]創建一個內存流 MemoryStream ms = new MemoryStream(bytes); ms.Position = 0; BinaryFormatter formatter = new BinaryFormatter(); obj = formatter.Deserialize(ms);//把內存流反序列成對象 ms.Close(); return obj; } ////// 把字典序列化 /// /// ///public static byte[] SerializeDic(Dictionary dic) { if (dic.Count == 0) return null; MemoryStream ms = new MemoryStream(); BinaryFormatter formatter = new BinaryFormatter(); formatter.Serialize(ms, dic);//把字典序列化成流 byte[] bytes = new byte[ms.Length];//從流中讀出byte[] ms.Read(bytes, 0, bytes.Length); return bytes; } /// /// 反序列化返回字典 /// /// ///public static Dictionary DeserializeDic(byte[] bytes) { Dictionary dic = null; if (bytes == null) return dic; //利用傳來的byte[]創建一個內存流 MemoryStream ms = new MemoryStream(bytes); ms.Position = 0; BinaryFormatter formatter = new BinaryFormatter(); //把流中轉換為Dictionary dic = (Dictionary )formatter.Deserialize(ms); return dic; } } }
using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Text; using System.Threading; using Model; namespace ZZUdp.Core { ////// 數據包事件數據 /// public class PackageEventArgs : EventArgs { ////// 網絡消息包 /// public UdpPacket udpPackage { get; set; } ////// 網絡消息包組 /// public UdpPacket[] udpPackages { get; set; } ////// 遠程IP /// public IPEndPoint RemoteIP { get; set; } ////// 是否已經處理 /// public bool IsHandled { get; set; } ////// 創建一個新的 PackageEventArgs 對象. /// public PackageEventArgs(UdpPacket package, IPEndPoint RemoteIP) { this.udpPackage = package; this.RemoteIP = RemoteIP; this.IsHandled = false; } } }