TCP客戶端連接TCP服務器端有幾種應用狀態:
1.與服務器的連接已建立
2.與服務器的連接已斷開
3.與服務器的連接發生異常
應用程序可按需求合理處理這些邏輯,比如:
1.連接斷開後自動重連
2.連接斷開後選擇備用地址重連
3.所有狀態變化上報告警
本文描述的TcpClient實現了狀態變化的事件通知機制。
代碼如下:
/// <summary>
/// 異步TCP客戶端
/// </summary>
public class AsyncTcpClient : IDisposable
{
#region Fields
private TcpClient tcpClient;
private bool disposed = false;
private int retries = 0;
#endregion
#region Ctors
/// <summary>
/// 異步TCP客戶端
/// </summary>
/// <param name="remoteEP">遠端服務器終結點</param>
public AsyncTcpClient(IPEndPoint remoteEP)
: this(new[] { remoteEP.Address }, remoteEP.Port)
{
}
/// <summary>
/// 異步TCP客戶端
/// </summary>
/// <param name="remoteEP">遠端服務器終結點</param>
/// <param name="localEP">本地客戶端終結點</param>
public AsyncTcpClient(IPEndPoint remoteEP, IPEndPoint localEP)
: this(new[] { remoteEP.Address }, remoteEP.Port, localEP)
{
}
/// <summary>
/// 異步TCP客戶端
/// </summary>
/// <param name="remoteIPAddress">遠端服務器IP地址</param>
/// <param name="remotePort">遠端服務器端口</param>
public AsyncTcpClient(IPAddress remoteIPAddress, int remotePort)
: this(new[] { remoteIPAddress }, remotePort)
{
}
/// <summary>
/// 異步TCP客戶端
/// </summary>
/// <param name="remoteIPAddress">遠端服務器IP地址</param>
/// <param name="remotePort">遠端服務器端口</param>
/// <param name="localEP">本地客戶端終結點</param>
public AsyncTcpClient(
IPAddress remoteIPAddress, int remotePort, IPEndPoint localEP)
: this(new[] { remoteIPAddress }, remotePort, localEP)
{
}
/// <summary>
/// 異步TCP客戶端
/// </summary>
/// <param name="remoteHostName">遠端服務器主機名</param>
/// <param name="remotePort">遠端服務器端口</param>
public AsyncTcpClient(string remoteHostName, int remotePort)
: this(Dns.GetHostAddresses(remoteHostName), remotePort)
{
}
/// <summary>
/// 異步TCP客戶端
/// </summary>
/// <param name="remoteHostName">遠端服務器主機名</param>
/// <param name="remotePort">遠端服務器端口</param>
/// <param name="localEP">本地客戶端終結點</param>
public AsyncTcpClient(
string remoteHostName, int remotePort, IPEndPoint localEP)
: this(Dns.GetHostAddresses(remoteHostName), remotePort, localEP)
{
}
/// <summary>
/// 異步TCP客戶端
/// </summary>
/// <param name="remoteIPAddresses">遠端服務器IP地址列表</param>
/// <param name="remotePort">遠端服務器端口</param>
public AsyncTcpClient(IPAddress[] remoteIPAddresses, int remotePort)
: this(remoteIPAddresses, remotePort, null)
{
}
/// <summary>
/// 異步TCP客戶端
/// </summary>
/// <param name="remoteIPAddresses">遠端服務器IP地址列表</param>
/// <param name="remotePort">遠端服務器端口</param>
/// <param name="localEP">本地客戶端終結點</param>
public AsyncTcpClient(
IPAddress[] remoteIPAddresses, int remotePort, IPEndPoint localEP)
{
this.Addresses = remoteIPAddresses;
this.Port = remotePort;
this.LocalIPEndPoint = localEP;
this.Encoding = Encoding.Default;
if (this.LocalIPEndPoint != null)
{
this.tcpClient = new TcpClient(this.LocalIPEndPoint);
}
else
{
this.tcpClient = new TcpClient();
}
Retries = 3;
RetryInterval = 5;
}
#endregion
#region Properties
/// <summary>
/// 是否已與服務器建立連接
/// </summary>
public bool Connected { get { return tcpClient.Client.Connected; } }
/// <summary>
/// 遠端服務器的IP地址列表
/// </summary>
public IPAddress[] Addresses { get; private set; }
/// <summary>
/// 遠端服務器的端口
/// </summary>
public int Port { get; private set; }
/// <summary>
/// 連接重試次數
/// </summary>
public int Retries { get; set; }
/// <summary>
/// 連接重試間隔
/// </summary>
public int RetryInterval { get; set; }
/// <summary>
/// 遠端服務器終結點
/// </summary>
public IPEndPoint RemoteIPEndPoint
{
get { return new IPEndPoint(Addresses[0], Port); }
}
/// <summary>
/// 本地客戶端終結點
/// </summary>
protected IPEndPoint LocalIPEndPoint { get; private set; }
/// <summary>
/// 通信所使用的編碼
/// </summary>
public Encoding Encoding { get; set; }
#endregion
#region Connect
/// <summary>
/// 連接到服務器
/// </summary>
/// <returns>異步TCP客戶端</returns>
public AsyncTcpClient Connect()
{
if (!Connected)
{
// start the async connect operation
tcpClient.BeginConnect(
Addresses, Port, HandleTcpServerConnected, tcpClient);
}
return this;
}
/// <summary>
/// 關閉與服務器的連接
/// </summary>
/// <returns>異步TCP客戶端</returns>
public AsyncTcpClient Close()
{
if (Connected)
{
retries = 0;
tcpClient.Close();
RaiseServerDisconnected(Addresses, Port);
}
return this;
}
#endregion
#region Receive
private void HandleTcpServerConnected(IAsyncResult ar)
{
try
{
tcpClient.EndConnect(ar);
RaiseServerConnected(Addresses, Port);
retries = 0;
}
catch (Exception ex)
{
ExceptionHandler.Handle(ex);
if (retries > 0)
{
Logger.Debug(string.Format(CultureInfo.InvariantCulture,
"Connect to server with retry {0} failed.", retries));
}
retries++;
if (retries > Retries)
{
// we have failed to connect to all the IP Addresses,
// connection has failed overall.
RaiseServerExceptionOccurred(Addresses, Port, ex);
return;
}
else
{
Logger.Debug(string.Format(CultureInfo.InvariantCulture,
"Waiting {0} seconds before retrying to connect to server.",
RetryInterval));
Thread.Sleep(TimeSpan.FromSeconds(RetryInterval));
Connect();
return;
}
}
// we are connected successfully and start asyn read operation.
byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
tcpClient.GetStream().BeginRead(
buffer, 0, buffer.Length, HandleDatagramReceived, buffer);
}
private void HandleDatagramReceived(IAsyncResult ar)
{
NetworkStream stream = tcpClient.GetStream();
int numberOfReadBytes = 0;
try
{
numberOfReadBytes = stream.EndRead(ar);
}
catch
{
numberOfReadBytes = 0;
}
if (numberOfReadBytes == 0)
{
// connection has been closed
Close();
return;
}
// received byte and trigger event notification
byte[] buffer = (byte[])ar.AsyncState;
byte[] receivedBytes = new byte[numberOfReadBytes];
Buffer.BlockCopy(buffer, 0, receivedBytes, 0, numberOfReadBytes);
RaiseDatagramReceived(tcpClient, receivedBytes);
RaisePlaintextReceived(tcpClient, receivedBytes);
// then start reading from the network again
stream.BeginRead(
buffer, 0, buffer.Length, HandleDatagramReceived, buffer);
}
#endregion
#region Events
/// <summary>
/// 接收到數據報文事件
/// </summary>
public event EventHandler<TcpDatagramReceivedEventArgs<byte[]>> DatagramReceived;
/// <summary>
/// 接收到數據報文明文事件
/// </summary>
public event EventHandler<TcpDatagramReceivedEventArgs<string>> PlaintextReceived;
private void RaiseDatagramReceived(TcpClient sender, byte[] datagram)
{
if (DatagramReceived != null)
{
DatagramReceived(this,
new TcpDatagramReceivedEventArgs<byte[]>(sender, datagram));
}
}
private void RaisePlaintextReceived(TcpClient sender, byte[] datagram)
{
if (PlaintextReceived != null)
{
PlaintextReceived(this,
new TcpDatagramReceivedEventArgs<string>(
sender, this.Encoding.GetString(datagram, 0, datagram.Length)));
}
}
/// <summary>
/// 與服務器的連接已建立事件
/// </summary>
public event EventHandler<TcpServerConnectedEventArgs> ServerConnected;
/// <summary>
/// 與服務器的連接已斷開事件
/// </summary>
public event EventHandler<TcpServerDisconnectedEventArgs> ServerDisconnected;
/// <summary>
/// 與服務器的連接發生異常事件
/// </summary>
public event EventHandler<TcpServerExceptionOccurredEventArgs> ServerExceptionOccurred;
private void RaiseServerConnected(IPAddress[] ipAddresses, int port)
{
if (ServerConnected != null)
{
ServerConnected(this,
new TcpServerConnectedEventArgs(ipAddresses, port));
}
}
private void RaiseServerDisconnected(IPAddress[] ipAddresses, int port)
{
if (ServerDisconnected != null)
{
ServerDisconnected(this,
new TcpServerDisconnectedEventArgs(ipAddresses, port));
}
}
private void RaiseServerExceptionOccurred(
IPAddress[] ipAddresses, int port, Exception innerException)
{
if (ServerExceptionOccurred != null)
{
ServerExceptionOccurred(this,
new TcpServerExceptionOccurredEventArgs(
ipAddresses, port, innerException));
}
}
#endregion
#region Send
/// <summary>
/// 發送報文
/// </summary>
/// <param name="datagram">報文</param>
public void Send(byte[] datagram)
{
if (datagram == null)
throw new ArgumentNullException("datagram");
if (!Connected)
{
RaiseServerDisconnected(Addresses, Port);
throw new InvalidProgramException(
"This client has not connected to server.");
}
tcpClient.GetStream().BeginWrite(
datagram, 0, datagram.Length, HandleDatagramWritten, tcpClient);
}
private void HandleDatagramWritten(IAsyncResult ar)
{
((TcpClient)ar.AsyncState).GetStream().EndWrite(ar);
}
/// <summary>
/// 發送報文
/// </summary>
/// <param name="datagram">報文</param>
public void Send(string datagram)
{
Send(this.Encoding.GetBytes(datagram));
}
#endregion
#region IDisposable Members
/// <summary>
/// Performs application-defined tasks associated with freeing,
/// releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Releases unmanaged and - optionally - managed resources
/// </summary>
/// <param name="disposing"><c>true</c> to release both managed
/// and unmanaged resources; <c>false</c>
/// to release only unmanaged resources.
/// </param>
protected virtual void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
try
{
Close();
if (tcpClient != null)
{
tcpClient = null;
}
}
catch (SocketException ex)
{
ExceptionHandler.Handle(ex);
}
}
disposed = true;
}
}
#endregion
}
使用舉例
代碼如下:
class Program
{
static AsyncTcpClient client;
static void Main(string[] args)
{
LogFactory.Assign(new ConsoleLogFactory());
// 測試用,可以不指定由系統選擇端口
IPEndPoint remoteEP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9999);
IPEndPoint localEP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9998);
client = new AsyncTcpClient(remoteEP, localEP);
client.Encoding = Encoding.UTF8;
client.ServerExceptionOccurred +=
new EventHandler<TcpServerExceptionOccurredEventArgs>(client_ServerExceptionOccurred);
client.ServerConnected +=
new EventHandler<TcpServerConnectedEventArgs>(client_ServerConnected);
client.ServerDisconnected +=
new EventHandler<TcpServerDisconnectedEventArgs>(client_ServerDisconnected);
client.PlaintextReceived +=
new EventHandler<TcpDatagramReceivedEventArgs<string>>(client_PlaintextReceived);
client.Connect();
Console.WriteLine("TCP client has connected to server.");
Console.WriteLine("Type something to send to server...");
while (true)
{
try
{
string text = Console.ReadLine();
client.Send(text);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
static void client_ServerExceptionOccurred(
object sender, TcpServerExceptionOccurredEventArgs e)
{
Logger.Debug(string.Format(CultureInfo.InvariantCulture,
"TCP server {0} exception occurred, {1}.",
e.ToString(), e.Exception.Message));
}
static void client_ServerConnected(
object sender, TcpServerConnectedEventArgs e)
{
Logger.Debug(string.Format(CultureInfo.InvariantCulture,
"TCP server {0} has connected.", e.ToString()));
}
static void client_ServerDisconnected(
object sender, TcpServerDisconnectedEventArgs e)
{
Logger.Debug(string.Format(CultureInfo.InvariantCulture,
"TCP server {0} has disconnected.", e.ToString()));
}
static void client_PlaintextReceived(
object sender, TcpDatagramReceivedEventArgs<string> e)
{
Console.Write(string.Format("Server : {0} --> ",
e.TcpClient.Client.RemoteEndPoint.ToString()));
Console.WriteLine(string.Format("{0}", e.Datagram));
}
}
補充代碼
代碼如下:
/// <summary>
/// Internal class to join the TCP client and buffer together
/// for easy management in the server
/// </summary>
internal class TcpClientState
{
/// <summary>
/// Constructor for a new Client
/// </summary>
/// <param name="tcpClient">The TCP client</param>
/// <param name="buffer">The byte array buffer</param>
public TcpClientState(TcpClient tcpClient, byte[] buffer)
{
if (tcpClient == null)
throw new ArgumentNullException("tcpClient");
if (buffer == null)
throw new ArgumentNullException("buffer");
this.TcpClient = tcpClient;
this.Buffer = buffer;
}
/// <summary>
/// Gets the TCP Client
/// </summary>
public TcpClient TcpClient { get; private set; }
/// <summary>
/// Gets the Buffer.
/// </summary>
public byte[] Buffer { get; private set; }
/// <summary>
/// Gets the network stream
/// </summary>
public NetworkStream NetworkStream
{
get { return TcpClient.GetStream(); }
}
}
代碼如下:
/// <summary>
/// 與客戶端的連接已建立事件參數
/// </summary>
public class TcpClientConnectedEventArgs : EventArgs
{
/// <summary>
/// 與客戶端的連接已建立事件參數
/// </summary>
/// <param name="tcpClient">客戶端</param>
public TcpClientConnectedEventArgs(TcpClient tcpClient)
{
if (tcpClient == null)
throw new ArgumentNullException("tcpClient");
this.TcpClient = tcpClient;
}
/// <summary>
/// 客戶端
/// </summary>
public TcpClient TcpClient { get; private set; }
}
代碼如下:
/// <summary>
/// 與客戶端的連接已斷開事件參數
/// </summary>
public class TcpClientDisconnectedEventArgs : EventArgs
{
/// <summary>
/// 與客戶端的連接已斷開事件參數
/// </summary>
/// <param name="tcpClient">客戶端</param>
public TcpClientDisconnectedEventArgs(TcpClient tcpClient)
{
if (tcpClient == null)
throw new ArgumentNullException("tcpClient");
this.TcpClient = tcpClient;
}
/// <summary>
/// 客戶端
/// </summary>
public TcpClient TcpClient { get; private set; }
}
代碼如下:
/// <summary>
/// 與服務器的連接發生異常事件參數
/// </summary>
public class TcpServerExceptionOccurredEventArgs : EventArgs
{
/// <summary>
/// 與服務器的連接發生異常事件參數
/// </summary>
/// <param name="ipAddresses">服務器IP地址列表</param>
/// <param name="port">服務器端口</param>
/// <param name="innerException">內部異常</param>
public TcpServerExceptionOccurredEventArgs(
IPAddress[] ipAddresses, int port, Exception innerException)
{
if (ipAddresses == null)
throw new ArgumentNullException("ipAddresses");
this.Addresses = ipAddresses;
this.Port = port;
this.Exception = innerException;
}
/// <summary>
/// 服務器IP地址列表
/// </summary>
public IPAddress[] Addresses { get; private set; }
/// <summary>
/// 服務器端口
/// </summary>
public int Port { get; private set; }
/// <summary>
/// 內部異常
/// </summary>
public Exception Exception { get; private set; }
/// <summary>
/// Returns a <see cref="System.String"/> that represents this instance.
/// </summary>
/// <returns>
/// A <see cref="System.String"/> that represents this instance.
/// </returns>
public override string ToString()
{
string s = string.Empty;
foreach (var item in Addresses)
{
s = s + item.ToString() + ',';
}
s = s.TrimEnd(',');
s = s + ":" + Port.ToString(CultureInfo.InvariantCulture);
return s;
}
}