程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> C# Socket系列三 socket通信的封包和拆包,

C# Socket系列三 socket通信的封包和拆包,

編輯:C#入門知識

C# Socket系列三 socket通信的封包和拆包,


通過系列二 我們已經實現了socket的簡單通信 接下來我們測試一下,在時間應用的場景下,我們會快速且大量的傳輸數據的情況!

 1    class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             TCPListener tcp = new TCPListener();
 6             TSocketClient client = new TSocketClient();
 7             for (int i = 0; i < 10; i++)
 8             {
 9                 client.SendMsg(System.Text.UTF8Encoding.Default.GetBytes("Holle Server!"));
10             }
11             Console.ReadLine();
12         }
13     }

我們通過測試代碼快速發送10條消息到服務器去,

我們看看運行結果

這樣不難看出,我們的客戶端發送了10條消息,但是服務器收到的時候變成了兩條消息,回復客戶端自然就變成兩次回復。

這是為什麼呢?

我們修改一下程序一秒鐘發送一次消息試試

 1    class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             TCPListener tcp = new TCPListener();
 6             TSocketClient client = new TSocketClient();
 7             for (int i = 0; i < 5; i++)
 8             {
 9                 Thread.Sleep(1000);
10                 client.SendMsg(System.Text.UTF8Encoding.Default.GetBytes("Holle Server!"));
11             }
12             Console.ReadLine();
13         }
14     }

運行看看,

這次對了那麼分析分析到底為什麼呢?

這是socket的底層,做的手腳。因為我設置socket的發送和接受緩沖區

//10K的緩沖區空間
private int BufferSize = 10 * 1024;

10k的緩沖區,

且socket的底層 發送消息會有一定間隙,雖然這個時間很短,但是我們直接for循環發送的話,時間同意很快,

因為socket.send()方法並非真實的發送數據而是把數據壓入發送緩沖區。

那麼我們就明白了為什麼會出現上面的情況

出現了這樣的情況我們要怎麼解決呢?

時間應用場景不可能1秒鐘才一條消息啥。

我們知道了導致這個問題的原因是因為消息發送是出現了快速壓入很多發送消息到待發送緩沖區裡面一起發送導致的。這樣情況就是粘包了,那麼我們是不是可以考慮給每一個消息加入包標識呢?

接下來我們修改一下發送包的數據代碼

創建消息的構造體 TSocketMessage

 1  /// <summary>
 2     /// 底層通信消息
 3     /// </summary>
 4     public class TSocketMessage : IDisposable
 5     {
 6         /// <summary>
 7         /// 消息ID
 8         /// </summary>
 9         public int MsgID;
10         /// <summary>
11         /// 消息內容
12         /// </summary>
13         public byte[] MsgBuffer;
14 
15         public TSocketMessage(int msgID, byte[] msg)
16         {
17             this.MsgID = msgID;
18             this.MsgBuffer = msg;
19         }
20 
21         public void Dispose()
22         {
23             this.Dispose(true);
24             GC.SuppressFinalize(this);
25         }
26 
27         protected virtual void Dispose(bool flag1)
28         {
29             if (flag1) { this.MsgBuffer = null; }
30         }
31     }

 

