博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
IO处理线程
阅读量:5959 次
发布时间:2019-06-19

本文共 8195 字,大约阅读时间需要 27 分钟。

hot3.png

客户IO处理,是在工作线程,_WorkerThreadProc中完成的

函数,在完成端口上调用GetQueuedCompletionStatus函数等待IO完成,并调用自定义函数HandleIO来处理IO,具体代码如下:

DOWRD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam){    #ifdef _DEBUG        ::OutputDebugString("Worker Thread startup...\n");    #endif //_DEBUG    CIOCPServer *pThis = (CIOCPServer*)lpParam;    CIOCPBuffer *pBuffer;    DWORD dwKey;    DWORD dwTrans;    LPOVERLAPPED lpol;    while(TRUE)    {        //在关联到此完成端口的所有套接字上等待IO完成        BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,&dwTrans,(LPDWORD)&dwKey,(LPOVERLAPPED)&lpol,WSA_INFINITE);        if(dwTrans == -1)        {            #ifdef _DEBUG                ::OutputDebugString("Worker Thread startup...\n");            #endif //_DEBUG                            ::ExitThread(0);        }                pBuffer=CONTAINING_RECORD(lpol,CIOCPBuffer,ol);        int nError = NO_ERROR;        if(!bOK)        {            SOCKET s;            if(pBuffer->nOperation == OP_ACCEPT)            {                s=pThis->m_sListen;            }            else            {                if(dwKey == 0)                    break;                s=((CIOCPContext*)dwKey)->s;            }            DWORD dwFlags = 0;            if(!::WSAGetOverlappedResult(s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags))            {                nError = ::WSAGetLastError();            }        }        pThis->HandleIO(dwKey,pBuffer,dwTrans,nError);    }    #ifdef _DEBUG        ::OutputDebugString("Worker Thread out...\n");    #endif //_DEBUG    return 0;}

SendText成员函数用于在连接上发送数据,执行时先申请一个缓冲区对象,把用户将要发送的数据复制到里面,然后调用postSend成员函数投递这个缓冲区对象

BOOL CIOCPServer::SendText(CIOCPContext *pContext,char *pszText,int nLen){    CIOCPBuffer *pBuffer = AllocateBuffer(nLen);    if(pBuffer != NULL)    {        memcpy(pBuffer->buff,pszText,nLen);        return PostSend(pContext,pBuffer);    }    return FALSE;}

 

下面的HandleIO函数是关键,

处理完成的IO,投递新的IO请求,释放完成的缓冲区对象,关闭客户上下文对象

下面是主要的实现代码:

void CIOCPServer::HandleIO(DWORD dwKey,CIOCPBuffer *pBuffer,DOWRD dwTrans,int nError){    CIOCPContext *pContext = (CIOCPContext*)dwKey;    #ifdef _DEBUG        ::OutputDebugString("HandleIO startup..\n");    #endif //_DEBUG    //减少套接字未决IO计数    if(pContext!=NULL)    {        ::EnterCriticalSection(&pContext->Lock);        if(pBuffer->nOperation == OP_READ)            pContext->nOutstandingRecv--;        else if(pBuffer->nOperation == OP_WRITE)            pContext->nOutstandingSend--;        ::LeaveCriticalSection(&pContext->Lock);        //检查套接字是否已经打开        if(pContext->bClosing)        {            #ifdef _DEBUG                ::OutputDebugString("HandleIO startup..\n");            #endif //_DEBUG                        if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)            {                ReleaseContext(pContext);            }            //释放已关闭套接字的未决IO            ReleaseBuffer(pBuffer);            return;        }    }    else    {        RemovePendingAccept(pBuffer);    }    //检查套接字上发生的错误,然后直接关闭套接字    if(nError!=NO_ERROR)    {        if(pBuffer->nOperation != OP_ACCEPT)        {            OnConnectionError(pContext,pBuffer,nError);            CloseAConnection(pContext);            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)            {                ReleaseContext(pContext);            }            #ifdef _DEBUG                ::OutputDebugString("HandleIO startup..\n");            #endif //_DEBUG        }        else//在监听套接字上发生错误        {            if(pBuffer->sClient != INVALID_SOCKET)            {                ::closesocket(pBuffer->sClient);                pBuffer->sClient = INVALID_SOCKET;            }            #ifdef _DEBUG                ::OutputDebugString("HandleIO startup..\n");            #endif //_DEBUG        }        ReleaseBuffer(pBuffer);        return;    }    //开始处理    if(pBuffer->nOperation == OP_ACCEPT)    {        if(dwTrans == 0)        {            #ifdef _DEBUG                ::OutputDebugString("HandleIO startup..\n");            #endif //_DEBUG            if(pBuffer->sClient != INVALID_SOCKET)            {                ::closesocket(pBuffer->sClient);                pBuffer->sClient = INVALID_SOCKET;            }        }        else        {            //为接收新连接的申请客户上下文对象            CIOCPContext *pClient = AllocateContext(pBuffer->sClient);            if(pClient != NULL)            {                if(AddAConnection(pCliebt))                {                    //取得用户地址                    int nLocalLen,nRmoteLen;                    m_lpfnGetAcceptExSockaddrs(                            pBuffer->buff,                            pBuffer->nLen-((sizeof(sockaddr_in)+16)*2),                            sizeof(sockaddr_in)+16,                            sizeof(sockaddr_in)+16,                            (SOCKADDR **)&pLocalAddr,                            &nLocalLen,                            (SOCKADDR **)&pRemoteAddr,                            &nRmoteLen);                    memcpy(&pClient->addrLocal,pLocalAddr,nLocalLen);                    memcpy(&pClient->addrRemote,pRemoteAddr,nRmoteLen);                    //关联新连接到完成端口对象                    ::CreateIoCompletionPort((HANDLE)pClient->s,m_hCompletion,(DWORD)pClient,0);                    //通知用户                    pBuffer->nLen = dwTrans;                    OnConnectionEstablished(pClient,pBuffer);                    //向新连接投递Read请求                    for(int i=0;i<5;i++)                    {                        CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);                        if(p != NULL)                        {                            if(!PostRecv(pClient,p))                            {                                CloseAConnection(pClient);                                break;                            }                        }                    }                }                else                {                    CloseAConnection(pClient);                    ReleaseContext(pClient);                }            }            else            {                //资源不足,关闭与客户的连接即可                ::closesocket(pBuffer->sClient);                pBuffer->sClient = INVALID_SOCKET;            }        }        //Accept请求完成,释放IO缓冲区        ReleaseBuffer(pBuffer);        //通知监听线程继续再投递一个Accept请求        ::InterlockedDecrement(&m_nRepostCount);        ::SetEvent(m_hRepostEvent);    }    else if(pBuffer->nOperation == OP_READ)    {        if(dwTrans == 0)        {            //先通知用户            pBuffer->nLen = 0;            OnConnectionClosing(pContext,pBuffer);            //再关闭连接            CloseAConnection(pContext);            //释放客户上下文和缓冲区对象            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)            {                ReleaseContext(pContext);            }            ReleaseBuffer(pBuffer);        }        else        {            pBuffer->nLen = dwTrans;            //按照IO投递的顺序读取接收到的数据            CIOCPBuffer *p = GetNextReadBuffer(pContext,pBuffer);            while(p!=NULL)            {                OnReadCompleted(pContext,p);                //增加要读的序列号的值                ::InterlockedDecrement((LONG*)pContext->nCurrentReadSequence);                //释放IO                ReleaseBuffer(p);                p = GetNextReadBuffer(pContext,NULL);            }            //继续投递一个新的请求            pBuffer = AllocateBuffer(BUFFER_SIZE);            if(pBuffer==NULL || !PostRecv(pContext,pBuffer))            {                CloseAConnection(pContext);            }        }    }    else if(pBuffer->nOperation == OP_WRITE)    {        if(dwTrans == 0)        {            //先通知用户            pBuffer->nLen = 0;            OnConnectionClosing(pContext,pBuffer);            //再关闭连接            CloseAConnection(pContext);            //释放客户上下文和缓冲区对象            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)            {                ReleaseContext(pContext);            }            ReleaseBuffer(pBuffer);        }        else        {            //写操作完成,通知用户            pBuffer->nLen = dwTrans;            OnWriteCompleted(pContext,pBuffer);            //释放SendText函数申请缓冲区            ReleaseBuffer(pBuffer);        }    }}

转载于:https://my.oschina.net/u/204616/blog/545200

你可能感兴趣的文章
事情的两面性
查看>>
只要会营销,shi都能卖出去?
查看>>
sed单行处理命令奇偶行输出
查看>>
VC++深入详解学习笔记1
查看>>
安装配置discuz
查看>>
线程互互斥锁
查看>>
KVM虚拟机&openVSwitch杂记(1)
查看>>
win7下ActiveX注册错误0x80040200解决参考
查看>>
《.NET应用架构设计:原则、模式与实践》新书博客--试读-1.1-正确认识软件架构...
查看>>
2013 Linux领域年终盘点
查看>>
linux学习之查看程序端口占用情况
查看>>
相逢在栀枝花开的季节
查看>>
linux下git自动补全命令
查看>>
Ubuntu14.04LTS更新源
查看>>
Linux报“Unknown HZ value! (288) Assume 100”错误
查看>>
mysql多实例实例化数据库
查看>>
我的友情链接
查看>>
golang xml和json的解析与生成
查看>>
javascript 操作DOM元素样式
查看>>
Android 内存管理 &Memory Leak & OOM 分析
查看>>