GrpcPrint/PrintS/DataManage/StreamServer.cpp

104 lines
3.2 KiB
C++
Raw Normal View History

2024-03-15 12:31:34 +08:00
#include "StreamServer.h"
/// Constructs the server with its defaults: listening port 50010, the
/// check-loop quit flag cleared, and no data callback installed.
StreamServer::StreamServer()
    : m_port(50010),
      m_checkQuitFlag(false),
      m_dataCallBack(nullptr)
{
}
/// Raises the quit flag polled by the check thread, then waits for that
/// thread to finish if it is still joinable.
StreamServer::~StreamServer()
{
    m_checkQuitFlag = true;

    // Nothing to wait for when the thread was never started or is no longer joinable.
    if (!m_checkCloseTd.joinable()) {
        return;
    }
    m_checkCloseTd.join();
}
/// Serves one bidirectional streaming call.
///
/// A ClientInfo is allocated for the peer, filled with the peer address and
/// the call context, and appended to m_clientList under m_clientMutex. Two
/// worker threads are then started and joined before the call returns:
///  - the reader converts every incoming RequestInfo into a ReadData and
///    passes it to m_dataCallBack (when one is set);
///  - the writer polls cinfo->GetPushMsg() and sends each pending WriteData
///    to the client as a ResponseInfo, sleeping 100 ms when nothing is pending.
///
/// @param context  gRPC call context; used for the peer address and stored in the ClientInfo.
/// @param stream   Bidirectional stream used to read requests and write responses.
/// @return Status::OK once both worker threads have finished.
///
/// NOTE(review): the ClientInfo allocated here is not freed in this function;
/// CheckProc() deletes list entries whose IsConnect() returns false. Confirm
/// that this cannot happen while the reader/writer threads below still use `cinfo`.
/// NOTE(review): nothing in this function sets m_readQuitFlag / m_writeQuitFlag;
/// the writer only stops when the flag is set elsewhere or a Write() fails.
Status StreamServer::AllStream(ServerContext* context, grpc::ServerReaderWriter<ResponseInfo, RequestInfo>* stream) {
    ClientInfo* cinfo = new ClientInfo();
    printf("receive %s ...\n", context->peer().c_str());
    cinfo->m_clientAddr = context->peer();
    cinfo->m_context = context;
    {
        std::lock_guard<std::mutex> lck(m_clientMutex);
        m_clientList.emplace_back(cinfo);
    }

    // The stream pointer is captured by value: copying a pointer is free and
    // avoids holding a reference to this function's parameter.
    std::thread read([this, stream, cinfo] {
        RequestInfo request;
        while (!cinfo->m_readQuitFlag && stream->Read(&request)) {
            ReadData readData;
            readData.dataType = static_cast<READTYPE>(request.datatype());
            readData.nameKey = request.namekey();
            readData.strValue = request.strvalue();
            readData.valueType = static_cast<DATATYPE>(request.valuetype());
            readData.clientPtr = cinfo;
            // Trailing newline added so each message is logged on its own line,
            // consistent with the other log statements in this file.
            printf("客户端消息dataType:%d,nameKey:%s, strValue:%s,valueType:%d\n",
                   readData.dataType, readData.nameKey.c_str(), readData.strValue.c_str(), readData.valueType);
            if (m_dataCallBack) {
                m_dataCallBack(m_handlePtr, readData);
            }
        }
    });

    std::thread write([stream, cinfo] {
        while (!cinfo->m_writeQuitFlag) {
            WriteData writeData;
            if (!cinfo->GetPushMsg(writeData)) {
                // Nothing queued: back off before polling again.
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                continue;
            }

            ResponseInfo response;
            response.set_result(true);
            response.set_datatype(static_cast< ::stream::ResponseInfo_Status>(writeData.dataType));
            response.set_strvalue(writeData.strValue);
            response.set_namekey(writeData.nameKey);
            response.set_valuetype(static_cast< ::stream::ResponseInfo_TYPE>(writeData.valueType));

            // Write() returns false once the stream is closed or broken; further
            // writes cannot succeed, so stop instead of spinning on a dead stream.
            if (!stream->Write(response)) {
                break;
            }
        }
    });

    read.join();
    write.join();
    return Status::OK;
}
void StreamServer::CheckProc() {
m_checkCloseTd = std::thread([this] {
while (!m_checkQuitFlag) {
for (auto client = m_clientList.begin(); client != m_clientList.end();) {
if (!(*client)->IsConnect()) {
printf("%s 下线了...\n", (*client)->m_clientAddr.c_str());
delete (*client);
client = m_clientList.erase(client);
}
else
++client;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
m_checkCloseTd.detach();
}
/// Builds a gRPC server listening on 0.0.0.0:<m_port> with insecure
/// credentials, registers this object as the service, and blocks in
/// Server::Wait().
///
/// If the server cannot be built or started (BuildAndStart() returns null),
/// an error is written to std::cerr and the function returns without
/// blocking.
void StreamServer::Run() {
    const std::string server_address = "0.0.0.0:" + std::to_string(m_port);

    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(this);

    std::unique_ptr<Server> server(builder.BuildAndStart());
    if (!server) {
        // BuildAndStart() yields a null pointer on failure; calling Wait()
        // on it would dereference null.
        std::cerr << "Failed to start server on " << server_address << std::endl;
        return;
    }

    std::cout << "Server listening on " << server_address << std::endl;
    server->Wait();
}