程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> 關於C語言 >> 異步網絡消息處理框架,異步消息框架

異步網絡消息處理框架,異步消息框架

編輯:關於C語言

異步網絡消息處理框架,異步消息框架


最近一段時間將原來寫的kendynet網絡框架重寫了大部分的代碼,讓提供的接口更清晰,對用戶更友好。

整個框架的架構分層3層:

1)單線程,基於原始數據流的網絡接口,在這一層上,沒有提供封包的處理,定時器事件等等。使用者可以在此之上按自己的需求做進一步的封裝。

2)單線程,提供connection,封包處理,接收發送超時處理。

3)網絡邏輯分離的異步網絡框架,抽象出三個主要的類型:asynnet_t,sock_ident和msgdisp_t.

asynnet_t:網絡處理引擎,使用者創建實例的時候可以傳入pollercount參數,其中每一個poller都會在單獨的線程中運行.

sock_ident:邏輯層操作的套接口封裝,可以安全的在多線程環境下使用.

msgdisp_t:消息分離器,每個分離器有一個對應的消息隊列用於接收從網絡線程傳遞過來的消息.

msgdisp_t提供了兩種使用模式:

第一種:典型的線程池模式。在這種模式下,可以創建一個消息分離器,多個邏輯線程對這個消息分離器調用

msg_loop.

第二種:共用網絡層模式。在一個進程中啟動N個線程,每個線程運行一個不同服務,所有這些服務共用網絡通信層.

在這種情況下,網絡消息需要路由到正確的服務那裡.可以每個線程都創建一個消息分離器,各線程在自己的消息分離器上

調用msg_loop處理只屬於自己的消息.

下面是一個異步網絡服務器的示例:

復制代碼
#include <stdio.h>
#include <stdlib.h>
#include "core/msgdisp.h"
#include "testcommon.h"

uint32_t recvsize = 0;
uint32_t recvcount = 0;

///int32_t asynnet_bind(msgdisp_t disp,sock_ident sock,void *ud,int8_t raw,uint32_t send_timeout,uint32_t recv_timeout)
void asynconnect(msgdisp_t disp,sock_ident sock,const char *ip,int32_t port)
{
    printf("asynconnect\n");
    disp->bind(disp,sock,1,0,30*1000);
}

void asynconnected(msgdisp_t disp,sock_ident sock,const char *ip,int32_t port)
{
    printf("asynconnected\n");
   ++client_count;
}

void asyndisconnected(msgdisp_t disp,sock_ident sock,const char *ip,int32_t port,uint32_t err)
{
    --client_count;
}


int32_t asynprocesspacket(msgdisp_t disp,sock_ident sock,rpacket_t rpk)
{
    recvsize += rpk_len(rpk);
    recvcount++;
    asyn_send(sock,wpk_create_by_other((struct packet*)rpk));
    return 1;
}


void asynconnectfailed(msgdisp_t disp,const char *ip,int32_t port,uint32_t reason)
{

}


int main(int argc,char **argv)
{
    setup_signal_handler();
    InitNetSystem();
    asynnet_t asynet = asynnet_new(1);
    msgdisp_t  disp = new_msgdisp(asynet,
                                  asynconnect,
                                  asynconnected,
                                  asyndisconnected,
                                  asynprocesspacket,
                                  asynconnectfailed);
    int32_t err = 0;
    disp->listen(disp,argv[1],atoi(argv[2]),&err);
    uint32_t tick,now;
    tick = now = GetSystemMs();
    while(!stop){
        msg_loop(disp,50);
        now = GetSystemMs();
        if(now - tick > 1000)
        {
            uint32_t elapse = now-tick;
            recvsize = (recvsize/elapse)/1000;
            printf("client_count:%d,recvsize:%d,recvcount:%d\n",client_count,recvsize,recvcount);
            tick = now;
            packet_send_count = 0;
            recvcount = 0;
            recvsize = 0;
        }
    }
    CleanNetSystem();
    return 0;
}
復制代碼

項目代碼在:https://github.com/sniperHW/luanet

目前只實現了對linux,tcp的網絡支持,後續將會先完善這部分代碼,並在此之上提供基於user level thread的RPC支持.

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved