2024-05-11 17:43:38 +08:00
|
|
|
|
#include "StreamClient.h"
|
|
|
|
|
#include "../utils/LocalAddr.h"
|
|
|
|
|
|
|
|
|
|
StreamClient::StreamClient()
|
|
|
|
|
: m_localIp("0.0.0.0")
|
|
|
|
|
, m_port(50010)
|
|
|
|
|
, m_readQuitFlag(false)
|
|
|
|
|
, m_writeQuitFlag(false)
|
|
|
|
|
, m_dataCallBack(nullptr)
|
|
|
|
|
, m_handlePtr(nullptr){
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StreamClient::~StreamClient() {
|
|
|
|
|
Stop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void StreamClient::Init() {
|
|
|
|
|
m_localIp = LocalAddr().GetSystemIpAddress();
|
|
|
|
|
std::string target_str = m_localIp +":"+ std::to_string(m_port);
|
|
|
|
|
m_channelTwo = grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials());
|
|
|
|
|
m_stubTwo = Stream::NewStub(m_channelTwo);
|
|
|
|
|
|
|
|
|
|
m_connectTd = std::thread([this]() {
|
|
|
|
|
this->AllStream();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void StreamClient::Stop() {
|
|
|
|
|
m_readQuitFlag = true;
|
|
|
|
|
if (m_readTd.joinable()) m_readTd.join();
|
|
|
|
|
|
|
|
|
|
m_writeQuitFlag = true;
|
|
|
|
|
if (m_writeTd.joinable()) m_writeTd.join();
|
|
|
|
|
|
|
|
|
|
if (m_connectTd.joinable()) {
|
|
|
|
|
m_connectTd.detach(); //用join退不出
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool StreamClient::GetPushMsg(WriteData& msg) {
|
|
|
|
|
std::lock_guard<std::mutex> lock(m_msgLock);
|
|
|
|
|
if (m_msgList.empty()) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
msg = m_msgList.front();
|
|
|
|
|
m_msgList.pop_front();
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2024-05-30 11:18:10 +08:00
|
|
|
|
void StreamClient::PushMsg(const WriteData& msg) {
|
2024-05-11 17:43:38 +08:00
|
|
|
|
std::lock_guard<std::mutex> lock(m_msgLock);
|
|
|
|
|
m_msgList.emplace_back(msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void StreamClient::AllStream() {
|
|
|
|
|
|
|
|
|
|
ClientContext context;
|
|
|
|
|
std::unique_ptr<grpc::ClientReaderWriter<RequestInfo, ResponseInfo>> stream(m_stubTwo->AllStream(&context));
|
|
|
|
|
|
|
|
|
|
m_readTd = std::thread([this, &stream] {
|
|
|
|
|
ResponseInfo readInfo;
|
|
|
|
|
while (!m_readQuitFlag && stream->Read(&readInfo)) {
|
|
|
|
|
ReadData readData;
|
|
|
|
|
readData.dataType = (READTYPE)readInfo.datatype();
|
2024-06-04 14:08:12 +08:00
|
|
|
|
|
2024-05-11 17:43:38 +08:00
|
|
|
|
for (const ::stream::ParamInfo& it : readInfo.item()) {
|
|
|
|
|
readData.its.emplace_back(Item{ it.namekey(),it.strvalue(),(DATATYPE)it.valuetype()});
|
|
|
|
|
//printf("接收到服务端消息:dataType:%d,nameKey:%s, strvalue:%s,valueType:%d\n",
|
|
|
|
|
// readData.dataType, it.namekey().data(), it.strvalue().c_str(), it.valuetype());
|
|
|
|
|
}
|
|
|
|
|
readData.result = readInfo.result();
|
|
|
|
|
|
|
|
|
|
if (m_dataCallBack) {
|
|
|
|
|
m_dataCallBack(m_handlePtr, readData);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
m_writeTd = std::thread([this, &stream] {
|
|
|
|
|
while(!m_writeQuitFlag){
|
|
|
|
|
WriteData writeData;
|
|
|
|
|
if (GetPushMsg(writeData)) {
|
|
|
|
|
RequestInfo request;
|
|
|
|
|
request.set_namekey(writeData.nameKey);
|
|
|
|
|
request.set_datatype(writeData.dataType);
|
|
|
|
|
request.set_strvalue(writeData.strValue);
|
|
|
|
|
request.set_valuetype((::stream::TYPE)writeData.valueType);
|
2024-05-30 17:25:23 +08:00
|
|
|
|
request.set_handletype((::stream::DATAHANDLE)writeData.handleType);
|
2024-06-04 14:08:12 +08:00
|
|
|
|
|
|
|
|
|
for (auto wd = writeData.items.begin(); wd != writeData.items.end(); ++wd) {
|
|
|
|
|
::stream::ParamInfo* paramInfo = request.add_item();
|
|
|
|
|
paramInfo->set_namekey((*wd).nameKey);
|
|
|
|
|
paramInfo->set_strvalue((*wd).strValue);
|
|
|
|
|
paramInfo->set_valuetype((stream::TYPE)(*wd).valueType);
|
|
|
|
|
}
|
|
|
|
|
|
2024-05-11 17:43:38 +08:00
|
|
|
|
stream->Write(request);
|
2024-06-04 14:08:12 +08:00
|
|
|
|
//printf("write code:%d\n", ret);
|
2024-05-11 17:43:38 +08:00
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
stream->WritesDone();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
Status status = stream->Finish();
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
|
std::cout << "RPC failed: " << status.error_message() << std::endl;
|
2024-05-17 10:57:17 +08:00
|
|
|
|
//exit(-1);
|
2024-05-11 17:43:38 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2024-05-15 17:59:04 +08:00
|
|
|
|
bool StreamClient::GetLayerByIndex(int index, ::stream::ResponseAny* response){
|
2024-05-11 17:43:38 +08:00
|
|
|
|
// 初始化 gRPC
|
|
|
|
|
std::string targetStr = m_localIp + ":" + std::to_string(m_port);
|
|
|
|
|
std::unique_ptr<Stream::Stub> _stub = Stream::NewStub(grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()));
|
|
|
|
|
|
|
|
|
|
ClientContext context;
|
|
|
|
|
::stream::RequestInfo request;
|
|
|
|
|
request.set_datatype(LAYERDATAREQ);
|
|
|
|
|
request.set_strvalue(std::to_string(index));
|
|
|
|
|
request.set_valuetype((stream::TYPE)iINT);
|
|
|
|
|
Status status = _stub->Simple(&context, request, response);
|
|
|
|
|
return status.ok();
|
2024-05-15 17:59:04 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int StreamClient::Request(const WriteData& writeData , ::stream::ResponseAny* response) {
|
|
|
|
|
// 初始化 gRPC
|
|
|
|
|
std::string targetStr = m_localIp + ":" + std::to_string(m_port);
|
|
|
|
|
std::unique_ptr<Stream::Stub> _stub = Stream::NewStub(grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()));
|
|
|
|
|
|
|
|
|
|
ClientContext context;
|
|
|
|
|
::stream::RequestInfo request;
|
2024-05-17 10:57:17 +08:00
|
|
|
|
request.set_datatype(writeData.dataType);
|
2024-05-15 17:59:04 +08:00
|
|
|
|
request.set_namekey(writeData.nameKey);
|
|
|
|
|
request.set_strvalue(writeData.strValue);
|
2024-05-17 10:57:17 +08:00
|
|
|
|
request.set_valuetype((stream::TYPE)writeData.valueType);
|
2024-06-04 17:49:56 +08:00
|
|
|
|
|
|
|
|
|
for (auto wd = writeData.items.begin(); wd != writeData.items.end(); ++wd) {
|
|
|
|
|
::stream::ParamInfo* paramInfo = request.add_item();
|
|
|
|
|
paramInfo->set_namekey((*wd).nameKey);
|
|
|
|
|
paramInfo->set_strvalue((*wd).strValue);
|
|
|
|
|
paramInfo->set_valuetype((stream::TYPE)(*wd).valueType);
|
|
|
|
|
}
|
|
|
|
|
|
2024-05-15 17:59:04 +08:00
|
|
|
|
Status status = _stub->Simple(&context, request, response);
|
|
|
|
|
return status.ok();
|
|
|
|
|
|
2024-05-11 17:43:38 +08:00
|
|
|
|
}
|