服务端: 功能:保存所有客户端信息和在线状态,统一分配端口.对掉线的客户端信息通知到进程控制模块 ServerService.h
#ifndef _SERVERSERVICE_H_ #define _SERVERSERVICE_H_ #include <map> #include <string> #include <sstream> #include <fstream> #include "ace/TP_Reactor.h" #include "ace/SOCK_Dgram.h" #include "ace/Task.h" namespace YB { const static ACE_UINT32 iLocalHostPort = 8001; typedef unsignedchar BYTE; static ACE_Thread_Mutex slock; typedef struct STAPPID_ { std::stringsIp; // 客户端IP int iPort; // 客户端连接到服务端的端口 int ClientAcceptPort; // 服务端分配的客户端监听端口 BYTE byAppid; // 模块标识 BYTE byGroup; // 组号 int iTime; // 时计器,维护客户端在线状态 bool bOnline; // 在线状态 }STAppid; /* 服务端UDP数据收发 */ class CMain; class CServerService: public ACE_Event_Handler { public: // 构造、析构函数 CServerService(); ~CServerService(); virtual ACE_HANDLE get_handle(void) const; // 继承基类 virtual inthandle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE); // 有网络数据到达 virtual inthandle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); // 关闭网络监听
bool Open(CMain* cm); void SendToAll(char* buf,int nSize); // 消息群发 intDeleteAppid(BYTE byappid,std::string sip); // 删除指定客户端信息 bool ReadServerIP(); // 读配置IP private: void UpdateClientAllSatte(STAppid stAppid,ACE_INET_Addr taddr); // 更新客户端状态 void UpdateState(YB::BYTE byappid); // 客户端在线 void MsgData(YB::BYTE byappid); // 消息报文 void ChackProtocol(const char* buf,const int nSize); // 解析报文 void ApplyConn(const char *buf,const int nSize); // 应答客户申请连接 void AllotPort(unsigned short &uiPort); // 检查分配客户端端口是否重复,并分配新的 BYTE checkFrame(unsigned char* uc_buf,unsigned short uc_length);// 帧校验 void fixFrame(unsigned char* uc_buf,unsigned char& uc_Length,// 组帧 unsigned char flag,unsigned char dest_addr,unsigned char src_addr); void CheckAppid(BYTE byappid,std::string sIp); public: std::map<BYTE,STAppid> mpInfo; // 注册客户端信息表 private: ACE_INET_Addr addr; ACE_SOCK_Dgram udp; std::stringsServerIP; unsigned short usiPort; // 分配客户端端口值 STAppid stAppid; int iPortCount; CMain* cmn; };
// 定时器类,监视在线客户端, class CTaskTimer : public ACE_Task_Base { public: // 构造、析构函数 CTaskTimer(){timeid = 0;}; virtual ~CTaskTimer(){}; virtual inthandle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); // 关闭网络监听 public: bool Open(CServerService *udp); inthandle_timeout(const ACE_Time_Value ¤t_time,const void *act = 0);// 定时器,清理掉线客户端
private: CServerService *sudp; long timeid; }; // 主调类,负责启动定时器和网络监听 class CMain : public ACE_Task_Base { public: // 构造、析构函数 CMain(){}; ~CMain(){}; public: bool Open(); intClose(); private: CServerService *serudp; CTaskTimer*taskTimer; }; } #endif// end of _SERVERSERVICE_H_
ServerService.cpp #include "./ServerService.h" using namespace YB; bool CMain::Open() { ACE_NEW_RETURN(serudp,CServerService,false); ACE_NEW_RETURN(taskTimer,CTaskTimer,false); serudp->reactor(ACE_Reactor::instance()); if (!serudp->Open(this)) return false; taskTimer->reactor(ACE_Reactor::instance()); if (!taskTimer->Open(serudp)) return false; ACE_OS::sleep(ACE_Time_Value(0,10000)); // 等待10毫秒 ACE_Reactor::instance()->run_reactor_event_loop(); // 启动线程 return true; } int CMain::Close() { taskTimer->handle_close(ACE_INVALID_HANDLE,0); serudp->handle_close(ACE_INVALID_HANDLE,0); if (ACE_Reactor::instance()->reactor_event_loop_done() != 1) { ACE_Reactor::instance()->end_reactor_event_loop(); } return 0; } /////////////////////////////////////////// CServerService::CServerService() { usiPort = 20000; // 初使分配客户端端口值 iPortCount = 0; } CServerService::~CServerService() { } bool CServerService::ReadServerIP() { std::ifstream fle("ServerIp.txt",std::ios::in); if (!fle) return false;
std::ostringstream seamServerIP; seamServerIP<<fle.rdbuf(); sServerIP = seamServerIP.str(); fle.close(); return true; } bool CServerService::Open(CMain* cm) { if (!ReadServerIP()) return false; this->addr.set(iLocalHostPort,sServerIP.c_str());//,ACE_LOCALHOST this->udp.open(addr); this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK); cmn = cm; return true; } ACE_HANDLE CServerService::get_handle() const { return this->udp.get_handle(); } intCServerService::handle_input(ACE_HANDLE) { ACE_INET_Addr taddr; char buf[255] = {0}; int isize = 0; isize = this->udp.recv(buf,255,taddr); stAppid.iPort = taddr.get_port_number(); stAppid.sIp = taddr.get_host_addr();
if (isize > 0 && isize < 255) ChackProtocol(buf,isize); return 0; } intCServerService::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask) { if (this->udp.get_handle() != ACE_INVALID_HANDLE) { ACE_Reactor_Mask m = ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL; this->reactor()->remove_handler(this,m); this->udp.close(); } delete this; return 0; } unsigned char CServerService::checkFrame(unsigned char* uc_buf,unsigned short uc_length) { //检查报文 return 1; } void CServerService::fixFrame( unsigned char* uc_buf, unsigned char& uc_Length, unsigned char flag, unsigned char dest_addr, unsigned char src_addr) { //组装报文 return ; } void CServerService::ChackProtocol(const char* buf,const int nSize) { YB::BYTE *p = (YB::BYTE*)buf;
if (checkFrame(p,nSize)) return; switch (*(p + 11)) { case 0x00:// 心跳 { UpdateState(*(p + 6)); if (*(p + 2) == 0x02) MsgData(*(p + 6)); break; } case 0x01:// 我要处理的类型 { switch (*(p + 15)) { case 0x00:// 正常退出,离线状态 { DeleteAppid(*(p + 23),stAppid.sIp); break; } case 0x02: // 申请连接 { ApplyConn(buf,nSize); break; } default: break; } break; } case 0x02: // 退出 { if (*(p + 15) == 0x04 && *(p + 6) == 0x01) cmn->Close(); break; } default: break; } } void CServerService::ApplyConn(const char *buf,const int nSize) {
ACE_INET_Addr taddr; YB::BYTE isize = 0x0C; charpuf[255] = {0}; char *p = (char*)buf; AllotPort(usiPort); stAppid.ClientAcceptPort = usiPort; stAppid.byAppid = *(p + 6); stAppid.byGroup = *(p + 16); CheckAppid( stAppid.byAppid,stAppid.sIp.c_str() ); taddr.set(usiPort,stAppid.sIp.c_str()); ACE_UINT32 ip = taddr.get_ip_address(); u_shortiprt = taddr.get_port_number(); /*组帧 strcpy(puf,"/x01/x01/x01/x0C/x03"); puf[5] = stAppid.byGroup; memcpy(puf + 6,&ip,sizeof(ACE_UINT32)); memcpy(puf + 6 + sizeof(ACE_UINT32),&iprt,sizeof(u_short)); fixFrame((unsigned char*)puf,isize,0x01,stAppid.byAppid,0x04); taddr.set(stAppid.iPort,stAppid.sIp.c_str()); */ this->udp.send(puf,taddr); /* // 向其他客户端更新信息 isize = 0x0D; memset(puf,0x00,255); strcpy(puf,"/x01/x01/x01/x0D/x01"); puf[5] = stAppid.byGroup; memcpy(puf + 6,sizeof(u_short)); memcpy(puf + 6 + sizeof(ACE_UINT32) + sizeof(u_short),&stAppid.byAppid,sizeof(YB::BYTE)); fixFrame((unsigned char*)puf,0x04); */ SendToAll(puf,isize); // 向新增加客户更新信息 UpdateClientAllSatte(stAppid,taddr); // 增加新的客户端到链表 stAppid.iTime = 1; stAppid.bOnline = true; slock.acquire(); mpInfo.insert(std::make_pair(stAppid.byAppid,stAppid)); slock.release(); } void CServerService::CheckAppid(YB::BYTE byappid,std::string sIp) { std::map<YB::BYTE,STAppid>::iterator mpIter; for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++) { if (byappid == mpIter->second.byAppid && sIp == mpIter->second.sIp ) { DeleteAppid(byappid,sIp); ACE_OS::sleep(1); } } } void CServerService::AllotPort(unsigned short &uiPort) { if (uiPort > 65500) { uiPort = 20000; iPortCount++; } if (iPortCount < 1) { uiPort++; //增加分配的端口号 } else { std::map<YB::BYTE,STAppid>::iterator mpIter; for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++) { if (uiPort == mpIter->second.ClientAcceptPort) { uiPort++; mpIter = mpInfo.begin(); } } } } int CServerService::DeleteAppid(YB::BYTE byappid,std::string sip) { std::map<YB::BYTE,STAppid>::iterator mpIter; YB::BYTE isize = 0x0D; boolb_isfind = false; charpuf[255] = {0}; slock.acquire(); for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++) { if (mpIter->first != byappid) break; if (mpIter->second.sIp != sip) continue;
ACE_INET_Addr taddr(mpIter->second.ClientAcceptPort,sip.c_str()); ACE_UINT32ip = taddr.get_ip_address(); u_short iprt = taddr.get_port_number(); /*组帧 memset(puf,255); isize = 0x0D; strcpy(puf,"/x01/x01/x01/x0D"); puf[5] = mpIter->second.byGroup; memcpy(puf + 6,&mpIter->second.byAppid,0x04); */ mpInfo.erase(mpIter); b_isfind = true; break; } slock.release(); // 广播到各客户端 if (b_isfind) SendToAll(puf,isize); return 0; } void CServerService::SendToAll(char* buf,int nSize) { std::map<YB::BYTE,STAppid>::iterator mpIter; ACE_INET_Addr taddr; for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++) { taddr.set(mpIter->second.iPort,mpIter->second.sIp.c_str()); int bychar = (unsigned char)buf[10] - mpIter->second.byAppid; buf[10] = mpIter->second.byAppid; buf[nSize - 2] -= bychar; this->udp.send(buf,nSize,taddr); } } void CServerService::UpdateState(YB::BYTE byappid)// 客户端在线 { std::map<YB::BYTE,STAppid>:: iterator mpIter;
for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++) { if (mpIter->first != byappid) break; if (stAppid.sIp != mpIter->second.sIp) continue; mpIter->second.iTime = 1; break; } } void CServerService::MsgData(YB::BYTE byappid) { std::map<YB::BYTE,STAppid>::iterator mpIter; for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++) { if (mpIter->first != byappid) break; if (mpIter->second.sIp != stAppid.sIp) continue; ACE_INET_Addr taddr(stAppid.iPort,stAppid.sIp.c_str()); char puf[255] = {0}; unsigned char iszie = 0x04; strcpy(puf,"/xf0/x01/x01/x04"); fixFrame((unsigned char*)puf,iszie,byappid,0x04); this->udp.send(puf,taddr); break; } } void CServerService::UpdateClientAllSatte(STAppid stAppid,ACE_INET_Addr taddr) { std::map<YB::BYTE,STAppid>::iterator mpIter; ACE_INET_Addr taddr1; unsigned char isize = 0x0D; ACE_UINT32ip = 0; u_short iprt = 0; char puf[255] = {0};
ACE_Time_Value t(0,100000); ACE_OS::sleep(t); for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++) { taddr1.set(mpIter->second.ClientAcceptPort,mpIter->second.sIp.c_str()); ip = taddr1.get_ip_address(); iprt = taddr1.get_port_number(); /* memset(puf,"/x01/x01/x01/x0D/x01"); puf[5] = mpIter->second.byGroup; memcpy(puf + 6,0x04); */ this->udp.send(puf,taddr); t.set(0,10000); ACE_OS::sleep(t); } } ////////////////////////////////////////////// ///* boolCTaskTimer::Open(CServerService *udp) { sudp = udp; ACE_Time_Value idlay(1); ACE_Time_Value ival(60); timeid = this->reactor()->schedule_timer(this,idlay,ival); return true; } int CTaskTimer::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask) { if (timeid) this->reactor()->cancel_timer(this); delete this; return 0; } int CTaskTimer::handle_timeout(const ACE_Time_Value ¤t_time,const void *act) { std::map<YB::BYTE,STAppid>::iterator mpIter; for (mpIter = sudp->mpInfo.begin(); mpIter != sudp->mpInfo.end(); mpIter++) { mpIter->second.bOnline = mpIter->second.iTime ? true : false; mpIter->second.iTime = 0; } // 删除掉线客户端 for (mpIter = sudp->mpInfo.begin(); mpIter != sudp->mpInfo.end(); mpIter++) { if (!mpIter->second.bOnline) { sudp->DeleteAppid(mpIter->second.byAppid,mpIter->second.sIp); mpIter = sudp->mpInfo.begin(); if (mpIter == sudp->mpInfo.end()) break; } } return 0; }
执行程序
main.cpp #include "./ServerService.h" int ACE_TMAIN(int argc,char* argv[]) { YB::CMain cmain; if (!cmain.Open()) cmain.Close(); return 0; }
客户端DLL 功能:根据模块号与其他客户进行连接.数据转发 DLL.cpp
#include "./NetWork.h" #include "./os_fun.h" class _mydllexport CBaseInNetwork { public: CBaseInNetwork(); virtual ~CBaseInNetwork(); virtual bool InitNetwork(unsignedchar byAppid,int igroup,int itype); virtual void CloseNetwork(); virtual int SendData(const unsigned char* buf,const int nSize,unsignedchar byAppid ); virtual int GetDataLength(); virtual int GetData(unsigned char* buf); private:
CNetWork *pnet; }; CBaseInNetwork::CBaseInNetwork() { } CBaseInNetwork::~CBaseInNetwork() { } bool CBaseInNetwork::InitNetwork( unsignedchar byAppid,int itype) { ACE::init(); bool bRetVal = false; pnet =new CNetWork(); bRetVal = pnet->Open( byAppid,igroup,itype ); return bRetVal; } int CBaseInNetwork::SendData(const unsigned char* buf,unsignedchar byAppid ) { return pnet->SendData( buf,byAppid ); } int CBaseInNetwork::GetDataLength() { return pnet->GetDataLength(); } int CBaseInNetwork::GetData(unsigned char* buf) { return pnet->GetData( buf ); } void CBaseInNetwork::CloseNetwork( ) { pnet->Close(); delete pnet; pnet = NULL; ACE::fini(); }
os_fun.h #ifndef _OS_FUN_H_ #define _OS_FUN_H_ #ifdef WIN32 #include "./win32_fun.hpp" #endif #ifdef linux #include "./linux_fun.hpp" #endif #endif // end of _OS_FUN_H_
win32_fun.hpp #ifndef __WIN32_FUN_H #define __WIN32_FUN_H #include <windows.h> #define _mydllexport extern "C" _declspec(dllexport) BOOL WINAPI DllMain(HANDLE hModule, DWORDul_reason_for_call, LPVOID lpReserved ) { switch (ul_reason_for_call) { case DLL_PROCESS_ATTACH: case DLL_THREAD_ATTACH: case DLL_THREAD_DETACH: case DLL_PROCESS_DETACH: break; } return TRUE; } #endif // end of __WIN32_FUN_H
linux_fun.hpp #ifndef __LINUX_FUN_H #define __LINUX_FUN_H #define _mydllexport #endif // end of __LINUX_FUN_H
BaseNetWork.h
#ifndef _BASENETWORK_H_ #define _BASENETWORK_H_ namespace YB { class CBaseNetWork { protected: CBaseNetWork(void){}; public: virtual ~CBaseNetWork(void){}; virtual bool Open(unsignedchar byAppid,int igroup = 0,int itype = 0) = 0; //strAppid自己 virtual int SendData(const unsigned char* buf,unsignedchar byAppid = 0xff) = 0; virtual int GetData(unsigned char* buf) = 0; virtual int GetDataLength() = 0; virtual void Close() = 0;
}; } #endif // end of _BASENETWORK_H_
#ifndef _NETWORK_H_ #define _NETWORK_H_ #include <map> #include <list> #include <string> #include <fstream> #include <sstream> #include "ace/TP_Reactor.h" #include "ace/SOCK_Dgram.h" #include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Stream.h" #include "ace/SOCK_Connector.h" #include "ace/Task.h" const static ACE_UINT32 iServerPort = 8001; typedef unsignedchar Byte; static ACE_Thread_Mutex mlocka; static ACE_Thread_Mutex mlock_mp; const static Byte ServerAppid = 0x04; class CNetWork; /* 与服务端建立连接,收发信息 */ class CServerUdpcs : public ACE_Event_Handler { public: // 构造、析构函数 CServerUdpcs(){}; ~CServerUdpcs(){}; virtual ACE_HANDLE get_handle()const; virtual inthandle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE); // 网络读事件 virtual inthandle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask); // 关闭网络事件 public: intOpen_Event(); // 注册事件 bool Open(Byte byappid,Byte bygroup); intSendData(const char* buf,const int nSize); intGetData(char* buf,const int nSize); void SetParentHander(CNetWork *p); // 设置主类指针,引用数据连表
public: intiClientStateSign; // 客户状态变化标志;1有变化,0变化 private: void ChackProtocol(const char* buf,const int nSize); // 解析报文 void UpdateMapInfo(const char* buf,const int nSize); // 更新客户端状态 void ReadDataToList(const char* buf,const int nSize); // 保存数据到连表 void SaveMapinfo(const char* buf); // 保存服务器发送的其他客户端信息 void SaveLocalhost(const char* buf); // 保存本机IP和服务器分配的端口 intGetSourcePort(const char* buf); // 得到端口 std::string GetSourceIp(const char* buf); // 得到IP地址 private: ACE_INET_Addr praddr; ACE_SOCK_Dgram udp; //UDP协议流 CNetWork*net; }; /* 客户端建立监听服务 */ class CClientAcceptor : public ACE_Event_Handler { public: // 构造、析构函数 CClientAcceptor(){}; ~CClientAcceptor(){}; virtual ACE_HANDLE get_handle()const; virtual inthandle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE); // 接受客户端连接 virtual inthandle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); // 关闭网络事件 public: intopen(void* avg = 0); void SetParentHander(CNetWork *p); private: ACE_SOCK_Acceptor acp; CNetWork* net; }; class CClientService; // 保存在线客户端连接 typedef struct STAPPIDCS { std::stringsIp; int iPort; Byte byAppid; Byte byGroup; // 组号 CClientService *pcs; // 客户端连接 }STAppidcs; // 保存其他客户发送到本站的数据 typedef struct STLISTDATA { std::stringsAppid; int iLength; Byte *byData; }STListData; /* 点对点数据收发 */ class CClientService : public ACE_Event_Handler { public: // 构造、析构函数 CClientService(void){}; ~CClientService(){}; // 继承基类 virtual ACE_HANDLE get_handle()const; virtual inthandle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE); // 接受数据,保存到连表 virtual inthandle_close(ACE_HANDLE handle,ACE_Reactor_Mask mask); // 退出连接,删除资源 public: intOpen(); void SetParentHander(CNetWork *p); intconnect(ACE_INET_Addr addr); // 连接到其他客户端 intSendData(const char* buf,const int nSize); // 发送数据到其他客户端 ACE_SOCK_Stream &peer(){return sockeam;}
private: void AddAppid(STAppidcs STAppidcs); // 保存客户端连接 void DeleteAppid(CClientService *cs); // 删除客户端连接 void ReadDataToList(const char* buf,const int nSize); // 保存数据到连表 void ChackProtocol(const char* buf,const int nSize); // 解析报文 private: ACE_SOCK_Connector con; ACE_SOCK_Streamsockeam; STAppidcs stAppidcs; // 保存最近一个客户端连接信息 CNetWork *net; }; class CTaskTimer; /* 用户服务类,数据保存 */ class CNetWork:public ACE_Task_Base { public: // 构造、析构函数 CNetWork(void); virtual ~CNetWork(void); public: bool Open(Byte byappid,int itype = 0); // 服务器地址;strAppid自己 intSendData(const Byte* buf,Byte byappid = 0xff); intGetDataLength(); //返回数据长度 intGetData(Byte* buf); //返回数据 void Close(); bool ReadServerIP(); //取系统IP intsvc(void); // 线程回调函数
public: std::map<Byte,STAppidcs> mpInfo; // 客户端信息连表 std::list<STListData>lstData; // 数据连表 intiprtype; // 是否主控模块标识 Byte byAppid; // 本机APPID intiport; // 服务端分配PORT, std::string sip; // 本机IP Byte byGroup; // 组号 STAppidcsstAppidcs; // 保存最近一个服务端返回其他客户信息CServerUdpcs.SaveMapinfo使用 CServerUdpcs *pServerUdp; CClientAcceptor *pCAcptor; CClientService*pCService; CTaskTimer*ptimer; std::stringsServerIP; std::stringsLocalIP; bool b_run; private: int GroupSend(const char* buf,const int nSize); // 组群发 int SingleSend(const char* buf,Byte byappid); // 单发
}; // 定时器类 class CTaskTimer : public ACE_Task_Base { public: // 构造、析构函数 CTaskTimer(){timeid = 0;}; virtual ~CTaskTimer(){}; virtual inthandle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); // 关闭网络监听 public: bool Open(CNetWork *p); inthandle_timeout(const ACE_Time_Value ¤t_time,发送心跳报文
private: CNetWork*pnet; long timeid; };
#endif // end of _NETWORK_H_
NetWork.cpp
#include "./NetWork.h"unsigned char checkFrame(unsigned char* uc_buf,unsigned short uc_length) {return 0;}void fixFrame(unsigned char* uc_buf,unsigned char flag,unsigned char src_addr){;}//////////////////////////////////////////////*CNetWork::CNetWork(){}CNetWork::~CNetWork(){}bool CNetWork::Open(Byte byappid,int itype){b_run = false;iprtype = itype;byAppid = byappid;byGroup = (Byte)igroup;ACE_NEW_RETURN(pServerUdp,CServerUdpcs,false);ACE_NEW_RETURN(pCAcptor,CClientAcceptor,false);ACE_NEW_RETURN(ptimer,false);if (!ReadServerIP()) return false;// 向服务端申请连接,登陆到服务端pServerUdp->SetParentHander(this);if (!pServerUdp->Open(byAppid,byGroup)) return false;// 自监听pCAcptor->SetParentHander(this);// 开启线程activate();b_run = true;return true;}int CNetWork::svc(void){ACE_Reactor rt;this->reactor(&rt);this->pServerUdp->reactor(&rt);this->pCAcptor->reactor(&rt); this->ptimer->reactor(&rt);this->pServerUdp->Open_Event();this->pCAcptor->open();this->ptimer->Open(this);rt.run_reactor_event_loop();ACE_OS::sleep(ACE_Time_Value(1));return 0;}int CNetWork::SendData(const Byte* buf,Byte byappid){int isize = 0;if (byappid == 0xff) // 组发{isize = GroupSend((char*)buf,nSize);}else if (byappid == ServerAppid) // 心跳{isize = pServerUdp->SendData((char*)buf,nSize);}else // 单发点到点{isize = SingleSend((char*)buf,byappid);}return isize;}int CNetWork::GetDataLength(){if (iprtype && pServerUdp->iClientStateSign) return (mpInfo.size() + 17);if (lstData.empty()) return 0;std::list<STListData>::iterator lstIter;lstIter = lstData.begin();return lstIter->iLength;}int CNetWork::GetData(Byte* buf){ if (iprtype && pServerUdp->iClientStateSign){std::map<Byte,STAppidcs>::iterator mpIter;unsigned char isize = 0;pServerUdp->iClientStateSign = 0;strcpy((char*)buf,"/x03/x01/x01");buf[3] = mpInfo.size() + 4;isize = 4;for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++){ if (mpIter->second.sIp == sip) buf[isize++] = mpIter->second.byAppid;}fixFrame(buf,byAppid,ServerAppid);return isize;}if(lstData.empty()) return 0;std::list<STListData>::iterator lstIter;int ilen = 0;// 返回用户数据mlocka.acquire();lstIter = lstData.begin();ilen = lstIter->iLength;memcpy(buf,lstIter->byData,ilen);delete []lstIter->byData;lstData.erase(lstIter);mlocka.release();return ilen;}void CNetWork::Close(){std::map<Byte,STAppidcs>::iterator mpIter;Byte isize = 0x0D; charbuf[255] = {0};ACE_INET_Addr taddr(iport,sip.c_str());ACE_UINT32ip = taddr.get_ip_address();u_short iprt = taddr.get_port_number();/*strcpy(buf,"/x01/x01/x01/x0D");buf[5] = byGroup; memcpy(buf + 6,sizeof(ACE_UINT32));memcpy(buf + 6 + sizeof(ACE_UINT32),sizeof(u_short));memcpy(buf + 6 + sizeof(ACE_UINT32) + sizeof(u_short),&byAppid,sizeof(Byte));fixFrame((unsigned char*)buf,ServerAppid,byAppid);*/isize = pServerUdp->SendData(buf,isize); // 关闭监听事件ptimer->handle_close(ACE_INVALID_HANDLE,0);pCAcptor->handle_close(ACE_INVALID_HANDLE,0);pServerUdp->handle_close(ACE_INVALID_HANDLE,0);// 关闭网络事件for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); mpIter++){if (mpIter->second.pcs != NULL){ mpIter->second.pcs->handle_close(ACE_INVALID_HANDLE,0); mpIter->second.pcs = NULL;}}// 清除连表mlock_mp.acquire();mpInfo.clear();mlock_mp.release();if (b_run && this->reactor()->reactor_event_loop_done() != 1) this->reactor()->end_reactor_event_loop(); }int CNetWork::GroupSend(const char* buf,const int nSize){std::map<Byte,STAppidcs>::iterator mpIter;int isize = 0;for (mpIter = mpInfo.begin(); mpIter!= mpInfo.end(); mpIter++){if (mpIter->second.byGroup == byGroup) { isize = SingleSend(buf,mpIter->second.byAppid);}} return isize;}int CNetWork::SingleSend(const char* buf,Byte byappid) {std::map<Byte,STAppidcs>::iterator mpIter;Byte isize = 0;for (mpIter = mpInfo.find(byappid); mpIter != mpInfo.end(); mpIter++){if (mpIter->first != byappid) break; if (mpIter->second.pcs != NULL) // 已有连接{ CClientService* p = mpIter->second.pcs; isize = p->SendData(buf,nSize);}else // 无连接,新建连接到对端{ CClientService* cs; ACE_INET_Addr addr(mpIter->second.iPort,mpIter->second.sIp.c_str()); ACE_NEW_RETURN(cs,CClientService,0); cs->SetParentHander(this); cs->reactor(this->reactor()); if (cs->connect(addr) != -1) { charpuf[255] = {0}; cs->Open(); mpIter->second.pcs = cs; // 将连接保存到连表 strcpy(puf,"/x01/x01/x01/x06/x02"); puf[5] = byGroup; isize = 0x06; fixFrame((unsigned char*)puf,byAppid); isize = mpIter->second.pcs->SendData(puf,isize); // 等待 ACE_Time_Value t(1); ACE_OS::sleep(t); isize = mpIter->second.pcs->SendData(buf,nSize); } else { delete cs; }} }return isize;}bool CNetWork::ReadServerIP(){char sip1[20] = {0};char sip2[20] = {0};FILE* fp = NULL;if ((fp = fopen("./ServerIp_c.txt","r")) == NULL) return false;fscanf(fp,"%s %s",sip1,sip2);fclose(fp);sServerIP = sip1;sLocalIP = sip2;return true;}/////////////////////////////////////////////////////////////////bool CServerUdpcs::Open(Byte byappid,Byte bygroup){ACE_INET_Addr taddr;char puf[255] = {0};Byteisize = 0x06;ACE_Time_Value t(3);ACE_INET_Addr taddr_local(net->sLocalIP.c_str());udp.open(taddr_local);taddr.set(iServerPort,net->sServerIP.c_str());/*strcpy(puf,"/x01/x01/x01/x06/x02");puf[5] = bygroup;fixFrame((unsigned char*)puf,byappid);*/this->udp.send(puf,taddr);// 必须阻塞等待服务端返回分配的端口,否则不能建立监听memset(puf,255);ACE_OS::sleep(ACE_Time_Value(0,10000));isize = this->udp.recv(puf,this->praddr,&t);if (isize > 0 && isize < 255) {ChackProtocol(puf,isize);if (net->iport < 20000) //分配的端口都是>20000的,{ udp.close(); return false;}return true;}udp.close();return false;}int CServerUdpcs::Open_Event(){return (this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK));}ACE_HANDLE CServerUdpcs::get_handle()const{return this->udp.get_handle();}int CServerUdpcs::handle_input(ACE_HANDLE fd){char buf[255] = {0};int isize = this->udp.recv(buf,this->praddr);if (isize > 0 && isize < 255) ChackProtocol(buf,isize);return 0;}int CServerUdpcs::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask) {if (this->udp.get_handle() != ACE_INVALID_HANDLE){ACE_Reactor_Mask m = ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;this->reactor()->remove_handler(this,m);this->udp.close();}delete this;return 0;}void CServerUdpcs::ChackProtocol(const char* buf,const int nSize){Byte *p = (Byte*)buf;if (checkFrame(p,nSize)) return;switch (*(p + 11)){case 0xF0: // 消息应答{ if (*(p + 6) == ServerAppid) ReadDataToList(buf,nSize); break;}case 0x01: // my{ switch (*(p + 15)) { case 0x03:// 返回本机IP和端口号 SaveLocalhost(buf); break; case 0x01:// 保存在线客户端信息(新增客户端) SaveMapinfo(buf); break; case 0x00:// 更新在线客户端信息(客户端掉线) UpdateMapInfo(buf,nSize); break; default: break; } break;}default: break;} }void CServerUdpcs::SetParentHander(CNetWork *p){net = p;}int CServerUdpcs::SendData(const char* buf,const int nSize){return this->udp.send(buf,this->praddr);}int CServerUdpcs::GetData(char* buf,const int nSize){return this->udp.recv(buf,this->praddr);}void CServerUdpcs::SaveLocalhost(const char* buf){net->sip = GetSourceIp(buf);net->iport = GetSourcePort(buf); }void CServerUdpcs::UpdateMapInfo(const char* buf,STAppidcs>::iterator mpIter;Byte byappid = (Byte)buf[23];std::string sip = GetSourceIp(buf); mlock_mp.acquire();for (mpIter = net->mpInfo.find(byappid); mpIter != net->mpInfo.end(); mpIter++) {if (mpIter->first != byappid) break;if (mpIter->second.sIp != sip) continue;if (mpIter->second.pcs != NULL){ mpIter->second.pcs->handle_close(ACE_INVALID_HANDLE,0); mpIter->second.pcs = NULL;}net->mpInfo.erase(mpIter);break;}mlock_mp.release();iClientStateSign = 1;}void CServerUdpcs::ReadDataToList(const char* buf,const int nSize){STListData stlData;stlData.iLength = nSize;stlData.byData = new Byte[nSize];memcpy(stlData.byData,buf,nSize);//将数据保存到连表mlocka.acquire();net->lstData.push_front(stlData);mlocka.release();}void CServerUdpcs::SaveMapinfo(const char* buf){std::map<Byte,STAppidcs>::iterator mpIter;Byte *p = (Byte*)buf;STAppidcs stAppidcs; boolb_Insert = false;/*stAppidcs.byAppid = *(p + 23); stAppidcs.byGroup = *(p + 16);stAppidcs.sIp = GetSourceIp(buf);stAppidcs.iPort = GetSourcePort(buf);stAppidcs.pcs = NULL;*/mlock_mp.acquire();for ((mpIter = net->mpInfo.find(stAppidcs.byAppid)); mpIter != net->mpInfo.end(); mpIter++){if (mpIter->first != stAppidcs.byAppid) break;if (mpIter->second.sIp != stAppidcs.sIp) continue;b_Insert = true;break;}if (!b_Insert) net->mpInfo.insert(std::make_pair(stAppidcs.byAppid,stAppidcs));mlock_mp.release();iClientStateSign = 1;}std::string CServerUdpcs::GetSourceIp(const char* buf){ACE_INET_Addr taddr;int iIp = 0;memcpy(&iIp,buf + 17,4);taddr.set(1000,iIp);std::string sip = taddr.get_host_addr();return sip;}int CServerUdpcs::GetSourcePort(const char* buf){int iport = 0;memcpy(&iport,buf + 21,2);return iport;}//////////////////////////////////////////////////////int CClientAcceptor::open(void * avg){ACE_INET_Addr addr(net->iport,net->sip.c_str());this->acp.open(addr,5);return this->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);}ACE_HANDLE CClientAcceptor::get_handle()const{return this->acp.get_handle();}int CClientAcceptor::handle_input(ACE_HANDLE fd){CClientService* cs = NULL;cs = new CClientService();cs->SetParentHander(net);if (this->acp.accept(cs->peer()) == -1) {delete cs;return 0;}cs->reactor(this->reactor());if (cs->Open() == -1) cs->handle_close(ACE_INVALID_HANDLE,0);return 0;}int CClientAcceptor::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask){if (this->acp.get_handle() != ACE_INVALID_HANDLE){ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;this->reactor()->remove_handler(this,m);this->acp.close();}delete this;return 0;}void CClientAcceptor::SetParentHander(CNetWork *p){net = p;}///////////////////////////////////////////////////int CClientService::Open(){ACE_INET_Addr peeraddr;this->sockeam.get_remote_addr(peeraddr);stAppidcs.iPort = peeraddr.get_port_number();stAppidcs.sIp =peeraddr.get_host_addr();stAppidcs.pcs = this;return this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);}ACE_HANDLE CClientService::get_handle()const{return this->sockeam.get_handle();}intCClientService::handle_input(ACE_HANDLE fd){char buf[1024] = {0};int isize = 0;isize = this->sockeam.recv(buf,1024);if (isize <= 0) return -1;if (isize > 0 && isize < 1024) ChackProtocol(buf,isize);return 0;}intCClientService::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask mask) {// 如果对方断开连接,会触发此事件,则会被调用两次(已屏蔽系统自动调用此函数)if (mask == ACE_Event_Handler::WRITE_MASK) return 0;DeleteAppid(this); // 删除外部指针 mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;this->reactor()->remove_handler(this,mask);this->sockeam.close(); delete this;return 0;}void CClientService::SetParentHander(CNetWork *p){net = p;}int CClientService::SendData(const char* buf,const int nSize){intisize = 0; isize = this->sockeam.send_n(buf,nSize);if (isize <= 0) this->handle_close(0,0);return isize;}int CClientService::connect(ACE_INET_Addr addr){ACE_Time_Value itimeout(1);return this->con.connect(sockeam,addr,&itimeout);}void CClientService::ChackProtocol(const char* buf,const int nSize){Byte *p = (Byte*)buf;if ((*(p + 11) == 0x01) && (*(p + 15) == 0x02)){if (!checkFrame(p,nSize)){ stAppidcs.byAppid = *(p + 6);// 请求连接的客户端APPID stAppidcs.byGroup = *(p + 16); AddAppid(stAppidcs); // 增加客户连接 return;}} ReadDataToList(buf,nSize); // 保存用户数据}void CClientService::AddAppid(STAppidcs stAppidcs){std::map<Byte,STAppidcs>::iterator mpIter;for (mpIter = net->mpInfo.find(stAppidcs.byAppid); mpIter != net->mpInfo.end(); mpIter++) {if (mpIter->first != stAppidcs.byAppid) break;if (mpIter->second.sIp != stAppidcs.sIp) continue;mpIter->second.pcs = this; break;} }void CClientService::DeleteAppid(CClientService* cs){std::map<Byte,STAppidcs>::iterator mpIter;for (mpIter = net->mpInfo.begin(); mpIter != net->mpInfo.end(); mpIter++) {if (mpIter->second.pcs == cs) { mpIter->second.pcs = NULL; break;}}}void CClientService::ReadDataToList(const char* buf,nSize);// 将数据保存到连表if (net->lstData.size() > 500){std::list<STListData>::iterator lstIter;mlocka.acquire();lstIter = net->lstData.begin();delete []lstIter->byData;net->lstData.erase(lstIter);mlocka.release();}mlocka.acquire();net->lstData.push_back(stlData);mlocka.release();}/////////////////////////////////////////////////*boolCTaskTimer::Open(CNetWork *p){pnet = p;ACE_Time_Value idlay(1);ACE_Time_Value ival(40);timeid = this->reactor()->schedule_timer(this,ival);return true;}int CTaskTimer::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask){if (timeid) this->reactor()->cancel_timer(this); delete this;return 0;}int CTaskTimer::handle_timeout(const ACE_Time_Value& current_time,const void* act) {unsigned char buf[255] = {0};unsigned char isize = 4;/*memcpy(buf,"/x00/x01/x01/x04",4);fixFrame(buf,(pnet->iprtype ? 0x02 : 0x01),pnet->byAppid);*/pnet->SendData(buf,ServerAppid);return 0;} (编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|