GrpcPrint/PrintS/DataManage/StreamServer.cpp

167 lines
5.5 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "StreamServer.h"
#include "../job/MetaData.h"
StreamServer::StreamServer(Machine* p)
: m_port(50010)
, m_dataCallBack(nullptr)
, m_machine(p)
, m_handlePtr(nullptr){
}
StreamServer::~StreamServer() {
Stop();
}
//回复后连接中断 请求打印文件层
::grpc::Status StreamServer::Simple(::grpc::ServerContext* context, const ::stream::RequestInfo* request, ::stream::ResponseAny* response) {
ReadData readData;
readData.dataType = (READTYPE)(request->datatype());
readData.nameKey = request->namekey();
readData.strValue = request->strvalue();
readData.valueType = (DATATYPE)request->valuetype();
std::list<Item> paramLst; //函数参数
for (const ::stream::ParamInfo it : request->item()) {
paramLst.emplace_back(Item{ it.namekey(),it.strvalue() ,(DATATYPE)it.valuetype() }); //直接加到readData.its上面后续的请求无法收到不知道为啥
}
if (m_funcDataCallBack)
m_funcDataCallBack(m_handlePtr, readData, paramLst, &response);
return Status::OK;
}
Status StreamServer::AllStream(ServerContext* context, grpc::ServerReaderWriter<ResponseInfo, RequestInfo>* stream) {
ClientInfo* cinfo = new ClientInfo();
string addr = context->peer();
size_t pos = addr.find_first_of(':');
printf("\n%s client login...\n", addr.substr(pos+1).c_str());
cinfo->m_clientAddr = context->peer();
cinfo->m_context = context;
ClientWrapper::Instance()->AddClient(cinfo);
std::thread read([this, &stream, cinfo] {
RequestInfo request;
while (!cinfo->m_readQuitFlag) {
if (!stream->Read(&request)) { //返回false 认为是client掉线了
printf("client %s 下线了\n", cinfo->m_clientAddr.data());
cinfo->m_writeQuitFlag = true;
ClientWrapper::Instance()->CloseOne(cinfo);
break;
}
ReadData readData;
readData.dataType = (READTYPE)request.datatype();
readData.nameKey = request.namekey();
readData.strValue = request.strvalue();
readData.valueType = (DATATYPE)request.valuetype();
readData.handleType = (DATAHANDLE)request.handletype();
readData.clientPtr = cinfo;
//函数参数
std::list<Item> paramLst; //
for (const ::stream::ParamInfo it : request.item()) {
Item temp{ it.namekey(),it.strvalue() ,(DATATYPE)it.valuetype() };
temp.content = it.context();
temp.isEnable = it.isenable();
temp.isAlarm = it.isalarm();
temp.isShow = it.isshow();
temp.start_layer = it.startlayer();
temp.end_layer = it.endlayer();
temp.powder = it.powder();
paramLst.emplace_back(std::move(temp)); //直接加到readData.its上面后续的请求无法收到不知道为啥
}
printf("客户端消息dataType:%d,nameKey:%s, strValue:%s,valueType:%d,lst:%zd\n",
readData.dataType, readData.nameKey.c_str(), readData.strValue.c_str(), readData.valueType, paramLst.size());
if (m_dataCallBack) {
//readData.its = paramLst;
m_dataCallBack(m_handlePtr,readData, paramLst);
}
}
//printf("read thread exit...\n") ;
});
std::thread write([this, &stream, cinfo] {
while(!cinfo->m_writeQuitFlag) {
WriteData writeData;
if (cinfo->GetPushMsg(writeData)) {
ResponseInfo response;
response.set_result(true);
response.set_datatype(writeData.dataType);
for (auto wd = writeData.items.begin(); wd != writeData.items.end(); ++wd) {
::stream::ParamInfo* paramInfo = response.add_item();
paramInfo->set_namekey((*wd).nameKey);
paramInfo->set_strvalue((*wd).strValue);
paramInfo->set_valuetype((stream::TYPE)(*wd).valueType);
paramInfo->set_context(wd->content);
paramInfo->set_isenable(wd->isEnable);
paramInfo->set_isalarm(wd->isAlarm);
paramInfo->set_isshow(wd->isShow);
paramInfo->set_startlayer(wd->start_layer);
paramInfo->set_endlayer(wd->end_layer);
paramInfo->set_powder(wd->powder);
}
stream->Write(response);
}
else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
//printf("write thread exit...\n");
});
read.join();
write.join();
return Status::OK;
}
void StreamServer::Init() {
}
void StreamServer::Run() {
Stop();
//监听线程
m_listenTd = std::thread([this] {
std::string server_address("0.0.0.0:" + std::to_string(m_port));
//std::cout << "Server listening on " << server_address << std::endl;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(this);
m_server = builder.BuildAndStart();
m_server->Wait();
});
}
void StreamServer::Stop() {
ClientWrapper::Instance()->Clear();
if(m_server.get())
m_server->Shutdown();
if (m_listenTd.joinable()) {
m_listenTd.join();
}
}