2005년 10월 25일
TCP 서버 네트워크 엔진 구현 [4]
박범진 (oranze@wemade.com)
IOCP 초기화를 마치고 워커 쓰레드를 생성했으면 이제는 소켓을 IOCP와 연결시켜 주어야 한다. 이 엔진에서는 클라이언트 소켓 객체를 TcpPeer가 감싸고 있으므로, 인터페이스 측면에서 볼 때 TcpPeer 객체를 IOCP에 연결하면 된다. <리스트 4>는 이러한 코드를 보여주고 있다.
<리스트 4> TcpPeer와 IOCP의 연결
void TcpServer::AssociatePeer( TcpPeer *obj )
{
// 10초에 한 번씩, 재전송 주기는 1초에 한 번씩으로 맞춘다.
tcp_keepalive keepAlive = { TRUE, 10000, 1000 };
obj->Lock();
// 킵얼라이브 옵션을 켠다.
// 컴파일하려면 mstcpip.h 헤더 파일이 필요하다(platform sdk 참조).
DWORD tmp;
WSAIoctl( obj->m_sock, SIO_KEEPALIVE_VALS,
&keepAlive, sizeof( keepAlive ), 0, 0, &tmp, NULL, NULL );
// TcpPeer의 소켓을 IOCP와 연결시킨다.
CreateIoCompletionPort(
(HANDLE) obj->m_sock, m_iocpHandle, (DWORD) obj, NULL );
if ( !obj->Recv() || !obj->OnInitComplete() )
{
if ( ClosePeer( obj ) )
return;
}
obj->Unlock();
}
<리스트 4>에서 한 가지 특이한 코드가 보이는데, 소켓과 IOCP를 연결하기 전에 킵얼라이브(keepalive)를 설정하고 있는 부분이다. 흔히 고스트 클라이언트(ghost client)라고 하여 무응답 상태의 클라이언트를 처리하기 위해 NOP(No OPeration) 패킷을 주기적으로 주고받거나 킵얼라이브 세그먼트를 주고받도록 설정한다. 이 코드를 추가 함으로써 원격 호스트의 파워가 갑자기 나가거나 LAN 선이 뽑히는 비정상적인 연결을 감지해 낼 수 있다. 소켓을 IOCP에 연결시킨 이후에 곧바로 TcpPeer::Recv 메쏘드를 호출하는데, 앞서 설명한 것처럼 Recv 메쏘드는 엔진 내부에서 직접 호출하기 때문에, 엔진 사용자는 언제나 패킷의 수신 가능 상태를 보장받을 수 있다.
<리스트 5> TcpServer의 IOCP 워커 쓰레드
void TcpServer::WorkerThread()
{
...
while ( true )
{ // IOCP의 통보를 기다린다.
ret = GetQueuedCompletionStatus(
m_iocpHandle,
&bytesTransferred,
(DWORD *) &obj,
(OVERLAPPED **) &overlapped,
INFINITE );
if ( !obj || !overlapped )
break;
obj->Lock(); // 비동기 통보가 완료됐으므로 참조 카운트를 감소시킨다.
obj->m_refCount--;
if (
!ret ||
!bytesTransferred ||
obj->m_sysClosing ||
!DispatchObject( obj, bytesTransferred, overlapped ) )
{
if ( ClosePeer( obj ) )
continue;
}
obj->Unlock();
}
}
IOCP에 관련된 초기 작업을 모두 마쳤다면 이제는 IOCP를 감시할 워커 쓰레드를 살펴 볼 차례다. <리스트 5>에 워커 쓰레드의 루틴이 있다. 여러 쓰레드로부터의 보호를 위해 크리티컬 섹션에 진입한 뒤, 참조 카운트를 감소시키고 완료된 오버랩드 요청(WSASend, WSARecv)을 처리하고 있다.
<리스트 6> 오버랩드 완료 처리
bool TcpServer::DispatchObject(
TcpPeer *obj, int bytesTransferred, OVERLAPPED *ov )
{
TcpPeer::OVERLAPPEDEX *ovex = (TcpPeer::OVERLAPPEDEX *) ov;
// OVERLAPPEDEX::opcode로 어느 호출인지(WSASend, WSARecv) 구분한다.
switch ( ovex->opcode )
{
case TcpPeer::SEND:
DispatchSend( obj, bytesTransferred );
break;
case TcpPeer::RECV:
DispatchRecv( obj, bytesTransferred );
break;
}
return true;
}
bool TcpServer::DispatchSend( TcpPeer *obj, int bytesTransferred )
{
obj->m_olSend.inProgress = false;
// TcpPeer의 가상 함수를 호출한다.
obj->OnSendComplete(
obj->m_olSend.wsaBuf.buf, obj->m_olSend.wsaBuf.len );
// 송신 버퍼의 크기를 조절한다.
obj->m_sendBufPos -= bytesTransferred;
memmove( obj->m_sendBuf,
&obj->m_sendBuf[ bytesTransferred ], obj->m_sendBufPos );
// 이어서 보낼 패킷이 있는가?
if ( obj->m_sendBufPos || obj->m_usrClosing )
obj->Send( NULL, 0 );
return true;
}
bool TcpServer::DispatchRecv( TcpPeer *obj, int bytesTransferred )
{
obj->m_olRecv.inProgress = false;
obj->m_recvBufPos += bytesTransferred;
// 여기서 패킷을 처리하지 않고 TcpServer_Dispatcher에 등록해 처리한다.
// Dispatcher에 등록하는 것도 일종의 비동기 요청이기 때문에,
// AddPeer 함수 안에서 TcpPeer의 참조 카운트를 증가시킨다.
m_dispatcher.AddPeer( obj );
return true;
}
오버랩드 완료 처리를 어떻게 처리하느냐가 중요한데, 먼저 어떤 요청에 대한 결과인지 앞서 준비해 둔 OVERLAPPEDEX의 opcode를 이용해 구분해야 한다. WSASend에 대한 완료라면 TcpPeer에 패킷의 송신이 완료됐음을 알리고(TcpPeer::OnSendComplete), WSARecv에 대한 완료라면 TcpServer_Dispatcher에 등록해 나중에 처리할 수 있도록 한다. <리스트 6>은 방금 설명한 완료 처리 과정을 보여준다.
TcpPeer의 접속이 끊기거나 다른 비정상적인 이유로 해당 TcpPeer의 완료 통보가 실패로 돌아오는 경우가 있다. 이 때는 TcpPeer를 종료시켜야 하는데 이 때 참조 카운트가 사용된다. 다음은 TcpPeer를 종료시키는 코드를 보여준다.
bool TcpServer::ClosePeer( TcpPeer *obj )
{ ...
// 참조 카운트가 있다는 것은 다른 비동기 통보가 남아 있다는 뜻이다.
// 따라서 지금 삭제하면 안된다.
if ( obj->m_refCount )
return false;
...
// TcpServer의 가상 함수를 호출한다.
OnClosePeer( obj );
return true;
}
지금까지 TcpListener, TcpPeer, TcpServer 객체들이 어떻게 구현되는지 살펴봤다. 이제 엔진 내부에서 사용되는 TcpServer_XXX 객체들에 대해 알아 볼 차례다.
먼저 TcpServer_Dispatcher 객체부터 살펴보자. Dispatcher는 TcpPeer가 수신한 패킷을 처리하기 위한 별도의 쓰레드다. <리스트 6>처럼 TcpPeer가 패킷을 수신하게 되면, 자신을 디스패처(dispatcher)에 등록해 일정 주기로 패킷을 파싱해 처리하도록 한다. <리스트 7>은 디스패처의 쓰레드 루틴을 보여준다.
<리스트 7> TcpServer_Dispatcher
void TcpServer_Dispatcher::Dispatcher()
{
...
while ( true )
{
// dispatchCycle 만큼 쉰다.
if ( WaitForSingleObject(
m_close, m_dispatchCycle ) == WAIT_OBJECT_0 )
break;
Lock();
// m_listWait가 현재 추가된 TcpPeer의 목록이다.
m_listWait.swap( m_listProcess );
Unlock();
// TcpServer::OnBeginDispatch 가상 함수를 호출한다.
m_parent->OnBeginDispatch();
for ( iter = m_listProcess.begin();
iter != m_listProcess.end(); iter++ )
{
obj = *iter;
obj->Lock();
// Dispatcher의 처리도 오버랩드 호출과 같은
// 비동기 호출이기 때문에 증가시켰던 참조 카운트를 감소시킨다.
obj->m_refCount--;
// TcpPeer의 수신 버퍼를 처리한 다음 다시 WSARecv 호출을 한다.
if ( !obj->ProcessRecvBuffer() || !obj->Recv() )
{
m_parent->ClosePeer( obj );
continue;
}
obj->Unlock();
}
// TcpServer::OnEndDispatch 가상 함수를 호출한다.
m_parent->OnEndDispatch();
m_listProcess.clear();
}
}
TcpServer_Acceptor 객체는 이 글에서 가장 먼저 언급했던 TcpListener를 관리하여 클라이언트의 접속 요청을 처리하기 위한 쓰레드다. 이 쓰레드는 소켓의 이벤트 감지를 위해 WSAEvent Select 계열의 함수를 사용한다. 이들 함수의 자세한 사용법은 각자 알아보기로 하고, 지금은 코드의 내용을 토대로 전체적인 개념만 잡아내도록 하자. <리스트 8>은 Acceptor의 쓰레드 루틴을 보여준다.
<리스트 8> TcpServer_Acceptor
void TcpServer_Acceptor::Acceptor()
{
...
for ( int i = 2; i < MAXIMUM_WAIT_OBJECTS; i++ )
events[i] = CreateEvent( NULL, FALSE, FALSE, NULL );
while ( true )
{ // TcpListener를 위한 윈속 이벤트를 대기한다.
ret = WSAWaitForMultipleEvents(
eventCount, events, FALSE, INFINITE, FALSE );
// 감지된 TcpListener의 접속을 처리한다.
ProcessWinsockEvent( events, eventCount );
}
for ( i = 2; i < MAXIMUM_WAIT_OBJECTS; i++ )
CloseHandle( events[i] );
}
void TcpServer_Acceptor::AddNewListener( HANDLE *events, int *eventCount )
{
...
for ( ; iter != m_listListener.end(); iter++ )
{ // 새 TcpListener를 등록한다.
WSAResetEvent( events[ *eventCount ] );
WSAEventSelect( (*iter)->m_sock, events[ *eventCount ], FD_ACCEPT );
(*eventCount)++;
}
}
void TcpServer_Acceptor::ProcessWinsockEvent( HANDLE *events, int eventCount )
{
...
for ( int i = 2; i < eventCount; i++, iter++ )
{
listener = *iter;
WSAEnumNetworkEvents( listener->m_sock, events[i], &eventResult );
if ( !eventResult.lNetworkEvents )
continue;
// 클라이언트의 접속을 수락한다.
addrLen = sizeof( addr );
sock = accept( listener->m_sock, &addr, &addrLen );
...
// TcpServer::OnAccept 가상 함수를 호출한다.
obj = m_parent->OnAccept( listener, sock );
if ( !obj )
{
closesocket( sock );
continue;
}
// 새 접속으로 생성된 TcpPeer를 IOCP와 연결시킨다.
m_parent->AssociatePeer( obj );
}
}
마지막으로 TcpServer_Connector 객체는 TcpPeer를 이용해 원격 호스트에 접속을 하기 위한 쓰레드다. 이 쓰레드 역시 소켓의 이벤트 감지를 위해 WSAEventSelect 계열의 함수를 이용하며, 전체 코드 구성이 TcpServer_Acceptor와 유사하다. 지면 관계상 소스는 생략하도록 하겠다.
분산 네트워크 서버에 대한 궁금증
이로써 TCP 서버 네트워크 엔진에 대한 인터페이스 구조부터 구현까지의 모든 설명을 마쳤다. 다음 호부터는 분산 네트워크 서버에 대해 알아볼텐데, 그 동안 독자들의 날카로운 지적을 부탁하며 이만 글을 줄이고자 한다.
# by | 2005/10/25 23:57 | Socket Programming | 트랙백 | 덧글(1)












