110 lines
3.0 KiB
C++
110 lines
3.0 KiB
C++
#include "StreamClient.h"
|
||
#include "../utils/LocalAddr.h"
|
||
|
||
StreamClient::StreamClient()
|
||
: m_localIp("0.0.0.0")
|
||
, m_port(50010)
|
||
, m_readQuitFlag(false)
|
||
, m_writeQuitFlag(false)
|
||
, m_dataCallBack(nullptr)
|
||
, m_handlePtr(nullptr){
|
||
|
||
|
||
}
|
||
|
||
|
||
StreamClient::~StreamClient() {
|
||
m_readQuitFlag = true;
|
||
m_writeQuitFlag = true;
|
||
|
||
if (m_connectTd.joinable()) {
|
||
m_connectTd.join();
|
||
}
|
||
|
||
}
|
||
|
||
void StreamClient::Init() {
|
||
m_localIp = LocalAddr().GetSystemIpAddress();
|
||
std::string target_str = m_localIp +":"+ std::to_string(m_port);
|
||
m_stubTwo = Stream::NewStub(grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()));
|
||
|
||
m_connectTd = std::thread([this]() {
|
||
this->AllStream();
|
||
});
|
||
|
||
}
|
||
|
||
bool StreamClient::GetPushMsg(WriteData& msg) {
|
||
std::lock_guard<std::mutex> lock(m_msgLock);
|
||
if (m_msgList.empty()) {
|
||
return false;
|
||
}
|
||
else {
|
||
msg = (*m_msgList.front());
|
||
delete m_msgList.front();
|
||
m_msgList.pop_front();
|
||
return true;
|
||
}
|
||
}
|
||
|
||
|
||
void StreamClient::SetPushMsg(WriteData* msg) {
|
||
std::lock_guard<std::mutex> lock(m_msgLock);
|
||
m_msgList.push_back(msg);
|
||
}
|
||
|
||
|
||
void StreamClient::AllStream() {
|
||
|
||
ClientContext context;
|
||
std::unique_ptr<grpc::ClientReaderWriter<RequestInfo, ResponseInfo>> stream(m_stubTwo->AllStream(&context));
|
||
|
||
std::thread reader([this, &stream] {
|
||
ResponseInfo readInfo;
|
||
while (!m_readQuitFlag && stream->Read(&readInfo)) {
|
||
ReadData readData;
|
||
readData.dataType = (READTYPE)readInfo.datatype();
|
||
readData.nameKey = readInfo.namekey();
|
||
readData.strValue = readInfo.strvalue();
|
||
readData.valueType =(DATATYPE)readInfo.valuetype();
|
||
readData.result = readInfo.result();
|
||
|
||
printf("服务端消息:dataType:%d,nameKey:%s, strvalue:%s,valueType:%d\n",
|
||
readData.dataType, readData.nameKey.c_str(), readInfo.strvalue().c_str(),readData.valueType);
|
||
|
||
if (m_dataCallBack) {
|
||
m_dataCallBack(m_handlePtr, readData);
|
||
}
|
||
}
|
||
});
|
||
|
||
std::thread writer([this, &stream] {
|
||
while(!m_writeQuitFlag){
|
||
WriteData writeData;
|
||
if (GetPushMsg(writeData)) {
|
||
RequestInfo request;
|
||
request.set_namekey("hello server." + std::to_string(std::time(nullptr)));
|
||
request.set_datatype((::stream::RequestInfo_Status)writeData.dataType);
|
||
request.set_strvalue(writeData.strValue);
|
||
request.set_valuetype((::stream::RequestInfo_TYPE)writeData.valueType);
|
||
stream->Write(request);
|
||
}
|
||
else {
|
||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||
}
|
||
|
||
}
|
||
stream->WritesDone();
|
||
});
|
||
|
||
reader.join();
|
||
writer.join();
|
||
|
||
Status status = stream->Finish();
|
||
if (!status.ok()) {
|
||
std::cout << "RPC failed: " << status.error_message() << std::endl;
|
||
}
|
||
}
|
||
|
||
|