GrpcPrint/PrintS/DataManage/StreamServer.cpp

162 lines
4.9 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "StreamServer.h"
#include "../job/MetaData.h"
// Builds a stream server for the given machine.
//
// The listening port starts at 50010, the offline-check quit flag starts
// cleared, and the data callback and its handle pointer start out null.
//
// p: machine pointer stored in m_machine; it is not dereferenced here.
StreamServer::StreamServer(Machine* p)
    : m_port(50010)
    , m_checkQuitFlag(false)
    , m_dataCallBack(nullptr)
    , m_machine(p)
    , m_handlePtr(nullptr) {
    // Simple() tests m_layerDataCallBack before invoking it, so it has to
    // start out null just like m_dataCallBack. It is assigned in the body
    // (not the initializer list) because the member declaration order is not
    // visible from this file.
    m_layerDataCallBack = nullptr;
}
// Shuts the server down and makes sure the offline-check thread has been
// joined before the members are destroyed.
StreamServer::~StreamServer() {
    Stop();

    // Signal the offline-check loop once more and join it if it is still
    // joinable; destroying a joinable std::thread would call std::terminate.
    m_checkQuitFlag = true;
    if (m_checkCloseTd.joinable()) {
        m_checkCloseTd.join();
    }
}
// Unary RPC: the connection ends once the reply has been sent. Used to
// request print-file layer data.
//
// Copies the request fields into a ReadData, collects the request's items
// into a parameter list, and forwards both to m_layerDataCallBack when that
// callback is set.
//
// context:  not used by this handler.
// request:  incoming request; its datatype, namekey, strvalue, valuetype and
//           item fields are read.
// response: reply message; its address is handed to the callback.
// Returns Status::OK in every case, including when no callback is set.
::grpc::Status StreamServer::Simple(::grpc::ServerContext* context, const ::stream::RequestInfo* request, ::stream::ResponseAny* response) {
    ReadData readData;
    readData.dataType = static_cast<READTYPE>(request->datatype());
    readData.nameKey = request->namekey();
    readData.strValue = request->strvalue();
    readData.valueType = static_cast<DATATYPE>(request->valuetype());

    // Function parameters. Original author's note: appending these directly
    // to readData.its caused later requests to stop arriving, for reasons
    // unknown, so they are kept in a separate list.
    std::list<Item> paramLst;
    // Bind by const reference: iterating by value copies every ParamInfo.
    for (const ::stream::ParamInfo& param : request->item()) {
        paramLst.emplace_back(Item{ param.namekey(), param.strvalue(), static_cast<DATATYPE>(param.valuetype()) });
    }

    if (m_layerDataCallBack) {
        // NOTE(review): this passes the address of the local pointer
        // (a ResponseAny**), not the message itself - confirm the callback
        // signature expects that.
        m_layerDataCallBack(m_handlePtr, readData, paramLst, &response);
    }
    return Status::OK;
}
// Bidirectional streaming RPC serving one client connection.
//
// Registers a new ClientInfo for the peer with ClientWrapper, then runs two
// threads until both finish:
//   - a reader that turns every incoming RequestInfo into a ReadData plus a
//     parameter list and passes them to m_dataCallBack (when set);
//   - a writer that drains the client's push queue via GetPushMsg() and sends
//     each message as a ResponseInfo, sleeping 100 ms when the queue is empty.
//
// context: server context of this call; stored in the ClientInfo.
// stream:  reader/writer for this call; used by both threads.
// Returns Status::OK after both threads have been joined.
//
// NOTE(review): the ClientInfo is allocated with new and never deleted here;
// presumably ClientWrapper takes ownership in AddClient() - confirm.
Status StreamServer::AllStream(ServerContext* context, grpc::ServerReaderWriter<ResponseInfo, RequestInfo>* stream) {
    ClientInfo* cinfo = new ClientInfo();

    const string peerAddr = context->peer();
    // If there is no ':' then find_first_of returns npos and npos + 1 wraps to
    // 0, so the whole peer string is printed.
    const size_t colonPos = peerAddr.find_first_of(':');
    printf("\n%s client login...\n", peerAddr.substr(colonPos + 1).c_str());

    cinfo->m_clientAddr = peerAddr;
    cinfo->m_context = context;
    ClientWrapper::Instance()->AddClient(cinfo);

    // Reader: runs until the read quit flag is set or Read() reports that no
    // further message is available. The stream pointer is captured by value;
    // both threads are joined before this function returns.
    std::thread read([this, stream, cinfo] {
        RequestInfo request;
        while (!cinfo->m_readQuitFlag && stream->Read(&request)) {
            ReadData readData;
            readData.dataType = static_cast<READTYPE>(request.datatype());
            readData.nameKey = request.namekey();
            readData.strValue = request.strvalue();
            readData.valueType = static_cast<DATATYPE>(request.valuetype());
            readData.handleType = static_cast<DATAHANDLE>(request.handletype());
            readData.clientPtr = cinfo;

            // Function parameters. Original author's note: appending these
            // directly to readData.its caused later requests to stop
            // arriving, for reasons unknown.
            std::list<Item> paramLst;
            // Bind by const reference: iterating by value copies every ParamInfo.
            for (const ::stream::ParamInfo& param : request.item()) {
                paramLst.emplace_back(Item{ param.namekey(), param.strvalue(), static_cast<DATATYPE>(param.valuetype()) });
            }

            // Cast to int so the arguments match the %d conversions.
            printf("客户端消息dataType:%d,nameKey:%s, strValue:%s,valueType:%d\n",
                static_cast<int>(readData.dataType), readData.nameKey.c_str(), readData.strValue.c_str(), static_cast<int>(readData.valueType));

            if (m_dataCallBack) {
                m_dataCallBack(m_handlePtr, readData, paramLst);
            }
        }
        printf("read thread exit...\n");
    });

    // Writer: runs until the write quit flag is set.
    std::thread write([this, stream, cinfo] {
        while (!cinfo->m_writeQuitFlag) {
            WriteData writeData;
            if (!cinfo->GetPushMsg(writeData)) {
                // Nothing queued: back off before polling again.
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                continue;
            }

            ResponseInfo response;
            response.set_result(true);
            response.set_datatype(writeData.dataType);
            for (const auto& item : writeData.items) {
                ::stream::ParamInfo* paramInfo = response.add_item();
                paramInfo->set_namekey(item.nameKey);
                paramInfo->set_strvalue(item.strValue);
                paramInfo->set_valuetype(static_cast<::stream::TYPE>(item.valueType));
            }
            // NOTE(review): the result of Write() is ignored, so a failed
            // write is not detected here; the loop only ends through
            // m_writeQuitFlag. Left as is because the handler's lifetime
            // appears to be coordinated through that flag - confirm.
            stream->Write(response);
        }
    });

    read.join();
    write.join();
    return Status::OK;
}
// Initialization hook; currently performs no work.
void StreamServer::Init() {}
void StreamServer::Run() {
Stop();
//检测下线线程
m_checkQuitFlag = false;
m_checkCloseTd = std::thread([this] {
while (!m_checkQuitFlag) {
ClientWrapper::Instance()->OfflineCheck();
int count = 10;
while (count-- && !m_checkQuitFlag) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
});
//监听线程
m_listenTd = std::thread([this] {
std::string server_address("0.0.0.0:" + std::to_string(m_port));
//std::cout << "Server listening on " << server_address << std::endl;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(this);
m_server = builder.BuildAndStart();
m_server->Wait();
});
}
// Stops the server and its helper threads.
//
// Order of operations: clear the registered clients, shut the gRPC server
// down if one exists, join the listener thread, then signal and join the
// offline-check thread. Calling this when nothing is running is harmless:
// every step is guarded.
void StreamServer::Stop() {
    ClientWrapper::Instance()->Clear();

    if (m_server.get() != nullptr) {
        m_server->Shutdown();
    }
    if (m_listenTd.joinable()) {
        m_listenTd.join();
    }

    // Ask the offline-check loop to exit, then wait for it.
    m_checkQuitFlag = true;
    if (m_checkCloseTd.joinable()) {
        m_checkCloseTd.join();
    }
}