#include "StreamServer.h"

/// Constructs a server that listens on port 50010, with the checker thread's
/// quit flag cleared and no data callback installed.
StreamServer::StreamServer()
    : m_port(50010)
    , m_checkQuitFlag(false)
    , m_dataCallBack(nullptr) {
}

/// Asks the client-checking thread (started by CheckProc) to stop and waits
/// for it to finish so it can no longer touch this object.
StreamServer::~StreamServer() {
    m_checkQuitFlag = true;
    if (m_checkCloseTd.joinable())
        m_checkCloseTd.join();
}

/// Handler for one bidirectional stream.
///
/// Registers a new ClientInfo for the peer, then runs two threads for the
/// lifetime of the call: a reader that forwards every incoming RequestInfo to
/// m_dataCallBack, and a writer that sends out whatever ClientInfo::GetPushMsg
/// yields. Blocks until both threads have finished.
///
/// @param context  gRPC call context; its peer string identifies the client.
/// @param stream   Stream used to read requests and write responses.
/// @return Always Status::OK.
Status StreamServer::AllStream(ServerContext* context,
                               grpc::ServerReaderWriter<ResponseInfo, RequestInfo>* stream) {
    // Ownership passes to m_clientList; CheckProc deletes the entry later.
    ClientInfo* cinfo = new ClientInfo();
    printf("receive %s ...\n", context->peer().c_str());
    cinfo->m_clientAddr = context->peer();
    cinfo->m_context = context;
    {
        std::lock_guard<decltype(m_clientMutex)> lck(m_clientMutex);
        m_clientList.emplace_back(cinfo);
        m_config->UpdateClients(m_clientList);
    }

    // Both threads are joined before this function returns, so `stream` and
    // `cinfo` are captured by value (they are plain pointers).
    std::thread read([this, stream, cinfo] {
        RequestInfo request;
        while (!cinfo->m_readQuitFlag && stream->Read(&request)) {
            ReadData readData;
            readData.dataType = static_cast<READTYPE>(request.datatype());
            readData.nameKey = request.namekey();
            readData.strValue = request.strvalue();
            readData.valueType = static_cast<DATATYPE>(request.valuetype());
            readData.clientPtr = cinfo;
            // Explicit int conversion keeps the %d arguments well-defined
            // whatever the underlying enum kind is.
            printf("客户端消息:dataType:%d,nameKey:%s, strValue:%s,valueType:%d",
                   static_cast<int>(readData.dataType),
                   readData.nameKey.c_str(),
                   readData.strValue.c_str(),
                   static_cast<int>(readData.valueType));
            if (m_dataCallBack) {
                m_dataCallBack(m_handlePtr, readData);
            }
        }
    });

    std::thread write([this, stream, cinfo] {
        while (!cinfo->m_writeQuitFlag) {
            WriteData writeData;
            if (cinfo->GetPushMsg(writeData)) {
                ResponseInfo response;
                response.set_result(true);
                response.set_datatype(static_cast< ::stream::ResponseInfo_Status>(writeData.dataType));
                response.set_strvalue(writeData.strValue);
                response.set_namekey(writeData.nameKey);
                response.set_valuetype(static_cast< ::stream::ResponseInfo_TYPE>(writeData.valueType));
                stream->Write(response);
            } else {
                // Nothing queued: poll again after a short pause.
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        }
    });

    // NOTE(review): the writer only exits once m_writeQuitFlag is set by code
    // outside this file; if nothing sets it after the reader ends, this call
    // never returns. Also confirm CheckProc cannot delete `cinfo` while these
    // threads are still running.
    read.join();
    write.join();
    return Status::OK;
}

/// Starts the background thread that, once per second, removes and deletes
/// every client whose IsConnect() is false and republishes the client list via
/// m_config->UpdateClients. Calling it again while the thread is already
/// running is a no-op.
///
/// The thread is intentionally NOT detached: the destructor sets
/// m_checkQuitFlag and joins it, which guarantees the thread has stopped
/// before the members it uses are destroyed.
void StreamServer::CheckProc() {
    if (m_checkCloseTd.joinable())
        return;  // already running; reassigning a joinable std::thread would terminate

    m_checkCloseTd = std::thread([this] {
        // NOTE(review): m_checkQuitFlag is written from the destructor's thread;
        // confirm it is declared std::atomic<bool>.
        while (!m_checkQuitFlag) {
            {
                // Hold the mutex only for the sweep, not across the sleep
                // below, so AllStream can register new clients in between.
                std::lock_guard<decltype(m_clientMutex)> lck(m_clientMutex);
                for (auto client = m_clientList.begin(); client != m_clientList.end();) {
                    if (!(*client)->IsConnect()) {
                        printf("%s 下线了...\n", (*client)->m_clientAddr.c_str());
                        delete (*client);
                        client = m_clientList.erase(client);
                    } else {
                        ++client;
                    }
                }
                m_config->UpdateClients(m_clientList);
            }
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    });
}

/// Initialises the configuration manager, starts an insecure gRPC server on
/// 0.0.0.0:<m_port> with this object registered as the service, and blocks
/// until the server shuts down. Returns immediately if the server could not
/// be started.
void StreamServer::Run() {
    m_config = ConfigManager::GetInstance();
    m_config->Init();

    const std::string server_address("0.0.0.0:" + std::to_string(m_port));

    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(this);

    std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
    if (!server) {
        // BuildAndStart yields nullptr on failure (e.g. the port is in use).
        std::cerr << "Failed to start server on " << server_address << std::endl;
        return;
    }

    std::cout << "Server listening on " << server_address << std::endl;
    server->Wait();
}