接下來我們創建消息包的封裝和拆分 MarshalEndian

  1  public class MarshalEndian
  2     {
  3         //用於存儲剩余未解析的字節數
  4         private List<byte> _LBuff = new List<byte>(2);
  5         //默認是utf8的編碼格式
  6         private UTF8Encoding utf8 = new UTF8Encoding();
  7 
  8         //包頭1
  9         const Int16 t1 = 0x55;
 10         //包頭2
 11         const Int16 t2 = 0xAA;
 12         //字節數常量 兩個包頭4個字節,一個消息id4個字節,封裝消息長度 long 8個字節
 13         const long ConstLenght = 12L;
 14 
 15         public void Dispose()
 16         {
 17             this.Dispose(true);
 18             GC.SuppressFinalize(this);
 19         }
 20 
 21         protected virtual void Dispose(bool flag1)
 22         {
 23             if (flag1)
 24             {
 25                 IDisposable disposable2 = this.utf8 as IDisposable;
 26                 if (disposable2 != null) { disposable2.Dispose(); }
 27                 IDisposable disposable = this._LBuff as IDisposable;
 28                 if (disposable != null) { disposable.Dispose(); }
 29             }
 30         }
 31 
 32         public byte[] Encode(TSocketMessage msg)
 33         {
 34             MemoryStream ms = new MemoryStream();
 35             BinaryWriter bw = new BinaryWriter(ms, new UTF8Encoding());
 36             byte[] msgBuffer = msg.MsgBuffer;
 37 
 38             #region 封裝包頭
 39             bw.Write((Int16)t1);
 40             bw.Write((Int16)t2);
 41             #endregion
 42 
 43             #region 包協議
 44             if (msgBuffer != null)
 45             {
 46                 bw.Write((Int64)(msgBuffer.Length + 4));
 47                 bw.Write(msg.MsgID);
 48                 bw.Write(msgBuffer);
 49             }
 50             else { bw.Write((Int64)0); }
 51             #endregion
 52 
 53             bw.Close();
 54             ms.Close();
 55             bw.Dispose();
 56             ms.Dispose();
 57             return ms.ToArray();
 58         }
 59 
 60         public List<TSocketMessage> GetDcAppMess(byte[] buff, int len)
 61         {
 62             //拷貝本次的有效字節
 63             byte[] _b = new byte[len];
 64             Array.Copy(buff, 0, _b, 0, _b.Length);
 65             buff = _b;
 66             if (this._LBuff.Count > 0)
 67             {
 68                 //拷貝之前遺留的字節
 69                 this._LBuff.AddRange(_b);
 70                 buff = this._LBuff.ToArray();
 71                 this._LBuff.Clear();
 72                 this._LBuff = new List<byte>(2);
 73             }
 74 
 75             List<TSocketMessage> list = new List<TSocketMessage>();
 76             MemoryStream ms = new MemoryStream(buff);
 77             BinaryReader buffers = new BinaryReader(ms, this.utf8);
 78             try
 79             {
 80                 byte[] _buff;
 81             Label_0073:
 82                 //判斷本次解析的字節是否滿足常量字節數 
 83                 if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght)
 84                 {
 85                     _buff = new byte[(int)(buffers.BaseStream.Length - buffers.BaseStream.Position)];
 86                     Array.Copy(buff, (int)buffers.BaseStream.Position, _buff, 0, _buff.Length);
 87                     this._LBuff.AddRange(_buff);
 88                     return list;
 89                 }
 90                 #region 包頭讀取
 91             //循環讀取包頭
 92             Label_00983:
 93                 Int16 tt1 = buffers.ReadInt16();
 94                 Int16 tt2 = buffers.ReadInt16();
 95                 if (!(tt1 == t1 && tt2 == t2))
 96                 {
 97                     long ttttt = buffers.BaseStream.Seek(-3, SeekOrigin.Current);
 98                     goto Label_00983;
 99                 }
100                 #endregion
101 
102                 #region 包協議
103                 long offset = buffers.ReadInt64();
104                 #endregion
105 
106                 #region 包解析
107                 //剩余字節數大於本次需要讀取的字節數
108                 if (offset < (buffers.BaseStream.Length - buffers.BaseStream.Position))
109                 {
110                     int msgID = buffers.ReadInt32();
111                     _buff = new byte[offset - 4];
112                     Array.Copy(buff, (int)buffers.BaseStream.Position, _buff, 0, _buff.Length);
113                     list.Add(new TSocketMessage(msgID, _buff));
114                     //設置偏移量 然後繼續循環讀取
115                     buffers.BaseStream.Seek(offset, SeekOrigin.Current);
116                     goto Label_0073;
117                 }
118                 else if (offset == (buffers.BaseStream.Length - buffers.BaseStream.Position))
119                 {
120                     int msgID = buffers.ReadInt32();
121                     //剩余字節數剛好等於本次讀取的字節數
122                     _buff = new byte[offset - 4];
123                     Array.Copy(buff, (int)buffers.BaseStream.Position, _buff, 0, _buff.Length);
124                     list.Add(new TSocketMessage(msgID, _buff));
125                 }
126                 else
127                 {
128                     //剩余字節數剛好小於本次讀取的字節數 存起來,等待接受剩余字節數一起解析
129                     _buff = new byte[(int)(buffers.BaseStream.Length - buffers.BaseStream.Position + ConstLenght)];
130                     Array.Copy(buff, (int)(buffers.BaseStream.Position - ConstLenght), _buff, 0, _buff.Length);
131                     buff = _buff;
132                     this._LBuff.AddRange(_buff);
133                 }
134                 #endregion
135 
136             }
137             catch { }
138             finally
139             {
140                 if (buffers != null) { buffers.Dispose(); }
141                 buffers.Close();
142                 if (buffers != null) { buffers.Dispose(); }
143                 ms.Close();
144                 if (ms != null) { ms.Dispose(); }
145             }
146             return list;
147         }
148     }

 

