GrpcPrint/PrintS/DataManage/StreamServer.cpp

155 lines
4.6 KiB
C++
Raw Normal View History

2024-03-15 12:31:34 +08:00
#include "StreamServer.h"
2024-04-23 13:41:16 +08:00
#include "../job/MetaData.h"
2024-03-15 12:31:34 +08:00
2024-03-22 11:28:06 +08:00
StreamServer::StreamServer(Machine* p)
2024-03-15 12:31:34 +08:00
: m_port(50010)
, m_checkQuitFlag(false)
2024-03-22 11:28:06 +08:00
, m_dataCallBack(nullptr)
2024-04-08 13:43:56 +08:00
, m_machine(p)
, m_handlePtr(nullptr){
2024-03-15 12:31:34 +08:00
}
StreamServer::~StreamServer() {
2024-03-22 11:28:06 +08:00
Stop();
2024-04-07 17:09:01 +08:00
m_checkQuitFlag = true;
if (m_checkCloseTd.joinable()) m_checkCloseTd.join();
2024-03-15 12:31:34 +08:00
}
2024-04-24 18:12:41 +08:00
//回复后连接中断 请求打印文件层
2024-05-15 13:38:34 +08:00
::grpc::Status StreamServer::Simple(::grpc::ServerContext* context, const ::stream::RequestInfo* request, ::stream::ResponseAny* response) {
2024-04-23 13:41:16 +08:00
ReadData readData;
readData.dataType = (READTYPE)(request->datatype());
readData.nameKey = request->namekey();
readData.strValue = request->strvalue();
readData.valueType = (DATATYPE)request->valuetype();
2024-05-15 13:38:34 +08:00
if (m_layerDataCallBack)
m_layerDataCallBack(m_handlePtr, readData, &response);
2024-04-23 13:41:16 +08:00
return Status::OK;
}
2024-03-15 12:31:34 +08:00
Status StreamServer::AllStream(ServerContext* context, grpc::ServerReaderWriter<ResponseInfo, RequestInfo>* stream) {
ClientInfo* cinfo = new ClientInfo();
2024-04-03 13:55:07 +08:00
string addr = context->peer();
size_t pos = addr.find_first_of(':');
2024-04-24 18:12:41 +08:00
printf("\n%s client login...\n", addr.substr(pos+1).c_str());
2024-03-15 12:31:34 +08:00
cinfo->m_clientAddr = context->peer();
cinfo->m_context = context;
2024-04-10 16:15:33 +08:00
ClientWrapper::Instance()->AddClient(cinfo);
2024-03-15 12:31:34 +08:00
std::thread read([this, &stream, cinfo] {
RequestInfo request;
while (!cinfo->m_readQuitFlag && stream->Read(&request)) {
ReadData readData;
readData.dataType = (READTYPE)request.datatype();
readData.nameKey = request.namekey();
readData.strValue = request.strvalue();
readData.valueType = (DATATYPE)request.valuetype();
2024-05-30 17:25:23 +08:00
readData.handleType = (DATAHANDLE)request.handletype();
2024-03-15 12:31:34 +08:00
readData.clientPtr = cinfo;
//函数参数
std::list<Item> paramLst; //
for (const ::stream::ParamInfo it : request.item()) {
paramLst.emplace_back(Item{ it.namekey(),it.strvalue() ,(DATATYPE)it.valuetype() }); //直接加到readData.its上面后续的请求无法收到不知道为啥
}
2024-04-09 16:53:02 +08:00
printf("客户端消息dataType:%d,nameKey:%s, strValue:%s,valueType:%d\n",
2024-03-15 12:31:34 +08:00
readData.dataType, readData.nameKey.c_str(), readData.strValue.c_str(), readData.valueType);
if (m_dataCallBack) {
m_dataCallBack(m_handlePtr,readData, paramLst);
2024-03-15 12:31:34 +08:00
}
}
printf("read thread exit...\n") ;
2024-03-15 12:31:34 +08:00
});
std::thread write([this, &stream, cinfo] {
while(!cinfo->m_writeQuitFlag) {
WriteData writeData;
if (cinfo->GetPushMsg(writeData)) {
ResponseInfo response;
response.set_result(true);
2024-04-08 13:43:56 +08:00
response.set_datatype(writeData.dataType);
for (auto wd = writeData.items.begin(); wd != writeData.items.end(); ++wd) {
::stream::ParamInfo* paramInfo = response.add_item();
paramInfo->set_namekey((*wd).nameKey);
paramInfo->set_strvalue((*wd).strValue);
paramInfo->set_valuetype((stream::TYPE)(*wd).valueType);
}
2024-03-15 12:31:34 +08:00
stream->Write(response);
}
else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
});
read.join();
write.join();
return Status::OK;
}
2024-03-22 11:28:06 +08:00
void StreamServer::Init() {
}
void StreamServer::Run() {
2024-04-18 11:59:51 +08:00
Stop();
2024-04-08 13:43:56 +08:00
//检测下线线程
2024-04-18 11:59:51 +08:00
m_checkQuitFlag = false;
m_checkCloseTd = std::thread([this] {
while (!m_checkQuitFlag) {
ClientWrapper::Instance()->OfflineCheck();
int count = 10;
while (count-- && !m_checkQuitFlag) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
2024-03-15 12:31:34 +08:00
}
2024-04-18 11:59:51 +08:00
}
});
2024-03-22 11:28:06 +08:00
//监听线程
2024-04-18 11:59:51 +08:00
m_listenTd = std::thread([this] {
std::string server_address("0.0.0.0:" + std::to_string(m_port));
//std::cout << "Server listening on " << server_address << std::endl;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(this);
m_server = builder.BuildAndStart();
m_server->Wait();
2024-05-30 11:18:10 +08:00
2024-04-18 11:59:51 +08:00
});
2024-03-15 12:31:34 +08:00
}
2024-03-22 11:28:06 +08:00
void StreamServer::Stop() {
2024-05-30 11:18:10 +08:00
ClientWrapper::Instance()->Clear();
2024-05-11 17:43:38 +08:00
if(m_server.get())
2024-04-23 13:41:16 +08:00
m_server->Shutdown();
2024-05-30 11:18:10 +08:00
2024-03-22 11:28:06 +08:00
if (m_listenTd.joinable()) {
m_listenTd.join();
}
2024-03-15 12:31:34 +08:00
2024-04-18 11:59:51 +08:00
m_checkQuitFlag = true;
2024-05-11 17:43:38 +08:00
if (m_checkCloseTd.joinable())
m_checkCloseTd.join();
2024-04-18 11:59:51 +08:00
2024-05-30 11:18:10 +08:00
2024-03-15 12:31:34 +08:00
}