程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> 關於C# >> c#實現的P2P網絡通訊程序

c#實現的P2P網絡通訊程序

編輯:關於C#

在網上看了很多程序(QQ、Azureus、Ants、PPStream)都實現了p2p,以前覺得技術很高深。通過這 段時間的學習才發現,單純的實現p2p在局域網通訊很容易,但是要實現外網穿透(NAT)感覺很困難。最近 看了Azureus和emule源碼,分別是JAVA和C++版,本人對這兩門語言都不熟悉,看起來很吃力。最後只好 根據VC++實現的P2PDemo程序進行了改版,根據設計思路用c#寫了一個Demo出來。通過測試,多個客戶端 在局域網能脫離服務端實現端到端工作。外網的情況要通過路由器,看了Azureus要實現uPnp進行端口映 射,在CodeProject上下載了一個uPnp源碼看,測試結果沒有啟用uPnp路由器。結果現在郁悶了,不知道 下一步怎麼測試,是不是用upnp實現了端口自動映射成功就能實現象QQ那樣通訊。

下面是程序說明:

1、公共類

公共類主要定義一些包結構

a、Packet.cs

[Serializable()]
    public abstract class Packet
    {
        /// <summary>
        /// 命令類型
        /// </summary>
        /// <returns></returns>
        public virtual int GetCommandType()
        {
            return -1;
        }

        /// <summary>
        /// 用戶名
        /// </summary>
        public string UserName
        {
            get;
            set;
        }

        public Packet()
        { }

        public Packet(string username)
        {
            this.UserName = username;
        }
    }

b、MassTextPacket.cs  --分片傳輸類

 [Serializable()]
    public class MassTextPacket:TextPacket
    {
        private int seqID;
        /// <summary>
        /// 包序列
        /// </summary>
        public int SeqID
        {
            get { return seqID; }
            set { seqID = value; }
        }

        private int seqCount;
        /// <summary>
        /// 包數量
        /// </summary>
        public int SeqCount
        {
            get { return seqCount; }
            set { seqCount = value; }
        }

        private int _CLSD;
        public int CLSD
        {
            get { return _CLSD; }
            set { _CLSD = value; }
        }

    }

2、客戶端

a、消息傳送時進行p2p通訊

        private bool SendMessageTo(string toUserName, Packet packet)
        {
            PeerEntity toUser = userList.Single(c => c.UserName == toUserName);
            if (toUser == null)
            {
                return false;
            }
            ReceivedACK = false;

            for (int i=0; i<MAXRETRY; i++)
            {
                // 如果對方P2P地址不為0,就試圖以它為目的地址發送數據,
                // 如果發送失敗,則認為此P2P地址無效
                if (toUser.P2PAddress != null && toUser.P2PAddress.Port != 0)
                {
                    if (packet.GetType() == typeof(TextPacket))
                    {
                        TextPacket msgPacket = new TextPacket (toUserName, (packet as TextPacket).Message);
                        byte[] buffer = UtilityHelper.Serialize (msgPacket);
                        if (buffer.Length > MAXBUFFERSIZE)
                        {

                            MassTextPacket mtp = new MassTextPacket();
                            mtp.SeqID = 0;
                            mtp.SeqCount = (int) System.Math.Ceiling(buffer.Length / (decimal)MAXBUFFERSIZE);
                            mtp.CLSD = mtp.GetHashCode ();

                            long pos = 0;
                            long count = buffer.Length < MAXBUFFERSIZE ? buffer.Length : MAXBUFFERSIZE;
                            while (pos < buffer.Length && pos > 0)
                            {
                                byte[] bytes = new byte [count];                          ;
                                for (int k = 0; k < count; k++)
                                    bytes[k] = buffer [pos + k];
                                //數據組包
                                mtp.SeqID = mtp.SeqID + 1;
                                mtp.Message = Convert.ToBase64String(bytes);

                                //發送數據
                                byte[] buf = UtilityHelper.Serialize(mtp);
                                client.Send(buf, buf.Length, toUser.P2PAddress);
                                Thread.Sleep(100);
                            }
                        }
                        else
                            client.Send(buffer, buffer.Length, toUser.P2PAddress);
                    }
                    else if (packet.GetType() == typeof (FileStreamPacket))
                    {
                        FileStreamPacket fsp = packet as FileStreamPacket;
                        System.IO.FileStream fs = new System.IO.FileStream(fsp.FileName, System.IO.FileMode.Open, System.IO.FileAccess.Read, FileShare.Read);

                       handle1.Reset();

                        fsp.SeqID = 0;
                        fsp.SeqCount = (int)System.Math.Ceiling (fs.Length / (decimal)MAXBUFFERSIZE);
                        fsp.CLSD = fsp.GetHashCode();

                        long pos = 0;
                        long count = fs.Length < MAXBUFFERSIZE ? fs.Length : MAXBUFFERSIZE;
                        while (pos < fs.Length && count > 0)
                        {
                            byte[] buffer = new byte [count];
                            fs.Seek(pos, SeekOrigin.Begin);
                            fs.Read(buffer, 0, (int) count);

                            pos += count;
                            count = pos + MAXBUFFERSIZE < fs.Length ? MAXBUFFERSIZE : fs.Length - pos;

                            //數據組包
                            fsp.SeqID = fsp.SeqID + 1;
                            fsp.Message = Convert.ToBase64String(buffer);

                            //發送數據
                            byte[] buf = UtilityHelper.Serialize(fsp);
                            client.Send(buf, buf.Length, toUser.P2PAddress);
                            Thread.Sleep(300);
                        }

                        handle1.Set();
                    }

                    // 等待接收線程將標記修改
                    for (int j = 0; j < 10; j++)
                    {
                        if (this.ReceivedACK)
                        {
                            this.ReceivedACK = false;
                            return true;
                        }
                        else
                        {
                            Thread.Sleep(300);
                        }
                    }
                }

                // 構建P2P打洞封包
                // 然後通過服務器轉發,請求對方向自己打洞
                P2PConnectionPacket transMsg = new P2PConnectionPacket (UserName, toUserName);
                byte[] msgBuffer = UtilityHelper.Serialize(transMsg);
                client.Send(msgBuffer, msgBuffer.Length, hostPoint);

                // 等待對方的P2PCONNECTACK消息
                for(int j = 0; j < 10; ++j)
                {
                    toUser = userList.Single(c => c.UserName == toUserName);
                    if ( toUser.P2PAddress != null && toUser.P2PAddress.Port != 0)
                        break;
                    Thread.Sleep(300);
                }

            }
            return false;
        }

b、消息接受線程

/// <summary>
        /// 接受線程處理
        /// </summary>
        private void RecvThreadProc()
        {
            byte[] buffer;
            while (true)
            {

                buffer = client.Receive(ref remotePoint);
                Packet packet = UtilityHelper.Deserialize(buffer) as Packet;
                Type msgType = packet.GetType();
                if (msgType == typeof(UserListAckPacket))
                {
                    // 轉換消息
                    UserListAckPacket usersMsg = (UserListAckPacket) packet;
                    // 更新用戶列表
                    userList.Clear();
                    foreach (PeerEntity user in usersMsg.Users)
                    {
                        userList.Add(user);
                    }
                    bUserListComplete = true;
                }
                else if (msgType == typeof(UserLoginAckPacket))
                {
                    ProcUserLogAckMsg(packet);
                }
                else if (msgType == typeof(TextPacket))
                {
                    // 轉換消息
                    TextPacket txtPacket = (TextPacket)packet;
                    printf("Receive a message: {0}", txtPacket.Message);
                    // 發送應答消息
                    P2PAckPacket ackMsg = new P2PAckPacket();
                    buffer = UtilityHelper.Serialize(ackMsg);
                    client.Send(buffer, buffer.Length, remotePoint);
                }
                else if (msgType == typeof(MassTextPacket))
                {
                    lock (this)
                    {
                        MassTextPacket fPacket = (MassTextPacket) packet;
                        if (packets.ContainsKey(fPacket.CLSD))
                            packets[fPacket.CLSD].Add (fPacket);
                        else
                            packets.Add(fPacket.CLSD, new List<MassTextPacket>() { fPacket });

                        printf("PacketID:{0}--SeqNo:{1}--progress: {2}%", fPacket.CLSD, fPacket.SeqID, (int)(System.Math.Round(packets[fPacket.CLSD].Count / (decimal)(fPacket as MassTextPacket).SeqCount, 2) * 100));
                        //組包
                        if ((fPacket as MassTextPacket).SeqCount == packets[fPacket.CLSD].Count)
                        {
                            List<MassTextPacket> temp = packets[fPacket.CLSD].OrderBy(c => c.SeqID).ToList();
                            List<byte> values = new List<byte>();
                            foreach (MassTextPacket mt in temp)
                            {
                                byte[] buf = Convert.FromBase64String(mt.Message);
                                values.AddRange(buf);
                            }

                            MassTextPacket value = UtilityHelper.Deserialize(values.ToArray()) as MassTextPacket;

                            printf("Receive a message: {0}", value.Message);

                            // 發送應答消息
                            P2PAckPacket ackMsg = new P2PAckPacket();
                            buffer = UtilityHelper.Serialize (ackMsg);
                            client.Send(buffer, buffer.Length, remotePoint);
                        }
                    }

                }
                else if (msgType == typeof(FileStreamPacket))
                {
                    lock (this)
                    {
                        FileStreamPacket fPacket = (FileStreamPacket)packet;
                        if (packets.ContainsKey(fPacket.CLSD))
                            packets[fPacket.CLSD].Add (fPacket);
                        else
                            packets.Add(fPacket.CLSD, new List<MassTextPacket>() { fPacket });

                        printf("PacketID:{0}--SeqNo:{1}--progress: {2}%", fPacket.CLSD, fPacket.SeqID, (int)(System.Math.Round(packets[fPacket.CLSD].Count / (decimal)(fPacket as FileStreamPacket).SeqCount, 2) * 100));
                        //組包
                        if ((fPacket as FileStreamPacket).SeqCount == packets[fPacket.CLSD].Count)
                        {
                            List<MassTextPacket> temp = packets[fPacket.CLSD].OrderBy(c => c.SeqID).ToList();

                            System.IO.FileStream fs = new System.IO.FileStream((fPacket as FileStreamPacket).FileName + ".tmp", System.IO.FileMode.Create, System.IO.FileAccess.ReadWrite);
                            foreach (FileStreamPacket mt in temp)
                            {
                                byte[] buf = Convert.FromBase64String(mt.Message);
                                fs.Write(buf, 0, buf.Length);

                            }
                            fs.Flush();
                            fs.Close();
                           printf("Receive a file: {0}", (fPacket as FileStreamPacket).FileName);

                           //清除數據包
                           packets[fPacket.CLSD].Clear ();

                           // 發送應答消息
                           P2PAckPacket ackMsg = new P2PAckPacket();
                           buffer = UtilityHelper.Serialize (ackMsg);
                           client.Send(buffer, buffer.Length, remotePoint);
                        }
                    }

                }
                else if (msgType == typeof(P2PAckPacket))
                {
                    this.ReceivedACK = true;
                }
                else if (msgType == typeof(P2PPurchHolePacket))
                {
                    ProcP2PPurchHoleMsg(packet, remotePoint);
                }
                else if (msgType == typeof(P2PPurchHoleAckPacket))
                {
                    PeerEntity touser = userList.SingleOrDefault(c => c.UserName == (packet as P2PPurchHoleAckPacket).ToUserName);
                    //更改本地的P2P連接時使用的IP地址
                    touser.P2PAddress = touser.RemoteEndPoint;
                }
                Thread.Sleep(100);
            }

        }

