#include "StreamClient.h"

#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

#include "../utils/LocalAddr.h"

/// Constructs an idle client: address "0.0.0.0", port 50010, quit flags
/// cleared and no data callback registered. Nothing is connected until
/// Init() is called.
StreamClient::StreamClient()
    : m_localIp("0.0.0.0")
    , m_port(50010)
    , m_readQuitFlag(false)
    , m_writeQuitFlag(false)
    , m_dataCallBack(nullptr)
    , m_handlePtr(nullptr) {
}

/// Shuts the streaming session down (see Stop()) before the object goes away.
StreamClient::~StreamClient() {
    Stop();
}

/// Resolves the local address, opens an insecure channel to
/// "<local ip>:<port>", creates the stub and starts the thread that runs the
/// bidirectional stream (AllStream()).
void StreamClient::Init() {
    m_localIp = LocalAddr().GetSystemIpAddress();
    const std::string target = m_localIp + ":" + std::to_string(m_port);
    m_channelTwo = grpc::CreateChannel(target, grpc::InsecureChannelCredentials());
    m_stubTwo = Stream::NewStub(m_channelTwo);
    m_connectTd = std::thread([this]() { this->AllStream(); });
}

/// Requests shutdown of the streaming session and waits for it to end.
///
/// Both quit flags are raised and the connection thread is joined.
/// AllStream() owns the reader/writer threads: it cancels the RPC when it
/// sees the quit flag, joins both workers and only then returns, so joining
/// m_connectTd is sufficient and no thread can outlive this object.
/// (The previous code detached m_connectTd because the join never returned;
/// that left a thread running with a dangling `this`.)
///
/// Must not be called from inside the data callback: the callback runs on the
/// reader thread, which this call (indirectly) waits for.
void StreamClient::Stop() {
    // NOTE(review): the flags are read on other threads; this assumes they are
    // declared as std::atomic<bool> in StreamClient.h -- confirm.
    m_readQuitFlag = true;
    m_writeQuitFlag = true;
    if (m_connectTd.joinable()) {
        m_connectTd.join();
    }
}

/// Pops the oldest queued outgoing message.
/// @param msg receives the message when one is available.
/// @return true if a message was dequeued, false if the queue was empty.
bool StreamClient::GetPushMsg(WriteData& msg) {
    std::lock_guard<decltype(m_msgLock)> lock(m_msgLock);
    if (m_msgList.empty()) {
        return false;
    }
    msg = std::move(m_msgList.front());
    m_msgList.pop_front();
    return true;
}

/// Appends a message to the outgoing queue; the writer thread started by
/// AllStream() sends it on the stream.
void StreamClient::PushMsg(const WriteData& msg) {
    std::lock_guard<decltype(m_msgLock)> lock(m_msgLock);
    m_msgList.push_back(msg);
}

