#include "StreamServer.h"

#include <chrono>
#include <cstdio>
#include <list>
#include <string>
#include <thread>
#include <utility>

#include "../job/MetaData.h"

/// Creates a server that is not yet listening; call Run() to start it.
/// The listening port defaults to 50010.
/// @param p Machine pointer; it is only stored in m_machine here.
StreamServer::StreamServer(Machine* p)
    : m_port(50010)
    , m_checkQuitFlag(false)
    , m_dataCallBack(nullptr)
    , m_machine(p)
    , m_handlePtr(nullptr) {
}

/// Stops the server. Stop() already raises the quit flag and joins both
/// worker threads, so nothing else needs to be done here.
StreamServer::~StreamServer() {
    Stop();
}

/// Unary RPC: the connection ends once the reply has been sent.
/// (Original author's note: used to request print-file layers.)
///
/// Copies the request fields into a ReadData, collects the request's items
/// into a parameter list and forwards everything to m_funcDataCallBack, if set.
/// @param context  gRPC call context (unused).
/// @param request  Incoming request.
/// @param response Reply object; its address is handed to the callback.
/// @return Always Status::OK.
::grpc::Status StreamServer::Simple(::grpc::ServerContext* context,
                                    const ::stream::RequestInfo* request,
                                    ::stream::ResponseAny* response) {
    ReadData readData;
    readData.dataType = static_cast<READTYPE>(request->datatype());
    readData.nameKey = request->namekey();
    readData.strValue = request->strvalue();
    readData.valueType = static_cast<DATATYPE>(request->valuetype());

    // Function parameters.
    // Original author's note: appending directly to readData.its caused
    // subsequent requests not to be received (reason unknown), hence the
    // separate list.
    std::list<Item> paramLst;
    for (const auto& it : request->item()) {  // bind by reference: no per-item protobuf copy
        paramLst.emplace_back(Item{ it.namekey(), it.strvalue(), static_cast<DATATYPE>(it.valuetype()) });
    }

    if (m_funcDataCallBack) {
        m_funcDataCallBack(m_handlePtr, readData, paramLst, &response);
    }
    return Status::OK;
}

