2024-03-05 11:19:31 +08:00

91 lines
2.5 KiB
C++

#include "zeromq.h"
#include "../utils/LocalAddr.h"
#include <thread>
/// Constructs the wrapper with its default endpoint settings: server IP
/// "192.168.0.188", publish port 5555, push/pull port 5556, and both quit
/// flags cleared. The local IP address is then looked up through LocalAddr
/// and cached in m_localIp.
ZeroMq::ZeroMq()
    : m_serverIp("192.168.0.188")
    , m_publishPort(5555)
    , m_pushPullPort(5556)
    , m_subQuit(false)
    , m_pushQuit(false)
{
    // Local IP address of this machine.
    LocalAddr addrResolver;
    m_localIp = addrResolver.GetSystemIpAddress();
}
/// Shuts down the background workers: first the subscriber, then the pusher.
/// For each one the quit flag is raised and, if the thread was ever started,
/// it is joined. join() blocks until that thread's function returns.
ZeroMq::~ZeroMq() {
    // Raise the quit flag, then wait for the matching worker if it is running.
    const auto signalAndJoin = [](auto& quitFlag, auto& worker) {
        quitFlag = true;
        if (worker.joinable()) {
            worker.join();
        }
    };

    signalAndJoin(m_subQuit, m_subThread);
    signalAndJoin(m_pushQuit, m_pushThread);
}
void ZeroMq::Publish() {
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5555");
while (true) {
std::string message = "12345\n";
zmq::message_t zmqMessage(message.size());
memcpy(zmqMessage.data(), message.c_str(), message.size());
publisher.send(zmqMessage);
}
}
void ZeroMq::Subscribe() {
m_subThread = std::thread([this]() {
zmq::context_t context(1);
zmq::socket_t subscriber(context, ZMQ_SUB);
char addr[64] = { 0 };
sprintf_s(addr, 63, "tcp://%s:%d", m_localIp.c_str(), m_publishPort);
subscriber.connect(addr);
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); //Subscribe to all topics
while (true) {
zmq::message_t message;
subscriber.recv(&message);
std::string messageStr = std::string(static_cast<char*>(message.data()), message.size());
if (m_paramUpdate){
m_paramUpdate(messageStr);
printf("m_paramUpdate:%p\n", m_paramUpdate);
}
//std::cout << "Received Message: " << messageStr << std::endl;
}
});
}
void ZeroMq::Resque() {
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REQ);
char addr[64] = { 0 };
sprintf_s(addr, 63, "tcp://%s:%d", m_localIp.c_str(), m_publishPort);
socket.connect(addr);
LocalAddr local;
std::string localIp = local.GetSystemIpAddress(); //本地ip
while (true) {
std::string message = "Hello, World " + localIp;
std::cout << "Sending request: " << message << std::endl;
zmq::message_t request(message.size());
memcpy(request.data(), message.c_str(), message.size());
socket.send(request);
zmq::message_t reply;
socket.recv(&reply);
std::string response = std::string(static_cast<char*>(reply.data()), reply.size());
std::cout << "Received response: " << response << std::endl;
Sleep(1000);
}
}