程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python tcp epoll server

編輯:Python

tcp_epoll_server.py

import signal, os
import threading
import multiprocessing as mp
import socket
import select
import errno
import numpy as np
import time
import struct
class DataReader(object):
def __init__(self):
self.buffer=b''
self.length=0
self.pos=0
def append(self,buffer):
self.buffer+=buffer
self.length=len(self.buffer)
def cursor(self):
return self.pos
def byte_remain(self):
return self.length-self.pos
def read_uint8(self):
result=0
success=False
if self.pos+1<=self.length:
temp=bytes(self.buffer[self.pos:self.pos+1])
result,=struct.unpack("B",temp)
self.pos+=1
success=True
return result,success
def read_uint16(self):
result=0
success=False
if self.pos+2<=self.length:
temp=self.buffer[self.pos:self.pos+2]
result,=struct.unpack("!H",temp)
self.pos+=2
success=True
return result,success
def read_uint32(self):
result=0
success=False
if self.pos+4<=self.length:
temp=self.buffer[self.pos:self.pos+4]
result,=struct.unpack("!I",temp)
self.pos+=4
success=True
return result,success
def read_uint64(self):
result=0
success=False
if self.pos+8<=self.length:
temp=self.buffer[self.pos:self.pos+8]
result,=struct.unpack("!Q",temp)
self.pos+=8
success=True
return result,success
def read_float(self):
result=0
success=False
if self.pos+4<=self.length:
temp=self.buffer[self.pos:self.pos+4]
result,=struct.unpack("!f",temp)
self.pos+=4
success=True
return result,success
def read_double(self):
result=0
success=False
if self.pos+8<=self.length:
temp=self.buffer[self.pos:self.pos+8]
result,=struct.unpack("!d",temp)
self.pos+=8
success=True
return result,success
def read_varint(self):
result=0
success=False
multi=1
length=self._varint_len()
if length>0:
for i in range(length):
temp=bytes(self.buffer[self.pos+i:self.pos+i+1])
v,=struct.unpack("B",temp)
v=v&127
result=result+v*multi
multi*=128
self.pos+=length
success=True
return result,success
def _varint_len(self):
length=0
remain=self.byte_remain()
decodable=False
for i in range(remain):
length+=1
temp=bytes(self.buffer[self.pos+i:self.pos+i+1])
v,=struct.unpack("B",temp)
if v&128==0:
decodable=True
break
if decodable is False:
length=0
return length
def varient_length(number):
length=0;
if number<=(0x7f):
length=1;
elif number<=(0x3fff):
length=2;
elif number<=(0x1fffff):
length=3;
elif number<=(0xfffffff):
length=4;
elif number<=(0x7ffffffff):
length=5;
elif number<=(0x3ffffffffff):
length=6;
elif number<=(0x1ffffffffffff):
length=7;
elif number<=(0xffffffffffffff):
length=8;
return length
def varint_encode(number):
s = b""
while True:
byte = number % 128
number = number // 128
# If there are more digits to encode, set the top bit of this digit
if number > 0:
byte = byte|0x80
s = s + struct.pack("!B", byte)
if number == 0:
return s
class DataWriter(object):
def __init__(self):
self.buffer=b''
def length(self):
return len(self.buffer)
def content(self):
return self.buffer
def write_uint8(self,v):
self.buffer+=struct.pack("B",v)
def write_uint16(self,v):
self.buffer+=struct.pack("!H",v)
def write_uint32(self,v):
self.buffer+=struct.pack("!I",v)
def write_uint64(self,v):
self.buffer+=struct.pack("!Q",v)
def write_float(self,v):
self.buffer+=struct.pack("!f",v)
def write_double(self,v):
self.buffer+=struct.pack("!d",v)
def write_varint(self,v):
length=varient_length(v)
if length>0:
self.buffer+=varint_encode(v)
else:
raise Exception("out of range")
kPaddingSize=100
class TcpPeer(object):
def __init__(self,server,conn):
self.server=server
self.conn=conn
self.msg_count=0
self.last_sent=0
self.intra_delay=0
self.trans_delay=0
self.mid=0
self.buffer=b''
self.dead=False
def __del__(self):
average_intra_delay=0
average_trans_delay=0
if self.msg_count>1:
average_intra_delay=1.0*self.intra_delay/(self.msg_count-1)
if self.msg_count>0:
average_trans_delay=1.0*self.trans_delay/(self.msg_count)
print(self.msg_count,average_intra_delay,average_trans_delay)
def incoming_data(self,buffer):
self.buffer+=buffer
all=len(self.buffer)
close=False
hz=16
if all>=hz:
reader=DataReader()
reader.append(self.buffer)
id,_=reader.read_uint32()
sent_ts,_=reader.read_uint64()
padding_size,_=reader.read_uint32()
remain=b''
if padding_size<reader.byte_remain():
remain=self.buffer[hz+padding_size:all]
if padding_size<=reader.byte_remain():
self.buffer=remain
t=time.time()
receipt_ts=int(round(t * 1000))
if self.msg_count>0:
self.intra_delay+=sent_ts-self.last_sent
delta=0
if receipt_ts>sent_ts:
delta=receipt_ts-sent_ts
self.trans_delay+=delta
self.msg_count+=1
self.last_sent=sent_ts
self.send_responce()
return close
# id (4),send_ts(8),padding_size(4),padding
def send_responce(self):
writer=DataWriter()
t=time.time()
send_time=int(round(t * 1000))
writer.write_uint32(self.mid)
writer.write_uint64(send_time)
writer.write_uint32(kPaddingSize)
p=1
for i in range(kPaddingSize):
writer.write_uint8(p)
self.conn.sendall(writer.content())
self.mid+=1
def read_event(self):
close=False
buffer=b''
length=0
try:
while True:
msg=self.conn.recv(1500)
length+=len(msg)
if msg:
buffer+=msg
else:
if buffer:
self.incoming_data(buffer)
buffer=b''
#print("only close")
close=True
break
except socket.error as e:
err = e.args[0]
#print ("error: "+str(err))
if buffer:
ret=self.incoming_data(buffer)
if ret:
close=True
if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
pass
else:
close=True
#print("msglen: "+str(length))
return close
def close_fd(self):
self.dead=True
self.conn.close()
class TcpServer():
def __init__(self, mode, port):
self._thread = None
self._thread_terminate = False
if mode == "localhost":
self.ip = mode
elif mode == "public":
self.ip ="0.0.0.0"
else:
self.ip ="127.0.0.1"
self.port = port
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._socket.bind((self.ip, self.port))
self._socket.setblocking(False)
self.peers={
}
self._socket.listen(128)
self._epl= select.epoll()
self._epl.register(self._socket.fileno(),select.EPOLLIN)
def loop_start(self):
if self._thread is not None:
return
self._thread_terminate = False
self._thread = threading.Thread(target=self._thread_main)
#self._thread.daemon = True
self._thread.start()
def loop_stop(self, force=False):
if self._thread is None:
return
self._thread_terminate = True
if threading.current_thread() != self._thread:
self._thread.join()
self._thread = None
def loop_once(self):
epoll_list = self._epl.poll(0)
for fd,events in epoll_list:
if fd == self._socket.fileno():
conn,addr =self._socket.accept()
conn.setblocking(False)
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
peer=TcpPeer(self,conn)
self.peers.update({
conn.fileno():peer})
self._epl.register(conn.fileno(), select.EPOLLIN)
elif events == select.EPOLLIN:
ret=self.peers[fd].read_event()
if ret:
#print("close")
self._close(fd)
for fd in list(self.peers.keys()):
if self.peers[fd].dead:
self._close(fd)
def _thread_main(self):
while True:
self.loop_once()
if self._thread_terminate is True:
self.shutdown()
break
def _close(self,fd):
if fd==self._socket.fileno():
self._epl.unregister(fd)
self._socket.close()
elif fd in self.peers:
self._epl.unregister(fd)
self.peers[fd].close_fd()
self.peers.pop(fd)
def shutdown(self):
for fd, peer in self.peers.items():
self._epl.unregister(fd)
peer.close_fd()
self.peers.clear()
self._close(self._socket.fileno())
self._epl.close()
Terminate=False
def signal_handler(signum, frame):
global Terminate
Terminate =True
if __name__ == '__main__':
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler) # ctrl+c
signal.signal(signal.SIGTSTP, signal_handler) #ctrl+z
tcp_server=TcpServer("localhost",5555)
while True:
tcp_server.loop_once()
if Terminate:
tcp_server.shutdown()
break

The client I use during test, tcp_client.cc.

#include <stdio.h>
#include <assert.h>
#include <signal.h>
#include <iostream>
#include <string>
#include <memory.h>
#include <chrono>
#include <errno.h> // for errno and strerror_r
#include <unistd.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h> //for sockaddr_in
#include <arpa/inet.h> //inet_addr
#include <netinet/tcp.h> //TCP_NODELAY
inline int64_t WallTimeNowInUsec(){
std::chrono::system_clock::duration d = std::chrono::system_clock::now().time_since_epoch();
std::chrono::microseconds mic = std::chrono::duration_cast<std::chrono::microseconds>(d);
return mic.count();
}
inline int64_t TimeMillis(){
return WallTimeNowInUsec()/1000;
}
static uint16_t HostToNet16(uint16_t x) { return __builtin_bswap16(x); }
static uint32_t HostToNet32(uint32_t x) { return __builtin_bswap32(x); }
static uint64_t HostToNet64(uint64_t x) { return __builtin_bswap64(x); }
static uint16_t NetToHost16(uint16_t x) { return HostToNet16(x); }
static uint32_t NetToHost32(uint32_t x) { return HostToNet32(x); }
static uint64_t NetToHost64(uint64_t x) { return HostToNet64(x); }
const int kPaddingSize=100;
const int kBufferSize=1500;
class TcpClient{
public:
TcpClient(int many);
~TcpClient();
bool Init(const char *ip,uint16_t port);
bool Loop();
private:
int SendMessage();
int WaitReply();
void ParserBuffer();
void CloseFd();
int many_=0;
int sockfd_=-1;
int mid_=0;
int counter_=0;
int64_t last_send_delay_=0;
int64_t inter_send_delay_=0;
int64_t trans_delay_=0;
std::string rb_;
};
TcpClient::TcpClient(int many):many_(many){}
TcpClient::~TcpClient(){
CloseFd();
double average_inter_delay=0;
double average_trans_delay=0;
double temp=0.0;
if(counter_>1){
average_inter_delay=1.0*inter_send_delay_/(counter_-1);
}
if(counter_>0){
average_trans_delay=1.0*trans_delay_/counter_;
}
std::cout<<"dtor "<<counter_<<" "<<average_inter_delay<<" "<<average_trans_delay<<std::endl;
}
bool TcpClient::Init(const char *ip,uint16_t port){
bool success=true;
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr(ip);
servaddr.sin_port = htons(port);
int flag = 1;
if ((sockfd_= socket(AF_INET, SOCK_STREAM, 0)) < 0){
std::cout<<"Error : Could not create socket"<<std::endl;
success=false;
return success;
}
setsockopt (sockfd_, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
if (connect(sockfd_,(struct sockaddr *)&servaddr,sizeof(servaddr)) != 0) {
std::cout<<"connection with the server failed"<<std::endl;
CloseFd();
success=false;
return success;
}
return success;
}
bool TcpClient::Loop(){
bool done=true;
if(mid_<many_&&sockfd_>0){
int sent=SendMessage();
if(sent>0&&WaitReply()>0){
done=false;
}
}
return done;
}
int TcpClient::SendMessage(){
char buffer[kBufferSize]={0};
int off=0;
int64_t now=TimeMillis();
uint32_t id=HostToNet32(mid_);
uint64_t sent_ts=HostToNet64(now);
uint32_t pad_size=HostToNet32(kPaddingSize);
mid_++;
memcpy(buffer+off,(void*)&id,sizeof(id));
off+=sizeof(id);
memcpy(buffer+off,(void*)&sent_ts,sizeof(sent_ts));
off+=sizeof(sent_ts);
memcpy(buffer+off,(void*)&pad_size,sizeof(pad_size));
off+=sizeof(pad_size);
off+=kPaddingSize;
return write(sockfd_,buffer,off);
}
int TcpClient::WaitReply(){
char buffer[kBufferSize]={0};
int ret=recv(sockfd_, buffer, kBufferSize, 0);
if(ret>0){
int old=rb_.size();
rb_.resize(old+ret);
memcpy(&rb_[old],buffer,ret);
ParserBuffer();
}
return ret;
}
void TcpClient::ParserBuffer(){
int hz=16;
int off=0;
uint32_t id=0;
uint64_t send_time=0;
uint32_t pad_size=0;
if (rb_.size()>=hz){
memcpy(&id,&rb_[off],sizeof(id));
off+=sizeof(id);
id=NetToHost32(id);
memcpy(&send_time,&rb_[off],sizeof(send_time));
off+=sizeof(send_time);
send_time=NetToHost64(send_time);
memcpy(&pad_size,&rb_[off],sizeof(pad_size));
off+=sizeof(pad_size);
pad_size=NetToHost32(pad_size);
off+=pad_size;
if(rb_.size()>=off){
int remain=rb_.size()-off;
int64_t now=TimeMillis();
if(counter_>0){
inter_send_delay_+=send_time-last_send_delay_;
}
if(now>send_time){
trans_delay_+=now-send_time;
}
counter_++;
last_send_delay_=send_time;
if(remain>0){
const char *data=&rb_[off];
std::string copy(data,remain);
copy.swap(rb_);
}else{
std::string null_str;
null_str.swap(rb_);
}
}
}
}
void TcpClient::CloseFd(){
if(sockfd_>0){
close(sockfd_);
sockfd_=-1;
}
}
static volatile bool g_running=true;
void signal_exit_handler(int sig)
{
g_running=false;
}
int main(int argc, char *argv[]){
signal(SIGTERM, signal_exit_handler);
signal(SIGINT, signal_exit_handler);
signal(SIGHUP, signal_exit_handler);//ctrl+c
signal(SIGTSTP, signal_exit_handler);//ctrl+z
std::string server_ip="127.0.0.1";
uint16_t server_port=5555;
int many=100;
TcpClient client(many);
if(client.Init(server_ip.c_str(),server_port)){
while(!client.Loop()&&g_running){}
}
return 0;
}

Reference:
[1] How To Use Linux epoll with Python
[2] python socket編程 tcp 簡單示例


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved