GrpcPrint/TestClient/DataManage/StreamClient.cpp

145 lines
4.3 KiB
C++
Raw Normal View History

2024-05-11 17:43:38 +08:00
#include "StreamClient.h"
#include "../utils/LocalAddr.h"
StreamClient::StreamClient()
: m_localIp("0.0.0.0")
, m_port(50010)
, m_readQuitFlag(false)
, m_writeQuitFlag(false)
, m_dataCallBack(nullptr)
, m_handlePtr(nullptr){
}
StreamClient::~StreamClient() {
Stop();
}
void StreamClient::Init() {
m_localIp = LocalAddr().GetSystemIpAddress();
std::string target_str = m_localIp +":"+ std::to_string(m_port);
m_channelTwo = grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials());
m_stubTwo = Stream::NewStub(m_channelTwo);
m_connectTd = std::thread([this]() {
this->AllStream();
});
}
void StreamClient::Stop() {
m_readQuitFlag = true;
if (m_readTd.joinable()) m_readTd.join();
m_writeQuitFlag = true;
if (m_writeTd.joinable()) m_writeTd.join();
if (m_connectTd.joinable()) {
m_connectTd.detach(); //用join退不出
}
}
bool StreamClient::GetPushMsg(WriteData& msg) {
std::lock_guard<std::mutex> lock(m_msgLock);
if (m_msgList.empty()) {
return false;
}
else {
msg = m_msgList.front();
m_msgList.pop_front();
return true;
}
}
void StreamClient::SetPushMsg(const WriteData& msg) {
std::lock_guard<std::mutex> lock(m_msgLock);
m_msgList.emplace_back(msg);
}
void StreamClient::AllStream() {
ClientContext context;
std::unique_ptr<grpc::ClientReaderWriter<RequestInfo, ResponseInfo>> stream(m_stubTwo->AllStream(&context));
m_readTd = std::thread([this, &stream] {
ResponseInfo readInfo;
while (!m_readQuitFlag && stream->Read(&readInfo)) {
ReadData readData;
readData.dataType = (READTYPE)readInfo.datatype();
std::list<Item> its;
for (const ::stream::ParamInfo& it : readInfo.item()) {
readData.its.emplace_back(Item{ it.namekey(),it.strvalue(),(DATATYPE)it.valuetype()});
//printf("接收到服务端消息dataType:%d,nameKey:%s, strvalue:%s,valueType:%d\n",
// readData.dataType, it.namekey().data(), it.strvalue().c_str(), it.valuetype());
}
readData.result = readInfo.result();
if (m_dataCallBack) {
m_dataCallBack(m_handlePtr, readData);
}
}
});
m_writeTd = std::thread([this, &stream] {
while(!m_writeQuitFlag){
WriteData writeData;
if (GetPushMsg(writeData)) {
RequestInfo request;
request.set_namekey(writeData.nameKey);
request.set_datatype(writeData.dataType);
request.set_strvalue(writeData.strValue);
request.set_valuetype((::stream::TYPE)writeData.valueType);
stream->Write(request);
}
else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
stream->WritesDone();
});
Status status = stream->Finish();
if (!status.ok()) {
std::cout << "RPC failed: " << status.error_message() << std::endl;
2024-05-17 10:57:17 +08:00
//exit(-1);
2024-05-11 17:43:38 +08:00
}
}
2024-05-15 17:59:04 +08:00
bool StreamClient::GetLayerByIndex(int index, ::stream::ResponseAny* response){
2024-05-11 17:43:38 +08:00
// 初始化 gRPC
std::string targetStr = m_localIp + ":" + std::to_string(m_port);
std::unique_ptr<Stream::Stub> _stub = Stream::NewStub(grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()));
ClientContext context;
::stream::RequestInfo request;
request.set_datatype(LAYERDATAREQ);
request.set_strvalue(std::to_string(index));
request.set_valuetype((stream::TYPE)iINT);
Status status = _stub->Simple(&context, request, response);
return status.ok();
2024-05-15 17:59:04 +08:00
}
int StreamClient::Request(const WriteData& writeData , ::stream::ResponseAny* response) {
// 初始化 gRPC
std::string targetStr = m_localIp + ":" + std::to_string(m_port);
std::unique_ptr<Stream::Stub> _stub = Stream::NewStub(grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()));
ClientContext context;
::stream::RequestInfo request;
2024-05-17 10:57:17 +08:00
request.set_datatype(writeData.dataType);
2024-05-15 17:59:04 +08:00
request.set_namekey(writeData.nameKey);
request.set_strvalue(writeData.strValue);
2024-05-17 10:57:17 +08:00
request.set_valuetype((stream::TYPE)writeData.valueType);
2024-05-15 17:59:04 +08:00
Status status = _stub->Simple(&context, request, response);
return status.ok();
2024-05-11 17:43:38 +08:00
}