GrpcPrint/PrintC/DataManage/StreamClient.cpp

112 lines
3.0 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "StreamClient.h"
#include "../utils/LocalAddr.h"
StreamClient::StreamClient()
: m_localIp("0.0.0.0")
, m_port(50010)
, m_readQuitFlag(false)
, m_writeQuitFlag(false)
, m_dataCallBack(nullptr)
, m_handlePtr(nullptr){
}
/// Stops the streaming worker and releases everything the client still owns.
///
/// Raises both quit flags so the loops in AllStream() can stop, waits for the
/// worker thread started by Init(), then frees any messages that were queued
/// with SetPushMsg() but never consumed by GetPushMsg().
StreamClient::~StreamClient() {
    m_readQuitFlag = true;
    m_writeQuitFlag = true;
    if (m_connectTd.joinable()) {
        m_connectTd.join();
    }

    // The queue owns its raw pointers (GetPushMsg() deletes each one it pops),
    // so anything still pending at destruction would otherwise leak.
    std::lock_guard<std::mutex> lock(m_msgLock);
    for (WriteData* pending : m_msgList) {
        delete pending;
    }
    m_msgList.clear();
}
/// Resolves the local address, creates the gRPC stub and starts the worker
/// thread that runs AllStream().
///
/// The channel target is "<local ip>:<m_port>" and is created with insecure
/// channel credentials. Calling Init() again while a worker thread already
/// exists is a no-op.
void StreamClient::Init() {
    // Assigning to a std::thread that is still joinable calls std::terminate,
    // so a repeated Init() must not reach the assignment below.
    if (m_connectTd.joinable()) {
        return;
    }

    m_localIp = LocalAddr().GetSystemIpAddress();
    const std::string target_str = m_localIp + ":" + std::to_string(m_port);
    m_stubTwo = Stream::NewStub(grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()));
    m_connectTd = std::thread([this]() {
        this->AllStream();
    });
}
/// Pops the oldest queued outgoing message, if there is one.
///
/// @param msg  Receives a copy of the dequeued message; left untouched when
///             the queue is empty.
/// @return true when a message was copied into @p msg, false when the queue
///         was empty.
bool StreamClient::GetPushMsg(WriteData& msg) {
    std::lock_guard<std::mutex> guard(m_msgLock);
    if (m_msgList.empty()) {
        return false;
    }

    // Copy first, then free the heap object and drop its slot from the queue.
    WriteData* const oldest = m_msgList.front();
    msg = *oldest;
    delete oldest;
    m_msgList.pop_front();
    return true;
}
/// Queues a message to be sent by the writer loop in AllStream().
///
/// @param msg  Heap-allocated message. The queue takes ownership:
///             GetPushMsg() deletes it after copying it out. A null pointer
///             is ignored.
void StreamClient::SetPushMsg(WriteData* msg) {
    // GetPushMsg() dereferences every queued pointer unconditionally, so a
    // null entry must never reach the queue.
    if (msg == nullptr) {
        return;
    }
    std::lock_guard<std::mutex> lock(m_msgLock);
    m_msgList.push_back(msg);
}
void StreamClient::AllStream() {
ClientContext context;
std::unique_ptr<grpc::ClientReaderWriter<RequestInfo, ResponseInfo>> stream(m_stubTwo->AllStream(&context));
std::thread reader([this, &stream] {
ResponseInfo readInfo;
while (!m_readQuitFlag && stream->Read(&readInfo)) {
ReadData readData;
readData.dataType = (READTYPE)readInfo.datatype();
std::list<Item> its;
for (const ::stream::ParamInfo& it : readInfo.item()) {
readData.its.emplace_back(Item{ it.namekey(),it.strvalue(),(DATATYPE)it.valuetype()});
printf("接收到服务端消息dataType:%d,nameKey:%s, strvalue:%s,valueType:%d\n",
readData.dataType, it.namekey().data(), it.strvalue().c_str(), it.valuetype());
}
readData.result = readInfo.result();
if (m_dataCallBack) {
m_dataCallBack(m_handlePtr, readData);
}
}
});
std::thread writer([this, &stream] {
while(!m_writeQuitFlag){
WriteData writeData;
if (GetPushMsg(writeData)) {
RequestInfo request;
request.set_namekey(writeData.nameKey);
request.set_datatype(writeData.dataType);
request.set_strvalue(writeData.strValue);
request.set_valuetype((::stream::TYPE)writeData.valueType);
stream->Write(request);
}
else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
stream->WritesDone();
});
reader.join();
writer.join();
Status status = stream->Finish();
if (!status.ok()) {
std::cout << "RPC failed: " << status.error_message() << std::endl;
}
}