多线程命名管道通信的实现


时间:2022-07-13 02:51:08



转载自: /xinhaijulan/archive//07/31/1789147.html

上篇介绍了【多线程命名管道通信的设计 】,本篇进行多线程命名管道通信的实现。



代码 // Handle of create Named Pipe thread.HANDLE hThreadCreatePipes;// Create Named Pipe thread .hThreadCreatePipes = CreateThread( NULL, // no security attribute 0, // default stack size FUNCreatePipes, // thread procthis, // thread parameter 0, // not suspended NULL); // returns thread ID // Close Handle.CloseHandle(hThreadCreatePipes); // Set handle null.hThreadCreatePipes = NULL;


代码 DWORD WINAPI FUNCreatePipes(LPVOID lpParameter);DWORD WINAPI FUNCreatePipes(LPVOID lpParameter){BOOL fConnected = FALSE; LPTSTR lpszPipename = TEXT("\\\\.\\pipe\\MyPipe"); HANDLE hPipeServer;HANDLE hThreadListenPipes; while(TRUE){hPipeServer = CreateNamedPipe( lpszPipename, // pipe name PIPE_ACCESS_DUPLEX, // read/write access PIPE_TYPE_MESSAGE | // message type pipe PIPE_READMODE_MESSAGE | // message-read mode PIPE_WAIT,// blocking mode PIPE_UNLIMITED_INSTANCES, // max. instances BUFSIZ, // output buffer size BUFSIZ, // input buffer size 0, // client time-out NULL);// default security attribute if (hPipeServer == INVALID_HANDLE_VALUE) {AfxMessageBox("Create named pipes failed.");return -1;}// Wait for the client to connect; if it succeeds, // the function returns a nonzero value. If the function// returns zero, GetLastError returns ERROR_PIPE_CONNECTED. fConnected = ConnectNamedPipe(hPipeServer, NULL);if (fConnected) {// Create listen pipes thread. hThreadListenPipes = CreateThread( NULL, // no security attribute 0, // default stack size FunListenPipes, // thread prochPipeServer, // thread parameter 0, // not suspended NULL); // returns thread ID // Close handle.CloseHandle(hThreadListenPipes); // Set handle null.hThreadListenPipes = NULL;}else {// Close handle.CloseHandle(hPipeServer); // Set handle null.hPipeServer = NULL;}}return 0;}


代码 DWORD WINAPI FunListenPipes(LPVOID lpParameter);DWORD WINAPI FunListenPipes(LPVOID lpParameter){HANDLE hPipe = (HANDLE)lpParameter;HANDLE hHeap= GetProcessHeap();TCHAR* pchRequest = (TCHAR*)HeapAlloc(hHeap, 0, BUFSIZ*sizeof(TCHAR));TCHAR* pchReply = (TCHAR*)HeapAlloc(hHeap, 0, BUFSIZ*sizeof(TCHAR));DWORD cbBytesRead = 0, cbReplyBytes = 0, cbWritten = 0; BOOL fSuccess = FALSE;CString strRequest = "";CString strReply = "";while (TRUE) { fSuccess = ReadFile( hPipe, // handle to pipe pchRequest, // buffer to receive data BUFSIZ*sizeof(TCHAR), // size of buffer &cbBytesRead,// number of bytes read NULL); // not overlapped I/O if (!fSuccess || cbBytesRead == 0){ break;}// Process the incoming message.GetAnswerToRequest(pchRequest, pchReply, &cbReplyBytes); // Write the reply to the pipe. fSuccess = WriteFile( hPipe, // handle to pipe pchReply,// buffer to write from cbReplyBytes, // number of bytes to write &cbWritten, // number of bytes written NULL); // not overlapped I/O if (!fSuccess || cbReplyBytes != cbWritten){ break;}strRequest = pchRequest;strReply = pchReply;}// Flush the pipe to allow the client to read the pipe's contents // before disconnecting. Then disconnect the pipe, and close the // handle to this pipe instance. FlushFileBuffers(hPipe); DisconnectNamedPipe(hPipe); CloseHandle(hPipe); hPipe = NULL;HeapFree(hHeap, 0, pchRequest);HeapFree(hHeap, 0, pchReply);return 1;}VOID GetAnswerToRequest( LPTSTR pchRequest, LPTSTR pchReply, LPDWORD pchBytes ){// Check the outgoing message to make sure it's not too long for the buffer.if (FAILED(strcpy(pchReply, pchRequest))){*pchBytes = 0;pchReply[0] = 0;return;}*pchBytes = (lstrlen(pchReply)+1)*sizeof(TCHAR);}





创建文件 CString strServerIP="This is the server ip";CString lpPipeName="\\\\" + strServerIP + "\\pipe\\MyPipe";LPTSTR lpszPipename = new TCHAR[lpPipeName.GetLength()+1];_tcscpy(lpszPipename, lpPipename); HANDLE m_hPipeClient = CreateFile(lpszPipename,GENERIC_WRITE|GENERIC_READ,0,NULL, OPEN_EXISTING,0,NULL);if(m_hPipeClient == INVALID_HANDLE_VALUE){CString strLastError;strLastError.Format("%d",GetLastError());AfxMessageBox("Error open pipes, the Last Error number is:"+strLastError);return;}else{AfxMessageBox("Success open pipes");}


设置管道状态 DWORD dwMode = PIPE_READMODE_MESSAGE; BOOL fSuccess = SetNamedPipeHandleState( m_hPipeClient, // pipe handle &dwMode,// new pipe mode NULL, // don't set maximum bytes NULL); // don't set maximum time if ( ! fSuccess) {AfxMessageBox("Failed SetNamedPipeHandleState");}


创建N个管道线程 //Set thread number.int nThreadNumber=10;HANDLE m_hThreadSendReadData;for (int i=0;i<nThreadNumber;++i){m_hThreadSendReadData = CreateThread(NULL,0,FunSendReadData,this,0,NULL);CloseHandle(m_hThreadSendReadData);m_hThreadSendReadData = NULL;}


读写管道 DWORD WINAPI FunSendReadData(LPVOID lpParameter);DWORD WINAPI FunSendReadData(LPVOID lpParameter){DWORD dwRead,dwWritten;TCHAR chBuf[BUFSIZ]; BOOL fSuccess = FALSE; DWORD cbRead, cbToWrite, cbWritten, dwMode;LPTSTR lpvMessage=TEXT("This is the data sent.");while(TRUE){cbToWrite = (lstrlen(lpvMessage)+1)*sizeof(TCHAR); fSuccess = WriteFile( pDlg->m_hPipeClient, // pipe handle lpvMessage, // message cbToWrite, // message length &cbWritten, // bytes written NULL); if (!fSuccess){break;}do { // Read from the pipe. fSuccess = ReadFile( pDlg->m_hPipeClient, // pipe handle chBuf, // buffer to receive reply BUFSIZ*sizeof(TCHAR), // size of buffer &cbRead, // number of bytes read NULL); // not overlapped if ( ! fSuccess && GetLastError() != ERROR_MORE_DATA ){break; }} while (!fSuccess); // repeat loop if ERROR_MORE_DATA }return 0;}


