GrpcPrint/TestClient/DataManage/StreamClient.cpp
2024-05-15 17:59:04 +08:00

145 lines
4.3 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "StreamClient.h"
#include "../utils/LocalAddr.h"
// Constructs an idle client: wildcard address "0.0.0.0" (Init() overwrites
// m_localIp), port 50010, both worker quit flags cleared, and no data
// callback / callback handle registered.
StreamClient::StreamClient()
    : m_localIp("0.0.0.0"),
      m_port(50010),
      m_readQuitFlag(false),
      m_writeQuitFlag(false),
      m_dataCallBack(nullptr),
      m_handlePtr(nullptr)
{
}
// Destructor: shuts the worker threads down through Stop() before the
// members are destroyed.
StreamClient::~StreamClient()
{
    this->Stop();
}
void StreamClient::Init() {
m_localIp = LocalAddr().GetSystemIpAddress();
std::string target_str = m_localIp +":"+ std::to_string(m_port);
m_channelTwo = grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials());
m_stubTwo = Stream::NewStub(m_channelTwo);
m_connectTd = std::thread([this]() {
this->AllStream();
});
}
// Requests shutdown of the reader and writer threads and waits for them.
//
// Both quit flags are raised BEFORE any join. The writer thread only calls
// WritesDone() after it observes m_writeQuitFlag, while the reader thread is
// blocked inside Read() until the server sends a message or ends the stream.
// Raising the write flag only after the reader join returned (as was done
// previously) meant the half-close could never be sent while Stop() was
// waiting on the reader, so shutdown could stall.
// NOTE(review): the reader still only wakes up once Read() returns; whether
// the server ends the stream after the client's half-close depends on the
// server implementation - confirm.
void StreamClient::Stop() {
    m_readQuitFlag = true;
    m_writeQuitFlag = true;

    // The writer polls its flag (see AllStream), so join it first; it issues
    // WritesDone() on its way out.
    if (m_writeTd.joinable()) {
        m_writeTd.join();
    }
    if (m_readTd.joinable()) {
        m_readTd.join();
    }

    if (m_connectTd.joinable()) {
        // Detached on purpose: join() does not return for this thread
        // (original author's note).
        m_connectTd.detach();
    }
}
// Pops the oldest queued outgoing message, if there is one.
//
// @param msg  Receives a copy of the front element of m_msgList.
// @return     true when a message was dequeued into msg; false when the
//             queue was empty (msg is left untouched).
//
// The queue is accessed under m_msgLock.
bool StreamClient::GetPushMsg(WriteData& msg)
{
    std::lock_guard<std::mutex> guard(m_msgLock);

    const bool hasMessage = !m_msgList.empty();
    if (hasMessage) {
        msg = m_msgList.front();
        m_msgList.pop_front();
    }
    return hasMessage;
}
// Appends a copy of msg to the back of the outgoing queue (m_msgList),
// holding m_msgLock while the queue is modified.
//
// @param msg  Message to enqueue; it is picked up later via GetPushMsg().
void StreamClient::SetPushMsg(const WriteData& msg)
{
    std::lock_guard<std::mutex> guard(m_msgLock);
    m_msgList.push_back(msg);
}
void StreamClient::AllStream() {
ClientContext context;
std::unique_ptr<grpc::ClientReaderWriter<RequestInfo, ResponseInfo>> stream(m_stubTwo->AllStream(&context));
m_readTd = std::thread([this, &stream] {
ResponseInfo readInfo;
while (!m_readQuitFlag && stream->Read(&readInfo)) {
ReadData readData;
readData.dataType = (READTYPE)readInfo.datatype();
std::list<Item> its;
for (const ::stream::ParamInfo& it : readInfo.item()) {
readData.its.emplace_back(Item{ it.namekey(),it.strvalue(),(DATATYPE)it.valuetype()});
//printf("接收到服务端消息dataType:%d,nameKey:%s, strvalue:%s,valueType:%d\n",
// readData.dataType, it.namekey().data(), it.strvalue().c_str(), it.valuetype());
}
readData.result = readInfo.result();
if (m_dataCallBack) {
m_dataCallBack(m_handlePtr, readData);
}
}
});
m_writeTd = std::thread([this, &stream] {
while(!m_writeQuitFlag){
WriteData writeData;
if (GetPushMsg(writeData)) {
RequestInfo request;
request.set_namekey(writeData.nameKey);
request.set_datatype(writeData.dataType);
request.set_strvalue(writeData.strValue);
request.set_valuetype((::stream::TYPE)writeData.valueType);
stream->Write(request);
}
else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
stream->WritesDone();
});
Status status = stream->Finish();
if (!status.ok()) {
std::cout << "RPC failed: " << status.error_message() << std::endl;
exit(-1);
}
}
bool StreamClient::GetLayerByIndex(int index, ::stream::ResponseAny* response){
// 初始化 gRPC
std::string targetStr = m_localIp + ":" + std::to_string(m_port);
std::unique_ptr<Stream::Stub> _stub = Stream::NewStub(grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()));
ClientContext context;
::stream::RequestInfo request;
request.set_datatype(LAYERDATAREQ);
request.set_strvalue(std::to_string(index));
request.set_valuetype((stream::TYPE)iINT);
Status status = _stub->Simple(&context, request, response);
return status.ok();
}
int StreamClient::Request(const WriteData& writeData , ::stream::ResponseAny* response) {
// 初始化 gRPC
std::string targetStr = m_localIp + ":" + std::to_string(m_port);
std::unique_ptr<Stream::Stub> _stub = Stream::NewStub(grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()));
ClientContext context;
::stream::RequestInfo request;
request.set_datatype(REGISTFUNC);
request.set_namekey(writeData.nameKey);
request.set_strvalue(writeData.strValue);
request.set_valuetype((stream::TYPE)iSTRING);
Status status = _stub->Simple(&context, request, response);
return status.ok();
}