寫自己的socket框架(二),socket框架
1、開始正常監聽以後,就要開始接受數據了,整體流程圖如下:
![](https://www.aspphp.online/bianchen/UploadFiles_4619/201701/2017012018490980.jpg)
2、上一節看到我們在程序初始化的時候,初始化了很多個SocketConnection,用於管理客戶端的鏈接,那應用層如何來操作,又什麼時候來接受數據?於是我們便有了SocketSession,用於給應用層來管理整個會話過程,代碼如下:
![](https://www.aspphp.online/bianchen/UploadFiles_4619/201701/2017012018490975.gif)
![]()
public class SocketSession : IDisposable
{
public string SessionId { get; private set; }
private System.Net.Sockets.Socket _connectSocket;
private IProtocol _protocol;
private SocketConnection _connect;
public SocketConnection Connection { get { return _connect; } }
private MemoryStream _memStream;
private delegate void ReceiveDataHandler(SocketAsyncEventArgs e);
private ReceiveDataHandler ReceiveHandler;
private delegate void ReceiveReadPackageHandler(byte[] b, int offset, SocketAsyncEventArgs e);
private ReceiveReadPackageHandler ReadPackageHandler;
public System.Net.Sockets.Socket ConnectSocket
{
get
{
return _connectSocket;
}
private set { }
}
public SocketSession(string sessionId)
{
this.SessionId = sessionId;
}
public SocketSession(System.Net.Sockets.Socket client, SocketConnection connect)
: this(Guid.NewGuid().ToString())
{
this._connectSocket = client;
this._connect = connect;
this._protocol = connect.Pool.AppServer.AppProtocol;
_memStream = new MemoryStream();
ReceiveHandler = ReceiveData;
ReadPackageHandler = this.ReadPackage;
}
internal void ReceiveData(SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success)
{
this.Close();
return;
}
if (e.BytesTransferred <= 0)
{
this.Close();
return;
}
try
{
if (this.Connection.Flag == SocketFlag.Busy)
{
byte[] buffer = new byte[e.BytesTransferred];
Array.Copy(e.Buffer, 0, buffer, 0, e.BytesTransferred);
ReadPackage(buffer, 0, e);
buffer = null;
}
}
catch (Exception ex)
{
this.Close();
return;
}
}
internal void ReceiveAsync(SocketAsyncEventArgs e)
{
if (e == null)
{
return;
}
bool isCompleted = true;
try
{
isCompleted = this._connectSocket.ReceiveAsync(e);
}
catch (Exception ex)
{
LogHelper.Debug(this.SessionId + ex.ToString());
this.Close();
}
if (!isCompleted)
{
this.ReceiveHandler.BeginInvoke(e, ReceiveHandlerCallBack, ReceiveHandler);
}
}
void ReceiveHandlerCallBack(IAsyncResult result)
{
try
{
(result.AsyncState as ReceiveDataHandler).EndInvoke(result);
}
catch (Exception e)
{
LogHelper.Debug(e.Message);
}
}
internal void OnDataRecevied(SessionEventArgs arg)
{
if (DataRecevied != null)
{
this._memStream.SetLength(0);
DataRecevied.Invoke(this, arg);
}
}
internal void Close()
{
try
{
this._connectSocket.Close();
}
catch (Exception ex)
{
LogHelper.Debug("關閉socket異常" + ex.ToString());
}
if (this.Closed != null)
{
this.Closed();
}
}
internal Action Closed;
internal Action<SocketSession, SessionEventArgs> DataRecevied;
public void Dispose()
{
if (_memStream != null)
{
_memStream.Close();
_memStream.Dispose();
_memStream = null;
}
}
public void Send(byte[] data)
{
try
{
if (this.Connection.Flag == SocketFlag.Busy)
{
this._connectSocket.Send(data);
}
}
catch (Exception ex)
{
this.Close();
}
}
private void ReadPackage(byte[] data, int offset, SocketAsyncEventArgs e)
{
if (data == null || data.Length == 0)
{
return;
}
if (offset >= data.Length)
{
return;
}
if (offset == 0)
{
if (_memStream.Length > 0)
{
_memStream.Write(data, 0, data.Length);
data = _memStream.ToArray();
}
}
//粘包處理
OnReceivedCallBack(data, offset, e);
data = null;
}
private void OnReceivedCallBack(byte[] buffer, int offset, SocketAsyncEventArgs e)
{
byte[] data = this._protocol.OnDataReceivedCallBack(buffer, ref offset);
if (offset == -1)
{
this.Close();
return;
}
if (data == null || data.Length == 0)
{
this._memStream.Write(buffer, offset, buffer.Length - offset);
this.ReceiveAsync(e);
return;
}
SessionEventArgs session_args = new SessionEventArgs();
session_args.Data = data;
this.OnDataRecevied(session_args);
if (offset < buffer.Length)
{
this.ReadPackageHandler.BeginInvoke(buffer, offset, e, ReadPackageCallBack, ReadPackageHandler);
}
else
{
this.ReceiveAsync(e);
}
data = null;
}
void ReadPackageCallBack(IAsyncResult result)
{
try
{
(result.AsyncState as ReceiveReadPackageHandler).EndInvoke(result);
}
catch (Exception ex)
{
LogHelper.Debug(ex.Message);
}
}
}
View Code
細心的童鞋可以發現,在ReceiveAsync方法裡面,接收數據的地方,當同步接收完成的時候,我們調用了一個異步委托ReceiveHandler.BeginInvoke。
在解析出一個獨立的包,並且緩沖區的數據裡面還有多余的包的時候,我們也調用了一個異步的委托ReadPackageHandler.BeginInvoke。
如果緩沖區比較大,比如我現在是8K,而單個包很小,客戶端又發送比較頻繁的時候。會導致在解析包的時候,形成一個短暫的遞歸。遞歸就會不停的壓堆,資源得不到釋放。
運行一段時間後,有可能導致OutOfMemoryException,如果一直是同步接收數據,在Receive的地方,也有可能形成一個遞歸。於是便采用了異步調用的方式。
3、因為socket屬於無邊界的,代碼層面的每一次Send,並不是真正意義上的直接發送給服務器,而只是寫到了緩沖區,由系統來決定什麼時候發。如果客戶 端發送非常頻繁的情況下,就可能導致服務器從緩沖區取出來的包,是由多個包一起組成的。從緩沖區取出來的包,並不能保證是一個獨立的應用層的包,需要按既定的協議來解析包。
我們先假定一個簡單的協議,一個包的前4個字節,表明這個包內容的長度。代碼如下:
![](https://www.aspphp.online/bianchen/UploadFiles_4619/201701/2017012018490975.gif)
![]()
public class DefaultProtocol : IProtocol
{
public byte[] OnDataReceivedCallBack(byte[] data, ref int offset)
{
int length = BitConverter.ToInt32(data, offset);
int package_head = 4;
int package_length = length + package_head;
byte[] buffer = null;
if (length > 0)
{
if (offset + package_length <= data.Length)
{
buffer = new byte[length];
Array.Copy(data, offset + package_head, buffer, 0, length);
offset += package_length;
}
}
else
{
offset = -1;
}
return buffer;
}
}
View Code
如果協議無法正常解析,則offset=-1,並關閉掉該鏈接。如果在解析完一個包以後,還有剩余的包, 於是在拋給應用層以後,便繼續解析。如果單個包比較大,緩沖區一次放不下的時候,我們將數據暫時寫入到內存流裡面,然後將下一次接收到的數據,一並拿出來解析。
4、接收數據已經准備完畢以後,就需要將SocketConnection和SocketSession關聯起來,代碼如下:
![](https://www.aspphp.online/bianchen/UploadFiles_4619/201701/2017012018490975.gif)
![]()
public class AppServer : IAppServer
{
public delegate void DataRecevieHandler(SocketSession o, SessionEventArgs e);
public delegate void NewConnectionHandler(SocketSession o, EventArgs e);
public delegate void OnErrorHandler(Exception e);
public event DataRecevieHandler DataRecevied;
public event NewConnectionHandler NewConnected;
public event OnErrorHandler OnError;
private ISocketListener _listener;
private SocketConnectionPool _connectPool;
public AppServer(ServerConfig serverConfig)
{
this.AppConfig = serverConfig;
if (this.AppProtocol == null)
{
this.AppProtocol = new DefaultProtocol();
}
_connectPool = new SocketConnectionPool(this);
_connectPool.Connected = OnConnected;
_listener = new SocketListener(this.AppConfig);
_listener.NewClientAccepted += new NewClientAcceptHandler(listener_NewClientAccepted);
_listener.Error += new ErrorHandler(_listener_Error);
}
void OnDataRecevied(SocketSession session, SessionEventArgs e)
{
if (this.DataRecevied != null)
{
DataRecevied.BeginInvoke(session, e, DataReceviedCallBack, DataRecevied);
}
}
public bool Start()
{
_connectPool.Init();
return _listener.Start();
}
public void Stop()
{
_listener.Stop();
}
void _listener_Error(ISocketListener listener, Exception e)
{
if (this.OnError != null)
{
this.OnError.Invoke(e);
}
}
void listener_NewClientAccepted(ISocketListener listener, System.Net.Sockets.Socket client, object state)
{
_connectPool.Push(client);
}
public void OnConnected(System.Net.Sockets.Socket client, SocketConnection connect)
{
var session = new SocketSession(client, connect);
session.DataRecevied = OnDataRecevied;
connect.Initialise(session);
if (NewConnected != null)
{
NewConnected.BeginInvoke(session, EventArgs.Empty, NewConnectedCallBack, NewConnected);
}
if (connect.RecevieEventArgs != null)
{
session.ReceiveAsync(connect.RecevieEventArgs);
}
}
void DataReceviedCallBack(IAsyncResult result)
{
try
{
(result.AsyncState as DataRecevieHandler).EndInvoke(result);
}
catch (Exception e)
{
LogHelper.Debug(e.Message);
}
}
void NewConnectedCallBack(IAsyncResult result)
{
try
{
(result.AsyncState as NewConnectionHandler).EndInvoke(result);
}
catch (Exception e)
{
LogHelper.Debug(e.Message);
}
}
public ServerConfig AppConfig
{
get;
set;
}
public IProtocol AppProtocol
{
get;
set;
}
}
View Code
到這裡,整個接收包的流程就結束了,但是發送的地方,我們發現是同步在發送,如果有特別需要的可以考慮寫成異步方式,但我個人更傾向於,這一塊留給應用層處理,在應用層寫一個發送隊列,然後有獨立的線程來管理這個發送隊列。
開發安卓游戲怎構架服務器並使客戶端與服務器傳數據?
1.首先要有一個數據庫 常用的mysql oracle mssql都可以
2.其次 要有一個服務器
看你用什麼通信協議?
HTTP協議的web服務器 有tomcat, apache, weblogic等等
socket協議的話可以自己寫一個服務器 或用現成的框架可以用apache mina2.0 很實用
3.android手機端程序要寫一段程序連接到服務器傳輸數據
HTTP協議的話 api支持 HttpClient
Socket的話api也支持 自己寫就是了 apache mina也行
利用socket進行面向連接的網絡編程程序框架給我寫一個吧
這個。。。。。你要源代碼?