c.建立p2p會話

        private void ProcP2PPurchHoleMsg(Packet packet,IPEndPoint remoteEP)
        {
            //打洞請求消息
            P2PPurchHolePacket purchReqMsg = (P2PPurchHolePacket)packet;
            PeerEntity toUser = userList.Single(c => c.UserName == purchReqMsg.ToUserName);
            PeerEntity user = userList.Single(c => c.UserName == purchReqMsg.UserName);
            toUser.P2PAddress = toUser.RemoteEndPoint;
            printf("Set P2P Address for {0}->[{1}]", user.UserName, toUser.P2PAddress.ToString());

            //uPnp實現端口映射
            if(NAT.AddPortMapping(toUser.P2PAddress.Port, ProtocolType.Udp, "AddPortMapping"))
                printf("Port mapping successed!");

            // 發送打洞消息到遠程主機
            P2PPurchHoleAckPacket trashMsg = new P2PPurchHoleAckPacket (purchReqMsg.UserName, purchReqMsg.ToUserName);
            byte[] buffer = UtilityHelper.Serialize(trashMsg);
            client.Send(buffer, buffer.Length, user.RemoteEndPoint);
        }

3、服務端

a、消息處理線程

  private void RecvThreadProc()
        {
            IPEndPoint remotePoint = null;
            byte[] msgBuffer = null;
            while (true)
            {
                msgBuffer = server.Receive(ref remotePoint);
                try
                {
                    object msgObj = UtilityHelper.Deserialize (msgBuffer);
                    switch ((msgObj as Packet).GetCommandType())
                    {
                        case Command.MSG_USERLOGIN:         //用戶登錄
                            ProcUserLoginMsg(msgObj as UserLoginPacket, remotePoint);
                            break;
                        case Command.MSG_USERLOGOUT:        //退出登錄
                            ProcUserLogoutMsg(msgObj as UserLogoutPacket, remotePoint);
                            break;
                        case Command.MSG_GETUSERLIST:       //所有用戶列表
                            ProcGetUserListMsg(msgObj as UserListPacket, remotePoint);
                            break;
                        case Command.MSG_P2PCONNECT:        //點對點連接信息
                            ProcP2PConnectMsg(msgObj as P2PConnectionPacket, remotePoint);
                            break;
                        case Command.MSG_USERACTIVEQUERY:   // 用 戶對服務器輪詢的應答
                            ProcUserActiveQueryMsg(msgObj as UserActiveQueryPacket, remotePoint);
                            break;
                    }
                    Thread.Sleep(100);
                }
                catch { }
            }
        }

b、服務端請求客戶端建立p2p連接

        private void ProcP2PConnectMsg(Packet packet,IPEndPoint remoteEP)
        {
            // 轉換接受的消息
            P2PConnectionPacket transMsg = (P2PConnectionPacket)packet;
            printf("{0}({1}) wants to p2p {2}", remoteEP.Address.ToString(), transMsg.UserName, transMsg.ToUserName);
            // 獲取目標用戶
            PeerEntity toUser = userList.SingleOrDefault(c => c.UserName == transMsg.ToUserName);

            // 轉發Purch Hole請求消息
            P2PPurchHolePacket transMsg2 = new P2PPurchHolePacket (transMsg.UserName, toUser.UserName);
            //轉發消息
            byte[] buffer = UtilityHelper.Serialize(transMsg2);
            server.Send(buffer, buffer.Length, toUser.RemoteEndPoint);

        }

4、測試

a、服務端

b、客戶端

困惑:

1、能不能實現外網通訊,要實現像QQ那樣通訊要做哪些改進。

2、文件續傳如何實現。

3、c#封裝的網絡操作類(像QQ.NET源碼的Net實現)

4、遠程協助的實現。

最後,希望大家共同討論、共同進步!!!

本文配套源碼

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved