GrpcPrint/PrintS/DataManage/StreamServer.cpp
2024-04-08 13:43:56 +08:00

141 lines
4.2 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "StreamServer.h"
StreamServer::StreamServer(Machine* p)
: m_port(50010)
, m_checkQuitFlag(false)
, m_dataCallBack(nullptr)
, m_machine(p)
, m_handlePtr(nullptr){
}
StreamServer::~StreamServer() {
Stop();
m_checkQuitFlag = true;
if (m_checkCloseTd.joinable()) m_checkCloseTd.join();
}
Status StreamServer::AllStream(ServerContext* context, grpc::ServerReaderWriter<ResponseInfo, RequestInfo>* stream) {
ClientInfo* cinfo = new ClientInfo();
string addr = context->peer();
size_t pos = addr.find_first_of(':');
printf("%s client login...\n", addr.substr(pos+1).c_str());
cinfo->m_clientAddr = context->peer();
cinfo->m_context = context;
{
std::lock_guard<std::mutex> lck(m_clientMutex);
m_clientList.emplace_back(cinfo);
m_machine->UpdateClients(m_clientList);
}
std::thread read([this, &stream, cinfo] {
RequestInfo request;
while (!cinfo->m_readQuitFlag && stream->Read(&request)) {
ReadData readData;
readData.dataType = (READTYPE)request.datatype();
readData.nameKey = request.namekey();
readData.strValue = request.strvalue();
readData.valueType = (DATATYPE)request.valuetype();
readData.clientPtr = cinfo;
printf("客户端消息dataType:%d,nameKey:%s, strValue:%s,valueType:%d",
readData.dataType, readData.nameKey.c_str(), readData.strValue.c_str(), readData.valueType);
if (m_dataCallBack) {
m_dataCallBack(m_handlePtr,readData);
}
}
});
std::thread write([this, &stream, cinfo] {
while(!cinfo->m_writeQuitFlag) {
WriteData writeData;
if (cinfo->GetPushMsg(writeData)) {
ResponseInfo response;
response.set_result(true);
response.set_datatype(writeData.dataType);
response.set_strvalue(writeData.strValue);
response.set_namekey(writeData.nameKey);
response.set_valuetype((::stream::ResponseInfo_TYPE)writeData.valueType);
stream->Write(response);
}
else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
});
read.join();
write.join();
return Status::OK;
}
void StreamServer::Init() {
}
void StreamServer::Run() {
//检测下线线程
if (!m_checkCloseTd.joinable()) {
m_checkCloseTd = std::thread([this] {
while (!m_checkQuitFlag) {
{
std::lock_guard<std::mutex> lck(m_clientMutex);
bool ischange = false;
for (auto client = m_clientList.begin(); client != m_clientList.end();) {
if (!(*client)->IsConnect()) {
printf("%s 下线了...\n", (*client)->m_clientAddr.c_str());
delete (*client);
client = m_clientList.erase(client);
ischange = true;
}
else
++client;
}
if (ischange) m_machine->UpdateClients(m_clientList);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
}
//监听线程
if (!m_listenTd.joinable()) {
m_listenTd = std::thread([this] {
std::string server_address("0.0.0.0:" + std::to_string(m_port));
//std::cout << "Server listening on " << server_address << std::endl;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(this);
m_server = builder.BuildAndStart();
m_server->Wait();
});
}
}
void StreamServer::Stop() {
m_server->Shutdown();
if (m_listenTd.joinable()) {
m_listenTd.join();
}
{
std::lock_guard<std::mutex> lck(m_clientMutex);
for (auto client = m_clientList.begin(); client != m_clientList.end(); ++client) {
delete (*client);
}
m_clientList.clear();
}
}