NewPrint/PrintClient/zmq/ZmqChatClient.cpp

422 lines
9.6 KiB
C++
Raw Permalink Normal View History

2024-03-05 11:19:31 +08:00
#include "ZmqChatClient.h"
#include <cassert>
#include <algorithm>
using namespace ZmqChatClient;
/**
* @brief CZmqChatClient constructor
*/
CZmqChatClient::CZmqChatClient()
: m_serverRecvPort("5555")
, m_serverSendPort("5556")
, m_flIsRunning(false)
, m_localQuitFlag(false)
, m_externalQuitFlag(m_localQuitFlag)
{
PLOGI << "create CZmqChatClient object";
}
/**
* @brief CZmqChatClient constructor
* @param[in] _extQuitFlag external quit flag. Setting it to 'true' will end send and recv loops
*/
CZmqChatClient::CZmqChatClient(std::atomic<bool> &_extQuitFlag)
: m_serverRecvPort("5555")
, m_serverSendPort("5556")
, m_flIsRunning(false)
, m_localQuitFlag(false)
, m_externalQuitFlag(_extQuitFlag)
{
PLOGI << "create CZmqChatClient object";
}
/**
 * @brief Destructor
 *
 * Logs the destruction and hands the shutdown over to stop().
 */
CZmqChatClient::~CZmqChatClient()
{
    PLOGI << "destroy CZmqChatClient object";
    this->stop();
}
/**
* @brief Starts client
*
* Init client parameters and starts send and recv loops
*/
void CZmqChatClient::start()
{
PLOGI << "start client...";
// init client params
PLOGI << "init client params";
init();
//set isRunning flag
m_flIsRunning = true;
// start receive thread
PLOGI << "start receiving thread";
m_receiveThr = std::thread(&CZmqChatClient::startReceiving, this);
//start sending
PLOGI << "start sending";
startSending();
}
/**
* @brief Stops client
*
* Stops send and recv loops, join threads
*/
void CZmqChatClient::stop()
{
PLOGI << "Stopping client...";
if(m_flIsRunning)
{
PLOGI << "Stopping threads...";
if(m_receiveThr.joinable())
{
m_receiveThr.join();
PLOGI << "receiver joined";
}
if(m_sendThr.joinable())
{
m_sendThr.join();
PLOGI << "sender joined";
}
PLOGI << "Ok.";
}
std::cout << "Bye!\n";
}
/**
 * @brief init client
 *
 * Prints the banner and asks the user for the input parameters: username
 * and server ip address. Each prompt is repeated until the matching check
 * (checkName() / checkIp()) accepts the value or the external quit flag is
 * set. The prompt for the server receive port is still disabled.
 *
 * If reading from std::cin fails (closed or broken input), the value is
 * cleared so the checks substitute their defaults ('User' / 'localhost').
 */
void CZmqChatClient::init()
{
    std::cout << "**************************************************\n"
              << " ZmqClient\n"
              << "**************************************************\n";
    /* get userName and check it */
    do{
        std::cout << "\nEnter your name (leave empty for 'User'): ";
        if(!std::getline(std::cin, m_name))
        {
            // Once the stream is at EOF / failed, getline no longer touches
            // the string: a previously rejected value would be re-checked
            // forever. Fall back to the default instead.
            m_name.clear();
        }
    } while(!checkName(m_name) && !m_externalQuitFlag.load());
    /* get server ip and check it */
    do{
        std::cout << "Enter server ip address (leave empty for 'localhost'): ";
        if(!std::getline(std::cin, m_ipAddr))
        {
            // same reasoning as for the name prompt
            m_ipAddr.clear();
        }
    } while(!checkIp(m_ipAddr) && !m_externalQuitFlag.load());
    // port choosing on server is not implemented yet...
    // /* get server port and check it */
    // do{
    // std::cout << "Enter server port (leave empty for 5555): ";
    // std::getline(std::cin, m_serverRecvPort);
    // } while(!checkPort(m_serverRecvPort) && !m_externalQuitFlag.load());
    std::cout << "**************************************************\n\n";
}
/**
* @brief Init receiving
*
* Create context and receiving socket,
* sets socket options and connects to server.
* Starts receiving loop
*/
void CZmqChatClient::startReceiving() const
{
/* prepare context and socket */
PLOGI << "[receiving] prepare context and socket";
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_SUB);
/* set socket options */
PLOGI << "[receiving] set socket options";
socket.setsockopt(ZMQ_RCVTIMEO, g_socketTimeout);
socket.setsockopt(ZMQ_SUBSCRIBE, g_serverFilter.data(), g_serverFilter.size());
/* try connect to server */
PLOGI << "[receiving] try to connect to server...";
try{
std::lock_guard<std::mutex> lock(m_mtx);
socket.connect(formatConnectParam(m_ipAddr, m_serverSendPort));
}
catch( ... )
{
//fail to connect
PLOGI << "...Fail.";
m_localQuitFlag.store(true);//exit
return;
}
PLOGI << "...Ok.";
//start receiving loop
while(!m_localQuitFlag.load() && !m_externalQuitFlag.load())
{
receiveMessage(socket);
}
}
/**
* @brief Init sending
*
* Create context and sending socket,
* sets socket options and connects to server.
* Starts sending loop
*/
void CZmqChatClient::startSending() const
{
/* prepare context and socket */
PLOGI << "[sending] prepare context and socket";
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_PUSH);
PLOGI << "[sending] set socket options";
socket.setsockopt(ZMQ_SNDTIMEO, g_socketTimeout); //set timeout
/* try connect to server */
PLOGI << "[sending] trye to connect to server...";
try{
std::lock_guard<std::mutex> lock(m_mtx);
socket.connect(formatConnectParam(m_ipAddr, m_serverRecvPort));
}
catch( ... )
{
PLOGI << "...Fail.";
m_localQuitFlag.store(true);
return;
}
PLOGI << "...Ok";
std::string name{}; //username for sending
{
std::lock_guard<std::mutex> lock(m_mtx);
name = m_name;
}
// start send loop
while(!m_localQuitFlag.load() && !m_externalQuitFlag.load())
{
sendMessage(socket, name);
}
}
/**
 * @brief Read one line from std::cin and wipe it from the terminal
 *
 * After the read an ANSI sequence is written to std::cout that moves the
 * cursor up one line and erases that line, which removes the text the user
 * has just typed.
 *
 * @return std::string the line that was read (without the line break)
 */
inline std::string getUserInput()
{
    std::string line;
    std::getline(std::cin, line);

    // "\033[A" : cursor up one line, "\33[2K" : erase the whole current line
    std::cout << "\033[A\33[2K";

    return line;
}
/**
 * @brief Wait for a user message and send it to the server
 *
 * Reads one line from the console. Empty lines are ignored. Any other line
 * is wrapped into a CChatMessage together with the username and sent
 * through the socket. The outcome of the send is logged.
 *
 * If std::cin can no longer be read (closed or failed), the local quit flag
 * is raised. Otherwise the send loop would keep calling this function,
 * getting an empty line back immediately every time.
 *
 * @param[in] _socket socket to send the message
 * @param[in] _name username
 */
void CZmqChatClient::sendMessage(zmq::socket_t &_socket, const std::string &_name) const
{
    //get message from user
    std::string inputStr = getUserInput();
    if(!std::cin)
    {
        // no more input will ever arrive: end the client instead of spinning
        PLOGI << "stdin closed: stop client";
        m_localQuitFlag.store(true);
        return;
    }
    if(inputStr.empty())
    {
        return;
    }

    PLOGI << "sending: " << inputStr;
    //create message
    CChatMessage message(_name, inputStr);
    // send message to server
    bool sent = false;
    try {
        // send() reports a message that could not be queued (e.g. the send
        // timeout expired) through its return value, not by throwing
        sent = static_cast<bool>(_socket.send(message));
    } catch ( ... ) {
        PLOGI << "send: fail";
        return;
    }
    PLOGI << (sent ? "send: ok" : "send: fail");
}
/**
 * @brief Log a received chat message and print it to the console
 *
 * The log line carries filter, username and text, each wrapped in square
 * brackets. The console line starts with a carriage return and shows the
 * bracketed username followed by " > " and the text.
 *
 * @param[in] _message message to print
 */
inline void displayMessage (CChatMessage &_message)
{
    // wraps a string into square brackets
    const auto bracketed = [](const std::string &text) { return "[" + text + "]"; };

    // split the message into its parts
    auto [topic, sender, text] = _message.getData();

    PLOGI << "received: " << bracketed(topic) << bracketed(sender) << bracketed(text);
    std::cout << '\r' << bracketed(sender) << " > " << text << '\n';
}
/**
 * @brief output received messages
 *
 * Takes messages from the socket without blocking (ZMQ_DONTWAIT) and
 * displays each of them, until a receive attempt yields nothing. Ends with
 * a usleep(1) before returning to the caller's loop.
 *
 * @param[in] _socket socket for receiving messages
 */
void CZmqChatClient::receiveMessage(zmq::socket_t &_socket) const
{
    for(;;)
    {
        CChatMessage incoming;
        const bool received = _socket.recv(&incoming, ZMQ_DONTWAIT);
        if(!received)
        {
            break; // queue is drained
        }
        displayMessage(incoming);
    }
    usleep(1);
}
/**
 * @brief Check user name for validity
 *
 * An empty name is replaced by "User". A name is rejected when it is longer
 * than 25 characters or contains whitespace. The message format used in
 * this file separates username and text with a space and reads the username
 * back as a single whitespace-delimited token, so a name with whitespace
 * would be split on the receiving side.
 *
 * @param[in, out] _name [in] - username received from the user
 *                       [out] - formatted username
 * @return true if input name is valid
 * @return false otherwise
 */
bool CZmqChatClient::checkName(std::string &_name) const
{
    if(_name.empty())
        _name = "User";
    if(_name.size() > 25)
    {
        std::cout << "name must be at least 1 and at max 25 characters long\n";
        return false;
    }
    // the username travels as one whitespace-delimited token
    if(_name.find_first_of(" \t\n\v\f\r") != std::string::npos)
    {
        std::cout << "name must not contain whitespace\n";
        return false;
    }
    PLOGI << "user name <- " << _name;
    return true;
}
/**
 * @brief check input ip string for validity
 *
 * An empty input is replaced by "localhost". Anything else has to be four
 * dot-separated numbers in the range 0-255, otherwise it is rejected with a
 * console message. An accepted value is logged.
 *
 * @param[in, out] _ipaddr input ip address string; set to "localhost" when empty
 * @return true if input is valid
 * @return false otherwise
 */
bool CZmqChatClient::checkIp(std::string &_ipaddr) const
{
    if(!_ipaddr.empty())
    {
        // one number 0..255
        const std::string octet = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])";
        // literal dot between the numbers
        const std::string dot = "\\.";
        const std::regex dottedQuad("^" + octet + dot + octet + dot + octet + dot + octet + "$");

        if(!std::regex_match(_ipaddr, dottedQuad))
        {
            std::cout << "invalid ip address.\n";
            return false;
        }
    }
    else
    {
        _ipaddr = "localhost";
    }

    PLOGI << "server ip <- " << _ipaddr;
    return true;
}
/**
 * @brief check input port for validity and set the server ports
 *
 * An empty input is replaced by "5555". The input has to be a plain decimal
 * number between 1 and 65534. The upper bound is 65534 because the server
 * send port is derived as input port + 1 and must itself be a valid port.
 * On success m_serverSendPort is updated and both ports are logged.
 * Invalid input is rejected with a console message and changes nothing
 * else.
 *
 * In this file the function is only referenced from the disabled port
 * prompt in init().
 *
 * @param[in, out] _port input port (server receive port); set to "5555" when empty
 * @return true if port is valid
 * @return false otherwise
 */
bool CZmqChatClient::checkPort(std::string &_port) const
{
    if(_port.empty())
        _port = "5555";

    // Digits only and at most five of them: with that guarantee std::stoi
    // can neither throw std::invalid_argument nor std::out_of_range.
    const bool isNumber = _port.size() <= 5
                       && _port.find_first_not_of("0123456789") == std::string::npos;
    const int portNum = isNumber ? std::stoi(_port) : 0;

    if(portNum < 1 || portNum > 65534)
    {
        std::cout << "port must be at least 1 and at max 65534\n";
        return false;
    }

    /* derive the server send port:
     * server send port = server receive port + 1 */
    m_serverSendPort = std::to_string(portNum + 1);
    PLOGI << "server receive port <- " << _port;
    PLOGI << "server send port <- " << m_serverSendPort;
    return true;
}
/**
 * @brief build the connection parameter string
 *
 * The result has the form "tcp://<ip>:<port>".
 *
 * @param[in] _ipaddr server ip address
 * @param[in] _port server port
 * @return std::string formatted param string
 */
std::string CZmqChatClient::formatConnectParam(const std::string &_ipaddr, const std::string &_port)
{
    std::string endpoint{"tcp://"};
    endpoint += _ipaddr;
    endpoint += ":";
    endpoint += _port;
    return endpoint;
}
/**
 * @brief Construct a new CChatMessage::CChatMessage object from username and message
 *
 * The payload is "<username> <message>" (joined by a single space). The
 * message buffer is rebuilt to exactly that length and the bytes are copied
 * into it.
 *
 * @param[in] _name username
 * @param[in] _message message
 */
CChatMessage::CChatMessage(const std::string &_name, const std::string &_message)
{
    // assemble the payload: name, one space, text
    std::string payload{_name};
    payload += " ";
    payload += _message;

    const auto payloadSize = payload.size();
    this->rebuild(payloadSize);
    memcpy(this->data(), payload.data(), payloadSize);
}
/**
 * @brief get data from zmq::message_t
 *
 * Parses the payload as "<filter> <username> <message>": filter and
 * username are the first two whitespace-delimited tokens, the message is
 * the rest of the line after the separating space. The parsed parts are
 * stored in the members and returned.
 *
 * @return CChatMessage::messageDataT tuple of data: filter, username and message
 */
CChatMessage::messageDataT CChatMessage::getData()
{
    // A zmq frame is a sized byte buffer without a terminating '\0'. The
    // length therefore has to come from size(); reading it as a C string
    // would run past the end of the buffer.
    const char *raw = static_cast<const char*>(this->data());
    const std::string payload = (raw != nullptr) ? std::string(raw, this->size())
                                                 : std::string();

    std::stringstream dataStream(payload);
    dataStream >> m_filter >> m_username; // get filter and username from sstream
    dataStream.get(); // remove the space in front of the message text
    std::getline(dataStream, m_message); // read all remaining data from sstream
    return {m_filter, m_username, m_message};
}