#include "StreamServer.h" StreamServer::StreamServer(Machine* p) : m_port(50010) , m_checkQuitFlag(false) , m_dataCallBack(nullptr) , m_machine(p) , m_handlePtr(nullptr){ } StreamServer::~StreamServer() { Stop(); m_checkQuitFlag = true; if (m_checkCloseTd.joinable()) m_checkCloseTd.join(); } Status StreamServer::AllStream(ServerContext* context, grpc::ServerReaderWriter* stream) { ClientInfo* cinfo = new ClientInfo(); string addr = context->peer(); size_t pos = addr.find_first_of(':'); printf("%s client login...\n", addr.substr(pos+1).c_str()); cinfo->m_clientAddr = context->peer(); cinfo->m_context = context; ClientWrapper::Instance()->AddClient(cinfo); std::thread read([this, &stream, cinfo] { RequestInfo request; while (!cinfo->m_readQuitFlag && stream->Read(&request)) { ReadData readData; readData.dataType = (READTYPE)request.datatype(); readData.nameKey = request.namekey(); readData.strValue = request.strvalue(); readData.valueType = (DATATYPE)request.valuetype(); readData.clientPtr = cinfo; printf("客户端消息:dataType:%d,nameKey:%s, strValue:%s,valueType:%d\n", readData.dataType, readData.nameKey.c_str(), readData.strValue.c_str(), readData.valueType); if (m_dataCallBack) { m_dataCallBack(m_handlePtr,readData); } } }); std::thread write([this, &stream, cinfo] { while(!cinfo->m_writeQuitFlag) { WriteData writeData; if (cinfo->GetPushMsg(writeData)) { ResponseInfo response; response.set_result(true); response.set_datatype(writeData.dataType); response.set_strvalue(writeData.strValue); response.set_namekey(writeData.nameKey); response.set_valuetype((::stream::ResponseInfo_TYPE)writeData.valueType); stream->Write(response); } else { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } }); read.join(); write.join(); return Status::OK; } void StreamServer::Init() { } void StreamServer::Run() { //检测下线线程 if (!m_checkCloseTd.joinable()) { m_checkCloseTd = std::thread([this] { while (!m_checkQuitFlag) { ClientWrapper::Instance()->OfflineCheck(); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } }); } //监听线程 if (!m_listenTd.joinable()) { m_listenTd = std::thread([this] { std::string server_address("0.0.0.0:" + std::to_string(m_port)); //std::cout << "Server listening on " << server_address << std::endl; ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(this); m_server = builder.BuildAndStart(); m_server->Wait(); }); } } void StreamServer::Stop() { m_server->Shutdown(); if (m_listenTd.joinable()) { m_listenTd.join(); } { std::lock_guard lck(m_clientMutex); for (auto client = m_clientList.begin(); client != m_clientList.end(); ++client) { delete (*client); } m_clientList.clear(); } }