Reactor事件驱动模式
一,什么是Reactor模式?
1.它的中文名叫反应器,是事件驱动中的一种(还有另外一种叫Proactor:主动器)
2.在我们写一些比较小的[服务器-客户端]的时候,经常简单地使用BSD Socket(accept,connect,recv,send等等),
但是这样的话,程序的[Socket处理逻辑]和[业务逻辑]就混合在一起,导致2者的耦合性非常高,Reactor模式正是为了
解决这种耦合性而诞生的
3.Reactor(反应器)是相对于调用者而言的,因为调用之后,当有事件,Reactor就会通知我们,我们就去响应事件。
二,Reactor模式的实现(C++):
(1).实现类简介:
1.多路复用组件接口:IReactor
2.具体多路复用组件:CSelectReactor(在Windows是用select实现,在Linux可以用epoll等等)
3.事件组件接口:IEvent(事件是和Socket关联在一起的)
4.具体事件组件:CAcceptEvent,CReadEvent
(2).实现思路:
1.定义一个具体的多路复用组件,该组件负责:
A.多路复用事件
B.注册一个事件到多路复用事件列表中
C.当有事件发生,判断事件的类型(分别有:Read,Write),然后调用分别调用事件的Read事件或者Write事件
2.定义事件,事件都是继承于IEvent接口类,并且提供2个接口:
A.ReadEvent接口,当有Read事件的时候,该接口会被调用
B.WriteEvent接口,当有Write事件的时候,该接口被调用
(3).实现:
A.多路复用组件接口(IReactor):
/////////////类声明///////////////
typedef map<SOCKET,IEvent*> SOCKET_MAP_DEF;//以Socket为映射的IEvent*列表
typedef map<SOCKET,IEvent*>::iterator SOCKET_MAP_ITER_DEF;
class IReactor
{
public:
virtual int Init() = 0;//初始化
virtual int Dispatch() = 0;//多路复用Dispatch
int RegisterEvent(IEvent* pEvent);//注册一个Event
IReactor();
protected:
SOCKET_MAP_DEF cEventMap;
};
/////////////类实现///////////////
IReactor::IReactor()
{
WORD wVersionRequested;
WSADATA wsaData;
int err;
wVersionRequested = MAKEWORD( 2,2 );
err = WSAStartup( wVersionRequested,&wsaData );
if ( err != 0 ) {
return;
}
if ( LOBYTE( wsaData.wVersion ) != 2 ||
HIBYTE( wsaData.wVersion ) != 2 ) {
WSACleanup( );
return;
}
}
int IReactor::RegisterEvent(IEvent* pEvent)
{
cEventMap.insert(make_pair(pEvent->GetSocket(),pEvent));//注册事件
return TRUE;
}
B.具体多路复用组件(CSelectReactor):
/////////////类声明///////////////
class CSelectReactor : public IReactor//继承于IReactor
{
public:
int Init();
int Dispatch();
private:
typedef struct
{
FD_SET stReadFDSet;
FD_SET stWriteFDSet;
FD_SET stExpFDSet;
}SELECT_REACTOR_DATA;//select的3中FD,分别用于select函数
protected:
SELECT_REACTOR_DATA stFDSet;
};
/////////////类实现///////////////
int CSelectReactor::Init()
{
return 0;
}
int CSelectReactor::Dispatch()
{
while(1)
{
FD_ZERO(&stFDSet.stReadFDSet);
FD_ZERO(&stFDSet.stWriteFDSet);
FD_ZERO(&stFDSet.stExpFDSet);
if (cEventMap.size() == 0)
{
break;
}
//1.根据Event_Type把事件增加到相应的FD中
for (SOCKET_MAP_ITER_DEF i = cEventMap.begin(); i != cEventMap.end(); i++)
{
switch (i->second->GetEventType())
{
case enumEvent_Type_Read:
{
FD_SET(i->second->GetSocket(),&stFDSet.stReadFDSet);
}break;
case enumEvent_Type_Write:
{
FD_SET(i->second->GetSocket(),&stFDSet.stWriteFDSet);
}break;
}
}
//2.开始Select
int nRet = select(0,&stFDSet.stReadFDSet,&stFDSet.stWriteFDSet,&stFDSet.stExpFDSet,NULL);
if (nRet == SOCKET_ERROR)//出错
{
break;
}
else if (nRet == 0)//超时
{
}
else
{
for (SOCKET_MAP_ITER_DEF i = cEventMap.begin(); i != cEventMap.end();)
{
BOOL bIsEvent = FALSE;
if (FD_ISSET(i->second->GetSocket(),&stFDSet.stReadFDSet))//有Read事件
{
i->second->ReadEvent();
bIsEvent = TRUE;
}
if (FD_ISSET(i->second->GetSocket(),&stFDSet.stWriteFDSet))//有Write事件
{
i->second->WriteEvent();
bIsEvent = TRUE;
}
if(bIsEvent && i->second->IsEventPersist() == FALSE)
{
delete i->second;//!!!这里暂时这样做
i = cEventMap.erase(i);
}
else
{
i++;
}
}
}
}
return 0;
}
C.事件组件接口(IEvent):
/////////////类声明///////////////
class IEvent
{
public:
IEvent() { bIsPersist = FALSE; }
VOID SetEvent(SOCKET hSocket,EVENT_TYPE enumEventType);//设置一个Event的事件
SOCKET GetSocket();
EVENT_TYPE GetEventType();
VOID SetEventPersist(BOOL bIsPersist) { this->bIsPersist = bIsPersist; }//设置Event是否[持久性]
BOOL IsEventPersist() { return bIsPersist == TRUE; }
virtual int ReadEvent() = 0;//Read事件接口
virtual int WriteEvent() = 0;//Write事件接口
protected:
SOCKET hSocket;
EVENT_TYPE enumEventType;
BOOL bIsPersist;
};
/////////////类实现///////////////
VOID IEvent::SetEvent(SOCKET hSocket,EVENT_TYPE enumEventType)
{
this->hSocket = hSocket;
this->enumEventType = enumEventType;
}
SOCKET IEvent::GetSocket()
{
return hSocket;
}
EVENT_TYPE IEvent::GetEventType()
{
return enumEventType;
}
D.具体事件组件(CAcceptEvent,CReadEvent):
class CReadEvent : public IEvent
{
public:
int ReadEvent()//Read事件
{
char szBuf[1024 * 2] = {0};
int nRet = recv(GetSocket(),szBuf,1024 * 2,0);//从Socket读取数据
if (nRet == -1)//对方关闭Socket
{
SetEventPersist(FALSE);
closesocket(GetSocket());
return 0;
}
cout<<"ReadBuffer:"<<szBuf<<endl;
return 0;
}
int WriteEvent()
{
return 0;
}
};
class CAcceptEvent : public IEvent
{
public:
int ReadEvent()
{
CReadEvent* pReadEvent = new CReadEvent();//客户端的Event
SOCKADDR_IN stClientAddr = {0};
int nLen = sizeof(SOCKADDR_IN);
SOCKET hClient = accept(GetSocket(),(SOCKADDR*)&stClientAddr,&nLen);//Accept客户端
pReadEvent->SetEvent(hClient,enumEvent_Type_Read);
pReadEvent->SetEventPersist(TRUE);
pSelectReactor->RegisterEvent(pReadEvent);//注册到Reactor中
return 0;
}
int WriteEvent()
{
cout<<"WriteEvent"<<endl;
return 0;
}
CAcceptEvent(CSelectReactor* pSelectReactor)
{
this->pSelectReactor = pSelectReactor;
}
protected:
CSelectReactor* pSelectReactor;
};
E.Main函数:
int main()
{
CSelectReactor cSelect;
CAcceptEvent* pEvent = new CAcceptEvent(&cSelect);
SOCKADDR_IN stSockAddr = {0};
stSockAddr.sin_addr.s_addr = INADDR_ANY;
stSockAddr.sin_family = AF_INET;
stSockAddr.sin_port = htons(9999);
SOCKET hSocket = socket(AF_INET,SOCK_STREAM,0);
bind(hSocket,(SOCKADDR*)&stSockAddr,sizeof(SOCKADDR));
listen(hSocket,5);
pEvent->SetEvent(hSocket,enumEvent_Type_Read);//增加一个Listen的Event
pEvent->SetEventPersist(TRUE);
cSelect.RegisterEvent(pEvent);//注册到Reacotr中
cSelect.Dispatch();//Select多路复用
return 0;
}