接下來我們修改一下 TSocketBase 的 抽象方法  

1  public abstract void Receive(TSocketMessage msg);

 

在修改接受消息回調函數

 1   /// <summary>
 2         /// 消息解析器
 3         /// </summary>
 4         MarshalEndian mersha = new MarshalEndian();
 5 
 6         /// <summary>
 7         /// 接收消息回調函數
 8         /// </summary>
 9         /// <param name="iar"></param>
10         private void ReceiveCallback(IAsyncResult iar)
11         {
12             if (!this.IsDispose)
13             {
14                 try
15                 {
16                     //接受消息
17                     ReceiveSize = _Socket.EndReceive(iar, out ReceiveError);
18                     //檢查狀態碼
19                     if (!CheckSocketError(ReceiveError) && SocketError.Success == ReceiveError)
20                     {
21                         //判斷接受的字節數
22                         if (ReceiveSize > 0)
23                         {
24                             byte[] rbuff = new byte[ReceiveSize];
25                             Array.Copy(this.Buffers, rbuff, ReceiveSize);
26                             var msgs = mersha.GetDcAppMess(rbuff, ReceiveSize);
27                             foreach (var msg in msgs)
28                             {
29                                 this.Receive(msg);
30                             }
31                             //重置連續收到空字節數
32                             ZeroCount = 0;
33                             //繼續開始異步接受消息
34                             ReceiveAsync();
35                         }
36                         else
37                         {
38                             ZeroCount++;
39                             if (ZeroCount == 5) { this.Close("錯誤鏈接"); }
40                         }
41                     }
42                 }
43                 catch (System.Net.Sockets.SocketException) { this.Close("鏈接已經被關閉"); }
44                 catch (System.ObjectDisposedException) { this.Close("鏈接已經被關閉"); }
45             }
46         }

這樣我們完成了在收到消息後對數據包的解析。

修改一下TSocketClient的 Receive 重寫方法

 1         /// <summary>
 2         /// 收到消息後
 3         /// </summary>
 4         /// <param name="rbuff"></param>
 5         public override void Receive(TSocketMessage msg)
 6         {
 7             Console.WriteLine("Receive ID:" + msg.MsgID + " Msg:" + System.Text.UTF8Encoding.Default.GetString(msg.MsgBuffer));
 8             if (isServer)
 9             {
10                 this.SendMsg(new TSocketMessage(msg.MsgID, System.Text.UTF8Encoding.Default.GetBytes("Holle Client!")));
11             }
12         }

修改測試代碼如下

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             TCPListener tcp = new TCPListener();
 6             TSocketClient client = new TSocketClient();
 7             for (int i = 1; i < 5; i++)
 8             {
 9                 Thread.Sleep(1000);
10                 client.SendMsg(new TSocketMessage(i, System.Text.UTF8Encoding.Default.GetBytes("Holle Server!")));
11             }
12             Console.ReadLine();
13         }
14     }

運行結果

 

接受成功了,那麼我們取消暫停狀態,快速發送消息試試

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             TCPListener tcp = new TCPListener();
 6             TSocketClient client = new TSocketClient();
 7             for (int i = 1; i < 5; i++)
 8             {
 9                 client.SendMsg(new TSocketMessage(i, System.Text.UTF8Encoding.Default.GetBytes("Holle Server!")));
10             }
11             Console.ReadLine();
12         }
13     }

看看運行結果

 

瞬間完成了消息發送,也沒有再出現第一次運行的那樣~!

這樣完美的解決了socket通信 在傳輸上發送粘包問題

 

下載程序完整代碼

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved