在網上看了很多程序(QQ、Azureus、Ants、PPStream)都實現了p2p,以前覺得這項技術很高深。通過這段時間的學習才發現,單純在局域網內實現p2p通訊很容易,但是要實現外網穿透(NAT)感覺很困難。最近看了Azureus和emule的源碼,分別是JAVA和C++版,本人對這兩門語言都不熟悉,看起來很吃力。最後只好根據VC++實現的P2PDemo程序進行改版,按照其設計思路用c#寫了一個Demo出來。通過測試,多個客戶端在局域網內能脫離服務端實現端到端通訊。外網的情況要通過路由器,看到Azureus是用uPnp進行端口映射,於是在CodeProject上下載了一個uPnp源碼來看,但測試時發現路由器沒有啟用uPnp。結果現在很鬱悶,不知道下一步怎麼測試,也不確定是不是用uPnp成功實現端口自動映射後,就能實現像QQ那樣的通訊。
下面是程序說明:
1、公共類
公共類主要定義一些包結構
a、Packet.cs
/// <summary>
/// Abstract base class for packet types. It carries a user name and exposes
/// a command type that derived classes may override.
/// </summary>
[Serializable]
public abstract class Packet
{
    /// <summary>
    /// Creates a packet without setting a user name.
    /// </summary>
    public Packet()
    {
    }

    /// <summary>
    /// Creates a packet carrying the given user name.
    /// </summary>
    /// <param name="username">Value stored in <see cref="UserName"/>.</param>
    public Packet(string username)
    {
        UserName = username;
    }

    /// <summary>
    /// User name carried by this packet.
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// Command type of the packet.
    /// </summary>
    /// <returns>-1 unless a derived class overrides this method.</returns>
    public virtual int GetCommandType()
    {
        const int noCommand = -1;
        return noCommand;
    }
}
b、MassTextPacket.cs --分片傳輸類
/// <summary>
/// Text packet used for fragmented transfers: each instance is one fragment
/// and records its position and the total number of fragments.
/// </summary>
[Serializable]
public class MassTextPacket : TextPacket
{
    // The backing fields keep their original names and order on purpose:
    // the type is marked [Serializable], and a field-based serializer would
    // treat a rename as a format change.
    private int seqID;
    private int seqCount;
    private int _CLSD;

    /// <summary>
    /// Sequence number of this fragment.
    /// </summary>
    public int SeqID
    {
        get { return this.seqID; }
        set { this.seqID = value; }
    }

    /// <summary>
    /// Total number of fragments in the transfer.
    /// </summary>
    public int SeqCount
    {
        get { return this.seqCount; }
        set { this.seqCount = value; }
    }

    /// <summary>
    /// Identifier shared by all fragments of one transfer; the receive code
    /// in this file groups incoming fragments by this value.
    /// </summary>
    public int CLSD
    {
        get { return this._CLSD; }
        set { this._CLSD = value; }
    }
}
2、客戶端
a、消息傳送時進行p2p通訊
/// <summary>
/// Sends a packet directly to another user's P2P address. When no P2P address
/// is known (or no ACK arrives), asks the server to make the peer punch a hole
/// and retries, up to MAXRETRY times.
/// </summary>
/// <param name="toUserName">User name of the recipient, looked up in userList.</param>
/// <param name="packet">
/// The packet to send. Only exact TextPacket and FileStreamPacket instances are
/// transmitted; text larger than MAXBUFFERSIZE and files are split into fragments.
/// </param>
/// <returns>
/// true once the receive thread has flagged an ACK; false when the recipient is
/// unknown or every attempt ended without an ACK.
/// </returns>
private bool SendMessageTo(string toUserName, Packet packet)
{
    // SingleOrDefault instead of Single: Single throws for an unknown user,
    // which made the null check below unreachable.
    PeerEntity toUser = userList.SingleOrDefault(c => c.UserName == toUserName);
    if (toUser == null)
    {
        return false;
    }

    ReceivedACK = false;
    for (int i = 0; i < MAXRETRY; i++)
    {
        // If a P2P address is known for the peer, try to send straight to it.
        // (Original intent: treat the address as invalid when sending fails;
        // note that nothing below actually resets it.)
        if (toUser.P2PAddress != null && toUser.P2PAddress.Port != 0)
        {
            if (packet.GetType() == typeof(TextPacket))
            {
                TextPacket msgPacket = new TextPacket(toUserName, (packet as TextPacket).Message);
                byte[] buffer = UtilityHelper.Serialize(msgPacket);
                if (buffer.Length > MAXBUFFERSIZE)
                {
                    // Too large for one datagram: send the serialized bytes as
                    // Base64 fragments that share one CLSD.
                    MassTextPacket mtp = new MassTextPacket();
                    mtp.SeqID = 0;
                    mtp.SeqCount = (int)System.Math.Ceiling(buffer.Length / (decimal)MAXBUFFERSIZE);
                    mtp.CLSD = mtp.GetHashCode();
                    long pos = 0;
                    long count = buffer.Length < MAXBUFFERSIZE ? buffer.Length : MAXBUFFERSIZE;
                    // The loop used to test "pos > 0", which is false on entry, so no
                    // fragment was ever sent; it also never advanced pos/count.
                    while (pos < buffer.Length && count > 0)
                    {
                        // Build the fragment
                        mtp.SeqID = mtp.SeqID + 1;
                        mtp.Message = Convert.ToBase64String(buffer, (int)pos, (int)count);
                        // Send the fragment
                        byte[] buf = UtilityHelper.Serialize(mtp);
                        client.Send(buf, buf.Length, toUser.P2PAddress);
                        pos += count;
                        count = pos + MAXBUFFERSIZE < buffer.Length ? MAXBUFFERSIZE : buffer.Length - pos;
                        Thread.Sleep(100);
                    }
                }
                else
                {
                    client.Send(buffer, buffer.Length, toUser.P2PAddress);
                }
            }
            else if (packet.GetType() == typeof(FileStreamPacket))
            {
                FileStreamPacket fsp = packet as FileStreamPacket;
                // using: the stream was previously never closed.
                using (System.IO.FileStream fs = new System.IO.FileStream(fsp.FileName, System.IO.FileMode.Open, System.IO.FileAccess.Read, FileShare.Read))
                {
                    handle1.Reset();
                    try
                    {
                        fsp.SeqID = 0;
                        fsp.SeqCount = (int)System.Math.Ceiling(fs.Length / (decimal)MAXBUFFERSIZE);
                        fsp.CLSD = fsp.GetHashCode();
                        long pos = 0;
                        long count = fs.Length < MAXBUFFERSIZE ? fs.Length : MAXBUFFERSIZE;
                        while (pos < fs.Length && count > 0)
                        {
                            byte[] chunk = new byte[count];
                            fs.Seek(pos, SeekOrigin.Begin);
                            // Read may return fewer bytes than requested, so keep
                            // reading until the chunk is full or the file ends.
                            int filled = 0;
                            while (filled < chunk.Length)
                            {
                                int read = fs.Read(chunk, filled, chunk.Length - filled);
                                if (read <= 0)
                                {
                                    break;
                                }
                                filled += read;
                            }
                            pos += count;
                            count = pos + MAXBUFFERSIZE < fs.Length ? MAXBUFFERSIZE : fs.Length - pos;
                            // Build the fragment
                            fsp.SeqID = fsp.SeqID + 1;
                            fsp.Message = Convert.ToBase64String(chunk, 0, filled);
                            // Send the fragment
                            byte[] buf = UtilityHelper.Serialize(fsp);
                            client.Send(buf, buf.Length, toUser.P2PAddress);
                            Thread.Sleep(300);
                        }
                    }
                    finally
                    {
                        // Always signal again, even if reading or sending threw.
                        handle1.Set();
                    }
                }
            }

            // Wait for the receive thread to flip the ACK flag
            for (int j = 0; j < 10; j++)
            {
                if (this.ReceivedACK)
                {
                    this.ReceivedACK = false;
                    return true;
                }
                Thread.Sleep(300);
            }
        }

        // Build the P2P hole-punching request and send it through the server,
        // asking the peer to punch a hole towards us
        P2PConnectionPacket transMsg = new P2PConnectionPacket(UserName, toUserName);
        byte[] msgBuffer = UtilityHelper.Serialize(transMsg);
        client.Send(msgBuffer, msgBuffer.Length, hostPoint);

        // Wait until the peer's P2P address has been filled in
        for (int j = 0; j < 10; ++j)
        {
            // The receive thread clears and refills userList, so the entry can be
            // missing for a moment; keep the previous entry in that case.
            PeerEntity refreshed = userList.SingleOrDefault(c => c.UserName == toUserName);
            if (refreshed != null)
            {
                toUser = refreshed;
                if (toUser.P2PAddress != null && toUser.P2PAddress.Port != 0)
                {
                    break;
                }
            }
            Thread.Sleep(300);
        }
    }
    return false;
}
b、消息接收線程
/// <summary>
/// Client receive loop. Blocks on client.Receive, deserializes each datagram
/// and dispatches on the exact packet type: user list / login replies, text
/// messages, fragmented text and file transfers, ACKs and hole-punching messages.
/// The loop never returns.
/// </summary>
private void RecvThreadProc()
{
    byte[] buffer;
    while (true)
    {
        buffer = client.Receive(ref remotePoint);
        Packet packet = UtilityHelper.Deserialize(buffer) as Packet;
        if (packet == null)
        {
            // Not a Packet: skip it instead of failing on packet.GetType().
            Thread.Sleep(100);
            continue;
        }

        Type msgType = packet.GetType();
        if (msgType == typeof(UserListAckPacket))
        {
            UserListAckPacket usersMsg = (UserListAckPacket)packet;
            // Replace the local user list with the received one
            userList.Clear();
            foreach (PeerEntity user in usersMsg.Users)
            {
                userList.Add(user);
            }
            bUserListComplete = true;
        }
        else if (msgType == typeof(UserLoginAckPacket))
        {
            ProcUserLogAckMsg(packet);
        }
        else if (msgType == typeof(TextPacket))
        {
            TextPacket txtPacket = (TextPacket)packet;
            printf("Receive a message: {0}", txtPacket.Message);
            // Reply with an ACK
            P2PAckPacket ackMsg = new P2PAckPacket();
            buffer = UtilityHelper.Serialize(ackMsg);
            client.Send(buffer, buffer.Length, remotePoint);
        }
        else if (msgType == typeof(MassTextPacket))
        {
            lock (this)
            {
                MassTextPacket fPacket = (MassTextPacket)packet;
                if (packets.ContainsKey(fPacket.CLSD))
                    packets[fPacket.CLSD].Add(fPacket);
                else
                    packets.Add(fPacket.CLSD, new List<MassTextPacket>() { fPacket });
                List<MassTextPacket> fragments = packets[fPacket.CLSD];
                printf("PacketID:{0}--SeqNo:{1}--progress: {2}%", fPacket.CLSD, fPacket.SeqID, (int)(System.Math.Round(fragments.Count / (decimal)fPacket.SeqCount, 2) * 100));
                // Reassemble once every fragment has arrived
                if (fPacket.SeqCount == fragments.Count)
                {
                    List<byte> values = new List<byte>();
                    foreach (MassTextPacket mt in fragments.OrderBy(c => c.SeqID))
                    {
                        values.AddRange(Convert.FromBase64String(mt.Message));
                    }
                    // The sender fragments a serialized TextPacket, so casting the
                    // reassembled object to MassTextPacket yielded null. TextPacket
                    // accepts both, because MassTextPacket derives from it.
                    TextPacket value = UtilityHelper.Deserialize(values.ToArray()) as TextPacket;
                    // Drop the fragments so finished transfers do not pile up.
                    packets.Remove(fPacket.CLSD);
                    if (value != null)
                    {
                        printf("Receive a message: {0}", value.Message);
                        // Reply with an ACK
                        P2PAckPacket ackMsg = new P2PAckPacket();
                        buffer = UtilityHelper.Serialize(ackMsg);
                        client.Send(buffer, buffer.Length, remotePoint);
                    }
                }
            }
        }
        else if (msgType == typeof(FileStreamPacket))
        {
            lock (this)
            {
                FileStreamPacket fPacket = (FileStreamPacket)packet;
                if (packets.ContainsKey(fPacket.CLSD))
                    packets[fPacket.CLSD].Add(fPacket);
                else
                    packets.Add(fPacket.CLSD, new List<MassTextPacket>() { fPacket });
                List<MassTextPacket> fragments = packets[fPacket.CLSD];
                printf("PacketID:{0}--SeqNo:{1}--progress: {2}%", fPacket.CLSD, fPacket.SeqID, (int)(System.Math.Round(fragments.Count / (decimal)fPacket.SeqCount, 2) * 100));
                // Reassemble once every fragment has arrived
                if (fPacket.SeqCount == fragments.Count)
                {
                    List<MassTextPacket> ordered = fragments.OrderBy(c => c.SeqID).ToList();
                    // NOTE(review): FileName comes from the remote peer and is used
                    // as a local path unchanged — consider reducing it to a bare
                    // file name inside a dedicated download directory.
                    using (System.IO.FileStream fs = new System.IO.FileStream(fPacket.FileName + ".tmp", System.IO.FileMode.Create, System.IO.FileAccess.ReadWrite))
                    {
                        foreach (FileStreamPacket mt in ordered)
                        {
                            byte[] buf = Convert.FromBase64String(mt.Message);
                            fs.Write(buf, 0, buf.Length);
                        }
                        fs.Flush();
                    }
                    printf("Receive a file: {0}", fPacket.FileName);
                    // Discard the fragments of the finished transfer
                    packets.Remove(fPacket.CLSD);
                    // Reply with an ACK
                    P2PAckPacket ackMsg = new P2PAckPacket();
                    buffer = UtilityHelper.Serialize(ackMsg);
                    client.Send(buffer, buffer.Length, remotePoint);
                }
            }
        }
        else if (msgType == typeof(P2PAckPacket))
        {
            this.ReceivedACK = true;
        }
        else if (msgType == typeof(P2PPurchHolePacket))
        {
            ProcP2PPurchHoleMsg(packet, remotePoint);
        }
        else if (msgType == typeof(P2PPurchHoleAckPacket))
        {
            P2PPurchHoleAckPacket holeAck = (P2PPurchHoleAckPacket)packet;
            PeerEntity touser = userList.SingleOrDefault(c => c.UserName == holeAck.ToUserName);
            // SingleOrDefault returns null for an unknown name; ignore the ACK then.
            if (touser != null)
            {
                // Use the peer's remote endpoint as its P2P address from now on
                touser.P2PAddress = touser.RemoteEndPoint;
            }
        }
        Thread.Sleep(100);
    }
}
c、建立p2p會話
/// <summary>
/// Handles a hole-punching request: records the P2P address, tries to add a
/// UPnP port mapping for its port, and sends a P2PPurchHoleAckPacket to the
/// requesting user's remote endpoint.
/// </summary>
/// <param name="packet">The received packet; must be a P2PPurchHolePacket.</param>
/// <param name="remoteEP">Endpoint the datagram came from (not used here).</param>
private void ProcP2PPurchHoleMsg(Packet packet, IPEndPoint remoteEP)
{
    // The hole-punching request
    P2PPurchHolePacket purchReqMsg = (P2PPurchHolePacket)packet;

    // Both names come from the network. Single would throw for an unknown one,
    // so look them up with SingleOrDefault and ignore requests that do not resolve.
    PeerEntity toUser = userList.SingleOrDefault(c => c.UserName == purchReqMsg.ToUserName);
    PeerEntity user = userList.SingleOrDefault(c => c.UserName == purchReqMsg.UserName);
    if (toUser == null || user == null || toUser.RemoteEndPoint == null || user.RemoteEndPoint == null)
    {
        printf("Ignore hole punching request {0}->{1}: unknown user or endpoint", purchReqMsg.UserName, purchReqMsg.ToUserName);
        return;
    }

    // NOTE(review): the address is stored on the ToUserName entry while the log
    // line names the requesting user — confirm this is the intended entry.
    toUser.P2PAddress = toUser.RemoteEndPoint;
    printf("Set P2P Address for {0}->[{1}]", user.UserName, toUser.P2PAddress.ToString());

    // Port mapping through UPnP
    if (NAT.AddPortMapping(toUser.P2PAddress.Port, ProtocolType.Udp, "AddPortMapping"))
    {
        printf("Port mapping successed!");
    }

    // Send the hole-punching message to the remote host
    P2PPurchHoleAckPacket trashMsg = new P2PPurchHoleAckPacket(purchReqMsg.UserName, purchReqMsg.ToUserName);
    byte[] buffer = UtilityHelper.Serialize(trashMsg);
    client.Send(buffer, buffer.Length, user.RemoteEndPoint);
}
3、服務端
a、消息處理線程
/// <summary>
/// Server receive loop. Blocks on server.Receive, deserializes each datagram
/// and dispatches it by Packet.GetCommandType() to the matching handler.
/// A failure while handling one message is logged and the loop continues.
/// </summary>
private void RecvThreadProc()
{
    IPEndPoint remotePoint = null;
    byte[] msgBuffer = null;
    while (true)
    {
        msgBuffer = server.Receive(ref remotePoint);
        try
        {
            object msgObj = UtilityHelper.Deserialize(msgBuffer);
            Packet packet = msgObj as Packet;
            if (packet == null)
            {
                // Previously this case surfaced as a swallowed NullReferenceException.
                printf("Ignore unrecognized message from {0}", remotePoint.ToString());
            }
            else
            {
                switch (packet.GetCommandType())
                {
                    case Command.MSG_USERLOGIN: // user login
                        ProcUserLoginMsg(msgObj as UserLoginPacket, remotePoint);
                        break;
                    case Command.MSG_USERLOGOUT: // user logout
                        ProcUserLogoutMsg(msgObj as UserLogoutPacket, remotePoint);
                        break;
                    case Command.MSG_GETUSERLIST: // list of all users
                        ProcGetUserListMsg(msgObj as UserListPacket, remotePoint);
                        break;
                    case Command.MSG_P2PCONNECT: // peer-to-peer connection request
                        ProcP2PConnectMsg(msgObj as P2PConnectionPacket, remotePoint);
                        break;
                    case Command.MSG_USERACTIVEQUERY: // user's reply to the server's polling
                        ProcUserActiveQueryMsg(msgObj as UserActiveQueryPacket, remotePoint);
                        break;
                }
            }
            Thread.Sleep(100);
        }
        catch (System.Exception ex)
        {
            // Keep the server alive as before, but report the failure instead of
            // swallowing it silently.
            printf("Failed to process message from {0}: {1}", remotePoint.ToString(), ex.Message);
        }
    }
}
b、服務端請求客戶端建立p2p連接
/// <summary>
/// Handles a P2P connection request on the server: looks up the target user
/// and forwards a P2PPurchHolePacket to that user's remote endpoint.
/// </summary>
/// <param name="packet">The received packet; must be a P2PConnectionPacket.</param>
/// <param name="remoteEP">Endpoint of the requesting client (used for logging).</param>
private void ProcP2PConnectMsg(Packet packet, IPEndPoint remoteEP)
{
    // Convert the received message
    P2PConnectionPacket transMsg = (P2PConnectionPacket)packet;
    printf("{0}({1}) wants to p2p {2}", remoteEP.Address.ToString(), transMsg.UserName, transMsg.ToUserName);

    // Look up the target user. SingleOrDefault returns null for an unknown
    // name, which used to cause a NullReferenceException just below.
    PeerEntity toUser = userList.SingleOrDefault(c => c.UserName == transMsg.ToUserName);
    if (toUser == null || toUser.RemoteEndPoint == null)
    {
        printf("Cannot forward p2p request: user {0} is unknown or has no endpoint", transMsg.ToUserName);
        return;
    }

    // Build the hole-punching request to forward
    P2PPurchHolePacket transMsg2 = new P2PPurchHolePacket(transMsg.UserName, toUser.UserName);

    // Forward it to the target user
    byte[] buffer = UtilityHelper.Serialize(transMsg2);
    server.Send(buffer, buffer.Length, toUser.RemoteEndPoint);
}
4、測試
a、服務端
b、客戶端
困惑:
1、能不能實現外網通訊,要實現像QQ那樣通訊要做哪些改進。
2、文件續傳如何實現。
3、c#封裝的網絡操作類(像QQ.NET源碼的Net實現)
4、遠程協助的實現。
最後,希望大家共同討論、共同進步!!!
本文配套源碼