#include "zeromq.h"
#include "../utils/LocalAddr.h"

#include <chrono>
#include <cstdio>
#include <cstring>
#include <exception>
#include <iostream>
#include <string>
#include <thread>

namespace {

// How long the subscriber blocks in recv() before re-checking the quit flag.
// Bounds the time ~ZeroMq() spends waiting in join().
constexpr int kSubscribeRecvTimeoutMs = 200;

// Builds a "tcp://<host>:<port>" endpoint string for bind()/connect().
std::string MakeTcpEndpoint(const char* host, int port) {
    return std::string("tcp://") + host + ":" + std::to_string(port);
}

} // namespace

/// Initialises the default server address and ports, clears both quit flags,
/// and caches the local IP address reported by LocalAddr.
ZeroMq::ZeroMq()
    : m_serverIp("192.168.0.188")
    , m_publishPort(5555)
    , m_pushPullPort(5556)
    , m_subQuit(false)
    , m_pushQuit(false) {
    m_localIp = LocalAddr().GetSystemIpAddress(); // local IP
}

/// Signals the worker threads to stop and joins them.
///
/// The subscriber thread started by Subscribe() re-checks m_subQuit at least
/// every kSubscribeRecvTimeoutMs, so the join completes promptly.
/// NOTE(review): the quit flags are read from another thread; this assumes
/// they are declared std::atomic<bool> in zeromq.h - confirm.
ZeroMq::~ZeroMq() {
    m_subQuit = true;
    if (m_subThread.joinable()) {
        m_subThread.join();
    }

    m_pushQuit = true;
    if (m_pushThread.joinable()) {
        m_pushThread.join();
    }
}

/// Binds a PUB socket on m_publishPort (all interfaces) and publishes the
/// fixed payload "12345\n" in an endless loop. Blocks the calling thread and
/// never returns.
void ZeroMq::Publish() {
    zmq::context_t context(1);
    zmq::socket_t publisher(context, ZMQ_PUB);

    // Use the configured port rather than a literal so publisher and
    // subscriber cannot drift apart (m_publishPort defaults to 5555).
    const std::string endpoint = MakeTcpEndpoint("*", m_publishPort);
    publisher.bind(endpoint.c_str());

    const std::string message = "12345\n";
    while (true) {
        zmq::message_t zmqMessage(message.size());
        memcpy(zmqMessage.data(), message.c_str(), message.size());
        publisher.send(zmqMessage);
    }
}

/// Starts a background thread that subscribes to every topic on
/// tcp://<m_localIp>:<m_publishPort> and forwards each received message, as a
/// std::string, to m_paramUpdate when that callback is set.
///
/// Calling Subscribe() while the thread is already running is a no-op
/// (assigning to a joinable std::thread would call std::terminate).
/// The thread exits once m_subQuit becomes true.
void ZeroMq::Subscribe() {
    if (m_subThread.joinable()) {
        return;
    }

    m_subThread = std::thread([this]() {
        // An exception escaping a thread entry point terminates the process,
        // so socket failures are reported and the thread simply ends.
        try {
            zmq::context_t context(1);
            zmq::socket_t subscriber(context, ZMQ_SUB);

            // Without a receive timeout recv() blocks indefinitely and the
            // quit flag would never be observed.
            const int timeoutMs = kSubscribeRecvTimeoutMs;
            subscriber.setsockopt(ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));

            const std::string endpoint =
                MakeTcpEndpoint(m_localIp.c_str(), m_publishPort);
            subscriber.connect(endpoint.c_str());
            subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); // Subscribe to all topics

            while (!m_subQuit) {
                zmq::message_t message;
                if (!subscriber.recv(&message)) {
                    continue; // timed out (EAGAIN): re-check the quit flag
                }

                std::string messageStr(
                    static_cast<const char*>(message.data()), message.size());
                if (m_paramUpdate) {
                    m_paramUpdate(messageStr);
                    // NOTE(review): if m_paramUpdate is a std::function rather
                    // than a raw function pointer, passing it to printf("%p")
                    // is not well-defined - confirm the type in zeromq.h.
                    printf("m_paramUpdate:%p\n", m_paramUpdate);
                }
                //std::cout << "Received Message: " << messageStr << std::endl;
            }
        } catch (const std::exception& e) {
            std::fprintf(stderr, "ZeroMq::Subscribe: %s\n", e.what());
        }
    });
}

/// Connects a REQ socket to tcp://<m_localIp>:<m_publishPort> and, once per
/// second, sends "Hello, World <local ip>" and prints the reply. Blocks the
/// calling thread and never returns.
///
/// NOTE(review): this connects a REQ socket to the publish port; presumably a
/// REP peer is expected on that port - verify against the server side.
void ZeroMq::Resque() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REQ);

    const std::string endpoint =
        MakeTcpEndpoint(m_localIp.c_str(), m_publishPort);
    socket.connect(endpoint.c_str());

    LocalAddr local;
    std::string localIp = local.GetSystemIpAddress(); // local IP

    while (true) {
        std::string message = "Hello, World " + localIp;
        std::cout << "Sending request: " << message << std::endl;

        zmq::message_t request(message.size());
        memcpy(request.data(), message.c_str(), message.size());
        socket.send(request);

        zmq::message_t reply;
        socket.recv(&reply);
        std::string response(
            static_cast<const char*>(reply.data()), reply.size());
        std::cout << "Received response: " << response << std::endl;

        // Portable replacement for the Win32 Sleep(1000).
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}