/// Bidirectional streaming RPC.
///
/// Registers the caller with ClientWrapper, then runs two threads for the
/// lifetime of the call: one reads client requests and forwards them to
/// m_dataCallBack, the other sends messages queued on the ClientInfo back to
/// the client. The function returns only after both threads have been joined.
/// @param context gRPC call context; its peer address identifies the client.
/// @param stream  Stream used to read RequestInfo and write ResponseInfo.
/// @return Always Status::OK.
Status StreamServer::AllStream(ServerContext* context,
                               grpc::ServerReaderWriter<ResponseInfo, RequestInfo>* stream) {
    // The ClientInfo is handed to ClientWrapper and never deleted here.
    // NOTE(review): presumably ClientWrapper owns and frees it - confirm.
    ClientInfo* cinfo = new ClientInfo();

    const std::string addr = context->peer();
    const size_t pos = addr.find_first_of(':');
    // When no ':' is present, pos + 1 wraps to 0 and the whole address is printed.
    printf("\n%s client login...\n", addr.substr(pos + 1).c_str());

    cinfo->m_clientAddr = addr;
    cinfo->m_context = context;
    ClientWrapper::Instance()->AddClient(cinfo);

    // Reader: runs until the read-quit flag is set or Read() fails.
    // The stream pointer is captured by value; the lambda does not need a
    // reference to the parameter itself.
    std::thread read([this, stream, cinfo] {
        RequestInfo request;
        while (!cinfo->m_readQuitFlag && stream->Read(&request)) {
            ReadData readData;
            readData.dataType = static_cast<READTYPE>(request.datatype());
            readData.nameKey = request.namekey();
            readData.strValue = request.strvalue();
            readData.valueType = static_cast<DATATYPE>(request.valuetype());
            readData.handleType = static_cast<DATAHANDLE>(request.handletype());
            readData.clientPtr = cinfo;

            // Function parameters.
            // Original author's note: appending directly to readData.its
            // caused subsequent requests not to be received (reason unknown).
            std::list<Item> paramLst;
            for (const auto& it : request.item()) {
                Item temp{ it.namekey(), it.strvalue(), static_cast<DATATYPE>(it.valuetype()) };
                temp.content = it.context();
                temp.isEnable = it.isenable();
                temp.isAlarm = it.isalarm();
                temp.isShow = it.isshow();
                temp.start_layer = it.startlayer();
                temp.end_layer = it.endlayer();
                temp.powder = it.powder();
                paramLst.emplace_back(std::move(temp));
            }

            // %zu is the matching conversion for size_t; the enum-like values
            // are cast so that %d always receives an int.
            printf("客户端消息:dataType:%d,nameKey:%s, strValue:%s,valueType:%d,lst:%zu\n",
                   static_cast<int>(readData.dataType),
                   readData.nameKey.c_str(),
                   readData.strValue.c_str(),
                   static_cast<int>(readData.valueType),
                   paramLst.size());

            if (m_dataCallBack) {
                readData.its = paramLst;
                m_dataCallBack(m_handlePtr, readData, paramLst);
            }
        }
        printf("read thread exit...\n");
    });

    // Writer: polls the client's push queue until the write-quit flag is set.
    // NOTE(review): a failed Write() is ignored and the loop keeps running
    // until m_writeQuitFlag is raised elsewhere - confirm that is intended.
    std::thread write([this, stream, cinfo] {
        while (!cinfo->m_writeQuitFlag) {
            WriteData writeData;
            if (cinfo->GetPushMsg(writeData)) {
                ResponseInfo response;
                response.set_result(true);
                response.set_datatype(writeData.dataType);

                for (const auto& wd : writeData.items) {
                    ::stream::ParamInfo* paramInfo = response.add_item();
                    paramInfo->set_namekey(wd.nameKey);
                    paramInfo->set_strvalue(wd.strValue);
                    paramInfo->set_valuetype(static_cast< ::stream::TYPE>(wd.valueType));
                    paramInfo->set_context(wd.content);
                    paramInfo->set_isenable(wd.isEnable);
                    paramInfo->set_isalarm(wd.isAlarm);
                    paramInfo->set_isshow(wd.isShow);
                    paramInfo->set_startlayer(wd.start_layer);
                    paramInfo->set_endlayer(wd.end_layer);
                    paramInfo->set_powder(wd.powder);
                }
                stream->Write(response);
            }
            else {
                // Nothing queued: back off briefly instead of spinning.
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        }
    });

    read.join();
    write.join();
    return Status::OK;
}

/// Intentionally empty.
void StreamServer::Init() {
}

/// (Re)starts the server.
///
/// Any previous instance is stopped first. The gRPC server is built and
/// started on the calling thread so that m_server is fully assigned before
/// Run() returns: a later Stop() therefore always sees the server and can
/// shut it down, instead of racing with the listen thread. If the server
/// cannot be started (BuildAndStart() returns null, e.g. the port is in use)
/// an error is printed and no worker threads are launched.
void StreamServer::Run() {
    Stop();

    const std::string server_address("0.0.0.0:" + std::to_string(m_port));
    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(this);
    m_server = builder.BuildAndStart();
    if (!m_server) {
        printf("StreamServer: failed to start listening on %s\n", server_address.c_str());
        return;
    }

    // Offline-detection thread: runs OfflineCheck() about once per second,
    // sleeping in 100 ms slices so a stop request is noticed quickly.
    m_checkQuitFlag = false;
    m_checkCloseTd = std::thread([this] {
        while (!m_checkQuitFlag) {
            ClientWrapper::Instance()->OfflineCheck();
            int count = 10;
            while (count-- && !m_checkQuitFlag) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        }
    });

    // Listening thread: blocks in Wait() until Stop() shuts the server down.
    m_listenTd = std::thread([this] {
        m_server->Wait();
    });
}

/// Stops the server and joins both worker threads. Safe to call when the
/// server was never started.
///
/// Order: clear the registered clients, shut the gRPC server down (which
/// lets the listen thread's Wait() return), join the listen thread, then
/// raise the quit flag and join the offline-check thread.
void StreamServer::Stop() {
    ClientWrapper::Instance()->Clear();

    if (m_server.get()) {
        m_server->Shutdown();
    }
    if (m_listenTd.joinable()) {
        m_listenTd.join();
    }

    m_checkQuitFlag = true;
    if (m_checkCloseTd.joinable()) {
        m_checkCloseTd.join();
    }
}