/// Runs one bidirectional streaming RPC until the peer ends it or Stop() is
/// called. Executed on m_connectTd.
///
/// A reader thread forwards every received ResponseInfo to the data callback,
/// a writer thread sends the messages queued through PushMsg(). This function
/// supervises both: the stream and its ClientContext live on this stack frame,
/// so the workers are always joined before Finish() is called and before the
/// frame is left.
void StreamClient::AllStream() {
    ClientContext context;
    auto rpc = m_stubTwo->AllStream(&context);

    // Set by the reader when it leaves its loop (peer closed the stream, the
    // call failed/cancelled, or the quit flag was seen).
    std::atomic<bool> readerDone{false};

    m_readTd = std::thread([this, &rpc, &readerDone] {
        ResponseInfo readInfo;
        while (!m_readQuitFlag && rpc->Read(&readInfo)) {
            ReadData readData;
            readData.dataType = static_cast<READTYPE>(readInfo.datatype());
            for (const ::stream::ParamInfo& it : readInfo.item()) {
                readData.its.emplace_back(Item{ it.namekey(), it.strvalue(), static_cast<DATATYPE>(it.valuetype())
                    , it.context(), it.isenable(), it.isalarm(), it.isshow()
                    , it.startlayer(), it.endlayer(), it.powder() });
            }
            readData.result = readInfo.result();
            if (m_dataCallBack) {
                m_dataCallBack(m_handlePtr, readData);
            }
        }
        readerDone = true;
    });

    m_writeTd = std::thread([this, &rpc, &readerDone] {
        // Once the reader has finished the call is over, so further writes
        // would only fail; stop polling the queue in that case too.
        while (!m_writeQuitFlag && !readerDone) {
            WriteData writeData;
            if (!GetPushMsg(writeData)) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                continue;
            }

            RequestInfo request;
            request.set_namekey(writeData.nameKey);
            request.set_datatype(writeData.dataType);
            request.set_strvalue(writeData.strValue);
            request.set_valuetype(static_cast<::stream::TYPE>(writeData.valueType));
            request.set_handletype(static_cast<::stream::DATAHANDLE>(writeData.handleType));
            for (const auto& item : writeData.items) {
                ::stream::ParamInfo* paramInfo = request.add_item();
                paramInfo->set_namekey(item.nameKey);
                paramInfo->set_strvalue(item.strValue);
                paramInfo->set_valuetype(static_cast<::stream::TYPE>(item.valueType));
                paramInfo->set_startlayer(item.start_layer);
                paramInfo->set_endlayer(item.end_layer);
                paramInfo->set_powder(item.powder);
            }
            if (!rpc->Write(request)) {
                break;  // stream is broken or cancelled; nothing more can be sent
            }
        }
        rpc->WritesDone();
    });

    // Wait until either the stream ends on its own or a stop is requested.
    while (!m_readQuitFlag && !readerDone) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    // On a stop request the reader may be blocked inside Read(); cancelling
    // the call makes Read()/Write() return false and lets Finish() complete
    // without waiting for the server.
    const bool stopRequested = m_readQuitFlag;
    if (stopRequested) {
        context.TryCancel();
    }

    // Finish() must not overlap with Read()/Write(), and `rpc`/`context` must
    // outlive both workers.
    m_readTd.join();
    m_writeTd.join();

    const Status status = rpc->Finish();
    // A cancelled status after a deliberate stop is expected, not an error.
    if (!status.ok() && !stopRequested) {
        std::cout << "RPC failed: " << status.error_message() << std::endl;
    }
}

/// Fetches layer data with a unary "Simple" call on a freshly created channel.
/// @param index    layer index; sent as the request's string value.
/// @param response receives the server reply.
/// @return true if the RPC completed with an OK status.
bool StreamClient::GetLayerByIndex(int index, ::stream::ResponseAny* response) {
    const std::string targetStr = m_localIp + ":" + std::to_string(m_port);
    auto stub = Stream::NewStub(grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()));

    ClientContext context;
    ::stream::RequestInfo request;
    request.set_datatype(LAYERDATAREQ);
    request.set_strvalue(std::to_string(index));
    request.set_valuetype(static_cast<::stream::TYPE>(iINT));

    const Status status = stub->Simple(&context, request, response);
    return status.ok();
}

/// Sends `writeData` with a unary "Simple" call on a freshly created channel.
/// Only the key, string value and value type of each item are transmitted.
/// @param writeData request payload.
/// @param response  receives the server reply.
/// @return 1 if the RPC completed with an OK status, 0 otherwise.
int StreamClient::Request(const WriteData& writeData, ::stream::ResponseAny* response) {
    const std::string targetStr = m_localIp + ":" + std::to_string(m_port);
    auto stub = Stream::NewStub(grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()));

    ClientContext context;
    ::stream::RequestInfo request;
    request.set_datatype(writeData.dataType);
    request.set_namekey(writeData.nameKey);
    request.set_strvalue(writeData.strValue);
    request.set_valuetype(static_cast<::stream::TYPE>(writeData.valueType));
    for (const auto& item : writeData.items) {
        ::stream::ParamInfo* paramInfo = request.add_item();
        paramInfo->set_namekey(item.nameKey);
        paramInfo->set_strvalue(item.strValue);
        paramInfo->set_valuetype(static_cast<::stream::TYPE>(item.valueType));
    }

    const Status status = stub->Simple(&context, request, response);
    return status.ok();
}