diff --git a/basic-logic/and/MyNode.cpp b/basic-logic/and/MyNode.cpp index 0169a806..afec0538 100644 --- a/basic-logic/and/MyNode.cpp +++ b/basic-logic/and/MyNode.cpp @@ -82,9 +82,9 @@ bool MyNode::doAnd() try { std::lock_guard inputGuard(_inputMutex); - for(uint32_t i = 0; i < _inputs.size(); i++) + for(auto& input : _inputs) { - if(!_inputs[i]->booleanValue) return false; + if(!input->booleanValue) return false; } return true; } @@ -104,7 +104,11 @@ void MyNode::input(const Flows::PNodeInfo info, uint32_t index, const Flows::PVa try { if(index >= _inputs.size()) return; - Flows::PVariable& input = message->structValue->at("payload"); + + Flows::PVariable myMessage = std::make_shared(); + *myMessage = *message; + + Flows::PVariable& input = myMessage->structValue->at("payload"); if(input->type != Flows::VariableType::tBoolean) { input->booleanValue = (bool)*input; diff --git a/basic-logic/fallingedge/MyNode.cpp b/basic-logic/fallingedge/MyNode.cpp index 64ce24fc..d1c3e1a8 100644 --- a/basic-logic/fallingedge/MyNode.cpp +++ b/basic-logic/fallingedge/MyNode.cpp @@ -69,7 +69,6 @@ void MyNode::input(Flows::PNodeInfo info, uint32_t index, Flows::PVariable messa output(0, message); } _lastInput = *input; - } catch(const std::exception& ex) { diff --git a/basic-logic/not/MyNode.cpp b/basic-logic/not/MyNode.cpp index bf8fc339..0c9c1ade 100644 --- a/basic-logic/not/MyNode.cpp +++ b/basic-logic/not/MyNode.cpp @@ -44,7 +44,10 @@ void MyNode::input(const Flows::PNodeInfo info, uint32_t index, const Flows::PVa { try { - Flows::PVariable& input = message->structValue->at("payload"); + Flows::PVariable myMessage = std::make_shared(); + *myMessage = *message; + + Flows::PVariable& input = myMessage->structValue->at("payload"); if(input->type != Flows::VariableType::tBoolean) { input->booleanValue = (bool)*input; diff --git a/basic-logic/or/MyNode.cpp b/basic-logic/or/MyNode.cpp index 6efdc584..4f8dbddf 100644 --- a/basic-logic/or/MyNode.cpp +++ b/basic-logic/or/MyNode.cpp @@ -104,7 +104,11 @@ void MyNode::input(const Flows::PNodeInfo info, uint32_t index, const Flows::PVa try { if(index >= _inputs.size()) return; - Flows::PVariable& input = message->structValue->at("payload"); + + Flows::PVariable myMessage = std::make_shared(); + *myMessage = *message; + + Flows::PVariable& input = myMessage->structValue->at("payload"); if(input->type != Flows::VariableType::tBoolean) { input->booleanValue = (bool)*input; diff --git a/http/http-request/MyNode.cpp b/http/http-request/MyNode.cpp index d27d70ce..05763a23 100644 --- a/http/http-request/MyNode.cpp +++ b/http/http-request/MyNode.cpp @@ -222,13 +222,14 @@ void MyNode::input(const Flows::PNodeInfo info, uint32_t index, const Flows::PVa { std::string postRequest = "POST " + _path + " HTTP/1.1\r\nUser-Agent: Homegear\r\nHost: " + _hostname + ":" + std::to_string(_port) + "\r\n" + _basicAuth + "Connection: Close\r\nContent-Length: " + std::to_string(content.size()) + "\r\n\r\n"; postRequest.insert(postRequest.end(), content.begin(), content.end()); + _httpClient->sendRequest(postRequest, result); } else if(_method == "PATCH") { std::string patchRequest = "PATCH " + _path + " HTTP/1.1\r\nUser-Agent: Homegear\r\nHost: " + _hostname + ":" + std::to_string(_port) + "\r\n" + _basicAuth + "Connection: Close\r\nContent-Length: " + std::to_string(content.size()) + "\r\n\r\n"; patchRequest.insert(patchRequest.end(), content.begin(), content.end()); + _httpClient->sendRequest(patchRequest, result); } - _httpClient->get(_path, result); Flows::PVariable message = std::make_shared(Flows::VariableType::tStruct); diff --git a/modbus/modbus-host/Modbus.cpp b/modbus/modbus-host/Modbus.cpp index d2dced0d..04fed600 100644 --- a/modbus/modbus-host/Modbus.cpp +++ b/modbus/modbus-host/Modbus.cpp @@ -29,14 +29,17 @@ #include "Modbus.h" +namespace MyNode +{ + Modbus::Modbus(std::shared_ptr bl, std::shared_ptr output, std::shared_ptr settings) { - try - { - _bl = bl; + try + { + _bl = bl; _out = output; - _settings = settings; - _started = false; + _settings = settings; + _started = false; _connected = false; BaseLib::Modbus::ModbusInfo modbusInfo; @@ -50,8 +53,8 @@ Modbus::Modbus(std::shared_ptr bl, std::shared_ptr info = std::make_shared(); info->newData = false; - info->start = (uint32_t)std::get<0>(element); - info->end = (uint32_t)std::get<1>(element); + info->start = (uint32_t) std::get<0>(element); + info->end = (uint32_t) std::get<1>(element); info->count = info->end - info->start + 1; info->invert = std::get<2>(element); info->buffer1.resize(info->count, 0); @@ -63,8 +66,8 @@ Modbus::Modbus(std::shared_ptr bl, std::shared_ptr info = std::make_shared(); info->newData = false; - info->start = (uint32_t)std::get<0>(element); - info->end = (uint32_t)std::get<1>(element); + info->start = (uint32_t) std::get<0>(element); + info->end = (uint32_t) std::get<1>(element); info->count = info->end - info->start + 1; info->invert = std::get<2>(element); info->readOnConnect = std::get<3>(element); @@ -76,8 +79,8 @@ Modbus::Modbus(std::shared_ptr bl, std::shared_ptr info = std::make_shared(); info->newData = false; - info->start = (uint32_t)std::get<0>(element); - info->end = (uint32_t)std::get<1>(element); + info->start = (uint32_t) std::get<0>(element); + info->end = (uint32_t) std::get<1>(element); info->count = info->end - info->start + 1; info->invert = std::get<2>(element); info->buffer1.resize(info->count, 0); @@ -89,8 +92,8 @@ Modbus::Modbus(std::shared_ptr bl, std::shared_ptr info = std::make_shared(); info->newData = false; - info->start = (uint32_t)std::get<0>(element); - info->end = (uint32_t)std::get<1>(element); + info->start = (uint32_t) std::get<0>(element); + info->end = (uint32_t) std::get<1>(element); info->count = info->end - info->start + 1; info->byteCount = info->count / 8 + (info->count % 8 == 0 ? 0 : 1); info->buffer1.resize(info->byteCount, 0); @@ -102,8 +105,8 @@ Modbus::Modbus(std::shared_ptr bl, std::shared_ptr info = std::make_shared(); info->newData = false; - info->start = (uint32_t)std::get<0>(element); - info->end = (uint32_t)std::get<1>(element); + info->start = (uint32_t) std::get<0>(element); + info->end = (uint32_t) std::get<1>(element); info->count = info->end - info->start + 1; info->byteCount = info->count / 8 + (info->count % 8 == 0 ? 0 : 1); info->readOnConnect = std::get<2>(element); @@ -114,114 +117,114 @@ Modbus::Modbus(std::shared_ptr bl, std::shared_ptrreadDiscreteInputs) { std::shared_ptr info = std::make_shared(); - info->start = (uint32_t)std::get<0>(element); - info->end = (uint32_t)std::get<1>(element); + info->start = (uint32_t) std::get<0>(element); + info->end = (uint32_t) std::get<1>(element); info->count = info->end - info->start + 1; info->byteCount = info->count / 8 + (info->count % 8 == 0 ? 0 : 1); info->buffer1.resize(info->byteCount, 0); info->buffer2.resize(info->byteCount, 0); _readDiscreteInputs.emplace_back(info); } - } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } + } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } } Modbus::~Modbus() { - try - { - waitForStop(); + try + { + waitForStop(); _modbus.reset(); - _bl.reset(); - } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } + _bl.reset(); + } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } } void Modbus::start() { - try - { - if(_started) return; - _started = true; - - _bl->threadManager.start(_listenThread, true, &Modbus::listen, this); - } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } + try + { + if(_started) return; + _started = true; + + _bl->threadManager.start(_listenThread, true, &Modbus::listen, this); + } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } } void Modbus::stop() { - try - { - _started = false; - } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } + try + { + _started = false; + } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } } void Modbus::waitForStop() { - try - { - _started = false; - _bl->threadManager.join(_listenThread); - disconnect(); - } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } + try + { + _started = false; + _bl->threadManager.join(_listenThread); + disconnect(); + } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } } void Modbus::readWriteRegister(std::shared_ptr& info) @@ -320,18 +323,18 @@ void Modbus::listen() int64_t endTime; int64_t timeToSleep; - while(_started) - { - try - { - if(!_modbus->isConnected()) - { - if(!_started) return; - connect(); - std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - if(!_started) return; - continue; - } + while(_started) + { + try + { + if(!_modbus->isConnected()) + { + if(!_started) return; + connect(); + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + if(!_started) return; + continue; + } std::list> registers; { @@ -393,7 +396,7 @@ void Modbus::listen() break; } - if (!std::equal(registerElement->buffer2.begin(), registerElement->buffer2.end(), registerElement->buffer1.begin())) + if(!std::equal(registerElement->buffer2.begin(), registerElement->buffer2.end(), registerElement->buffer1.begin())) { registerElement->buffer1 = registerElement->buffer2; @@ -401,19 +404,19 @@ void Modbus::listen() std::vector destinationData2; std::unordered_map data; - for (auto& node : registerElement->nodes) + for(auto& node : registerElement->nodes) { destinationData.clear(); destinationData.insert(destinationData.end(), registerElement->buffer1.begin() + (node.startRegister - registerElement->start), registerElement->buffer1.begin() + (node.startRegister - registerElement->start) + node.count); destinationData2.resize(destinationData.size() * 2); - if (node.invertRegisters) + if(node.invertRegisters) { - for (uint32_t i = 0; i < destinationData.size(); i++) + for(uint32_t i = 0; i < destinationData.size(); i++) { - if (registerElement->invert) + if(registerElement->invert) { - if (node.invertBytes) + if(node.invertBytes) { destinationData2[i * 2] = destinationData[destinationData.size() - i - 1] >> 8; destinationData2[(i * 2) + 1] = destinationData[destinationData.size() - i - 1] & 0xFF; @@ -421,15 +424,17 @@ void Modbus::listen() else { destinationData2[i * 2] = destinationData[destinationData.size() - i - 1] & 0xFF; - destinationData2[(i * 2) + 1] = destinationData[destinationData.size() - i - 1] >> 8; + destinationData2[(i * 2) + 1] = + destinationData[destinationData.size() - i - 1] >> 8; } } else { - if (node.invertBytes) + if(node.invertBytes) { destinationData2[i * 2] = destinationData[destinationData.size() - i - 1] & 0xFF; - destinationData2[(i * 2) + 1] = destinationData[destinationData.size() - i - 1] >> 8; + destinationData2[(i * 2) + 1] = + destinationData[destinationData.size() - i - 1] >> 8; } else { @@ -441,11 +446,11 @@ void Modbus::listen() } else { - for (uint32_t i = 0; i < destinationData.size(); i++) + for(uint32_t i = 0; i < destinationData.size(); i++) { - if (registerElement->invert) + if(registerElement->invert) { - if (node.invertBytes) + if(node.invertBytes) { destinationData2[i * 2] = destinationData[i] >> 8; destinationData2[(i * 2) + 1] = destinationData[i] & 0xFF; @@ -458,7 +463,7 @@ void Modbus::listen() } else { - if (node.invertBytes) + if(node.invertBytes) { destinationData2[i * 2] = destinationData[i] & 0xFF; destinationData2[(i * 2) + 1] = destinationData[i] >> 8; @@ -474,40 +479,40 @@ void Modbus::listen() Flows::PVariable dataElement = std::make_shared(Flows::VariableType::tArray); dataElement->arrayValue->reserve(4); - dataElement->arrayValue->push_back(std::make_shared((int32_t)ModbusType::tHoldingRegister)); + dataElement->arrayValue->push_back(std::make_shared((int32_t) ModbusType::tHoldingRegister)); dataElement->arrayValue->push_back(std::make_shared(node.startRegister)); dataElement->arrayValue->push_back(std::make_shared(node.count)); dataElement->arrayValue->push_back(std::make_shared(destinationData2)); auto dataIterator = data.find(node.id); - if (dataIterator == data.end() || !dataIterator->second) data.emplace(node.id, std::make_shared(Flows::PArray(new Flows::Array({dataElement})))); + if(dataIterator == data.end() || !dataIterator->second) data.emplace(node.id, std::make_shared(Flows::PArray(new Flows::Array({dataElement})))); else dataIterator->second->arrayValue->push_back(dataElement); } Flows::PArray parameters = std::make_shared(); parameters->push_back(std::make_shared()); - for (auto& element : data) + for(auto& element : data) { parameters->at(0) = element.second; _invoke(element.first, "packetReceived", parameters, false); } } - if (_settings->delay > 0) + if(_settings->delay > 0) { - if (_settings->delay < 1000) std::this_thread::sleep_for(std::chrono::milliseconds(_settings->delay)); + if(_settings->delay < 1000) std::this_thread::sleep_for(std::chrono::milliseconds(_settings->delay)); else { int32_t maxIndex = _settings->delay / 1000; int32_t rest = _settings->delay % 1000; - for (int32_t i = 0; i < maxIndex; i++) + for(int32_t i = 0; i < maxIndex; i++) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (!_started) break; + if(!_started) break; } - if (!_started) break; - if (rest > 0) std::this_thread::sleep_for(std::chrono::milliseconds(rest)); + if(!_started) break; + if(rest > 0) std::this_thread::sleep_for(std::chrono::milliseconds(rest)); } - if (!_started) break; + if(!_started) break; } } if(!_modbus->isConnected()) continue; @@ -531,7 +536,7 @@ void Modbus::listen() break; } - if (!std::equal(registerElement->buffer2.begin(), registerElement->buffer2.end(), registerElement->buffer1.begin())) + if(!std::equal(registerElement->buffer2.begin(), registerElement->buffer2.end(), registerElement->buffer1.begin())) { registerElement->buffer1 = registerElement->buffer2; @@ -539,19 +544,19 @@ void Modbus::listen() std::vector destinationData2; std::unordered_map data; - for (auto& node : registerElement->nodes) + for(auto& node : registerElement->nodes) { destinationData.clear(); destinationData.insert(destinationData.end(), registerElement->buffer1.begin() + (node.startRegister - registerElement->start), registerElement->buffer1.begin() + (node.startRegister - registerElement->start) + node.count); destinationData2.resize(destinationData.size() * 2); - if (node.invertRegisters) + if(node.invertRegisters) { - for (uint32_t i = 0; i < destinationData.size(); i++) + for(uint32_t i = 0; i < destinationData.size(); i++) { - if (registerElement->invert) + if(registerElement->invert) { - if (node.invertBytes) + if(node.invertBytes) { destinationData2[i * 2] = destinationData[destinationData.size() - i - 1] >> 8; destinationData2[(i * 2) + 1] = destinationData[destinationData.size() - i - 1] & 0xFF; @@ -559,15 +564,17 @@ void Modbus::listen() else { destinationData2[i * 2] = destinationData[destinationData.size() - i - 1] & 0xFF; - destinationData2[(i * 2) + 1] = destinationData[destinationData.size() - i - 1] >> 8; + destinationData2[(i * 2) + 1] = + destinationData[destinationData.size() - i - 1] >> 8; } } else { - if (node.invertBytes) + if(node.invertBytes) { destinationData2[i * 2] = destinationData[destinationData.size() - i - 1] & 0xFF; - destinationData2[(i * 2) + 1] = destinationData[destinationData.size() - i - 1] >> 8; + destinationData2[(i * 2) + 1] = + destinationData[destinationData.size() - i - 1] >> 8; } else { @@ -579,11 +586,11 @@ void Modbus::listen() } else { - for (uint32_t i = 0; i < destinationData.size(); i++) + for(uint32_t i = 0; i < destinationData.size(); i++) { - if (registerElement->invert) + if(registerElement->invert) { - if (node.invertBytes) + if(node.invertBytes) { destinationData2[i * 2] = destinationData[i] >> 8; destinationData2[(i * 2) + 1] = destinationData[i] & 0xFF; @@ -596,7 +603,7 @@ void Modbus::listen() } else { - if (node.invertBytes) + if(node.invertBytes) { destinationData2[i * 2] = destinationData[i] & 0xFF; destinationData2[(i * 2) + 1] = destinationData[i] >> 8; @@ -612,40 +619,40 @@ void Modbus::listen() Flows::PVariable dataElement = std::make_shared(Flows::VariableType::tArray); dataElement->arrayValue->reserve(4); - dataElement->arrayValue->push_back(std::make_shared((int32_t)ModbusType::tInputRegister)); + dataElement->arrayValue->push_back(std::make_shared((int32_t) ModbusType::tInputRegister)); dataElement->arrayValue->push_back(std::make_shared(node.startRegister)); dataElement->arrayValue->push_back(std::make_shared(node.count)); dataElement->arrayValue->push_back(std::make_shared(destinationData2)); auto dataIterator = data.find(node.id); - if (dataIterator == data.end() || !dataIterator->second) data.emplace(node.id, std::make_shared(Flows::PArray(new Flows::Array({dataElement})))); + if(dataIterator == data.end() || !dataIterator->second) data.emplace(node.id, std::make_shared(Flows::PArray(new Flows::Array({dataElement})))); else dataIterator->second->arrayValue->push_back(dataElement); } Flows::PArray parameters = std::make_shared(); parameters->push_back(std::make_shared()); - for (auto& element : data) + for(auto& element : data) { parameters->at(0) = element.second; _invoke(element.first, "packetReceived", parameters, false); } } - if (_settings->delay > 0) + if(_settings->delay > 0) { - if (_settings->delay < 1000) std::this_thread::sleep_for(std::chrono::milliseconds(_settings->delay)); + if(_settings->delay < 1000) std::this_thread::sleep_for(std::chrono::milliseconds(_settings->delay)); else { int32_t maxIndex = _settings->delay / 1000; int32_t rest = _settings->delay % 1000; - for (int32_t i = 0; i < maxIndex; i++) + for(int32_t i = 0; i < maxIndex; i++) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (!_started) break; + if(!_started) break; } - if (!_started) break; - if (rest > 0) std::this_thread::sleep_for(std::chrono::milliseconds(rest)); + if(!_started) break; + if(rest > 0) std::this_thread::sleep_for(std::chrono::milliseconds(rest)); } - if (!_started) break; + if(!_started) break; } } if(!_modbus->isConnected()) continue; @@ -711,53 +718,53 @@ void Modbus::listen() break; } - if (!std::equal(coilElement->buffer2.begin(), coilElement->buffer2.end(), coilElement->buffer1.begin())) + if(!std::equal(coilElement->buffer2.begin(), coilElement->buffer2.end(), coilElement->buffer1.begin())) { coilElement->buffer1 = coilElement->buffer2; std::vector destinationData; std::unordered_map data; - for (auto& node : coilElement->nodes) + for(auto& node : coilElement->nodes) { destinationData = BaseLib::BitReaderWriter::getPosition(coilElement->buffer1, node.startRegister - coilElement->start, node.count); Flows::PVariable dataElement = std::make_shared(Flows::VariableType::tArray); dataElement->arrayValue->reserve(4); - dataElement->arrayValue->push_back(std::make_shared((int32_t)ModbusType::tCoil)); + dataElement->arrayValue->push_back(std::make_shared((int32_t) ModbusType::tCoil)); dataElement->arrayValue->push_back(std::make_shared(node.startRegister)); dataElement->arrayValue->push_back(std::make_shared(node.count)); dataElement->arrayValue->push_back(std::make_shared(destinationData)); auto dataIterator = data.find(node.id); - if (dataIterator == data.end() || !dataIterator->second) data.emplace(node.id, std::make_shared(Flows::PArray(new Flows::Array({dataElement})))); + if(dataIterator == data.end() || !dataIterator->second) data.emplace(node.id, std::make_shared(Flows::PArray(new Flows::Array({dataElement})))); else dataIterator->second->arrayValue->push_back(dataElement); } Flows::PArray parameters = std::make_shared(); parameters->push_back(std::make_shared()); - for (auto& element : data) + for(auto& element : data) { parameters->at(0) = element.second; _invoke(element.first, "packetReceived", parameters, false); } } - if (_settings->delay > 0) + if(_settings->delay > 0) { - if (_settings->delay < 1000) std::this_thread::sleep_for(std::chrono::milliseconds(_settings->delay)); + if(_settings->delay < 1000) std::this_thread::sleep_for(std::chrono::milliseconds(_settings->delay)); else { int32_t maxIndex = _settings->delay / 1000; int32_t rest = _settings->delay % 1000; - for (int32_t i = 0; i < maxIndex; i++) + for(int32_t i = 0; i < maxIndex; i++) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (!_started) break; + if(!_started) break; } - if (!_started) break; - if (rest > 0) std::this_thread::sleep_for(std::chrono::milliseconds(rest)); + if(!_started) break; + if(rest > 0) std::this_thread::sleep_for(std::chrono::milliseconds(rest)); } - if (!_started) break; + if(!_started) break; } } if(!_modbus->isConnected()) continue; @@ -781,53 +788,53 @@ void Modbus::listen() break; } - if (!std::equal(discreteInputElement->buffer2.begin(), discreteInputElement->buffer2.end(), discreteInputElement->buffer1.begin())) + if(!std::equal(discreteInputElement->buffer2.begin(), discreteInputElement->buffer2.end(), discreteInputElement->buffer1.begin())) { discreteInputElement->buffer1 = discreteInputElement->buffer2; std::vector destinationData; std::unordered_map data; - for (auto& node : discreteInputElement->nodes) + for(auto& node : discreteInputElement->nodes) { destinationData = BaseLib::BitReaderWriter::getPosition(discreteInputElement->buffer1, node.startRegister - discreteInputElement->start, node.count); Flows::PVariable dataElement = std::make_shared(Flows::VariableType::tArray); dataElement->arrayValue->reserve(4); - dataElement->arrayValue->push_back(std::make_shared((int32_t)ModbusType::tDiscreteInput)); + dataElement->arrayValue->push_back(std::make_shared((int32_t) ModbusType::tDiscreteInput)); dataElement->arrayValue->push_back(std::make_shared(node.startRegister)); dataElement->arrayValue->push_back(std::make_shared(node.count)); dataElement->arrayValue->push_back(std::make_shared(destinationData)); auto dataIterator = data.find(node.id); - if (dataIterator == data.end() || !dataIterator->second) data.emplace(node.id, std::make_shared(Flows::PArray(new Flows::Array({dataElement})))); + if(dataIterator == data.end() || !dataIterator->second) data.emplace(node.id, std::make_shared(Flows::PArray(new Flows::Array({dataElement})))); else dataIterator->second->arrayValue->push_back(dataElement); } Flows::PArray parameters = std::make_shared(); parameters->push_back(std::make_shared()); - for (auto& element : data) + for(auto& element : data) { parameters->at(0) = element.second; _invoke(element.first, "packetReceived", parameters, false); } } - if (_settings->delay > 0) + if(_settings->delay > 0) { - if (_settings->delay < 1000) std::this_thread::sleep_for(std::chrono::milliseconds(_settings->delay)); + if(_settings->delay < 1000) std::this_thread::sleep_for(std::chrono::milliseconds(_settings->delay)); else { int32_t maxIndex = _settings->delay / 1000; int32_t rest = _settings->delay % 1000; - for (int32_t i = 0; i < maxIndex; i++) + for(int32_t i = 0; i < maxIndex; i++) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (!_started) break; + if(!_started) break; } - if (!_started) break; - if (rest > 0) std::this_thread::sleep_for(std::chrono::milliseconds(rest)); + if(!_started) break; + if(rest > 0) std::this_thread::sleep_for(std::chrono::milliseconds(rest)); } - if (!_started) break; + if(!_started) break; } } if(!_modbus->isConnected()) continue; @@ -850,20 +857,20 @@ void Modbus::listen() if(rest > 0) std::this_thread::sleep_for(std::chrono::milliseconds(rest)); } startTime = BaseLib::HelperFunctions::getTimeMicroseconds(); - } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } - } + } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } + } } void Modbus::setConnectionState(bool connected) @@ -875,9 +882,9 @@ void Modbus::setConnectionState(bool connected) { std::lock_guard registersGuard(_readRegistersMutex); - for (auto& element : _readRegisters) + for(auto& element : _readRegisters) { - for (auto& node : element->nodes) + for(auto& node : element->nodes) { _invoke(node.id, "setConnectionState", parameters, false); } @@ -886,9 +893,9 @@ void Modbus::setConnectionState(bool connected) { std::lock_guard registersGuard(_writeRegistersMutex); - for (auto& element : _writeRegisters) + for(auto& element : _writeRegisters) { - for (auto& node : element->nodes) + for(auto& node : element->nodes) { _invoke(node.id, "setConnectionState", parameters, false); } @@ -912,7 +919,7 @@ void Modbus::setConnectionState(bool connected) void Modbus::connect() { std::lock_guard modbusGuard(_modbusMutex); - try + try { _modbus->setSlaveId(_settings->slaveId); _modbus->connect(); @@ -962,7 +969,7 @@ void Modbus::connect() } setConnectionState(true); - return; + return; } catch(const std::exception& ex) { @@ -981,30 +988,30 @@ void Modbus::connect() void Modbus::disconnect() { - try - { - std::lock_guard modbusGuard(_modbusMutex); + try + { + std::lock_guard modbusGuard(_modbusMutex); _connected = false; - _modbus->disconnect(); - } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } + _modbus->disconnect(); + } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } } void Modbus::registerNode(std::string& node, ModbusType type, uint32_t startRegister, uint32_t count, bool invertBytes, bool invertRegisters) { - try - { + try + { NodeInfo info; info.type = type; info.id = node; @@ -1016,9 +1023,9 @@ void Modbus::registerNode(std::string& node, ModbusType type, uint32_t startRegi if(type == ModbusType::tHoldingRegister) { std::lock_guard registersGuard(_readRegistersMutex); - for (auto& element : _readRegisters) + for(auto& element : _readRegisters) { - if (startRegister >= element->start && (startRegister + count - 1) <= element->end) + if(startRegister >= element->start && (startRegister + count - 1) <= element->end) { element->nodes.emplace_back(info); } @@ -1027,9 +1034,9 @@ void Modbus::registerNode(std::string& node, ModbusType type, uint32_t startRegi else if(type == ModbusType::tInputRegister) { std::lock_guard registersGuard(_readInputRegistersMutex); - for (auto& element : _readInputRegisters) + for(auto& element : _readInputRegisters) { - if (startRegister >= element->start && (startRegister + count - 1) <= element->end) + if(startRegister >= element->start && (startRegister + count - 1) <= element->end) { element->nodes.emplace_back(info); } @@ -1039,19 +1046,19 @@ void Modbus::registerNode(std::string& node, ModbusType type, uint32_t startRegi Flows::PArray parameters = std::make_shared(); parameters->push_back(std::make_shared(_modbus->isConnected())); _invoke(parameters->at(0)->stringValue, "setConnectionState", parameters, false); - } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } + } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } } void Modbus::registerNode(std::string& node, ModbusType type, uint32_t startCoil, uint32_t count) @@ -1067,9 +1074,9 @@ void Modbus::registerNode(std::string& node, ModbusType type, uint32_t startCoil if(type == ModbusType::tCoil) { std::lock_guard registersGuard(_readCoilsMutex); - for (auto& element : _readCoils) + for(auto& element : _readCoils) { - if (startCoil >= element->start && (startCoil + count - 1) <= element->end) + if(startCoil >= element->start && (startCoil + count - 1) <= element->end) { element->nodes.emplace_back(info); } @@ -1078,9 +1085,9 @@ void Modbus::registerNode(std::string& node, ModbusType type, uint32_t startCoil else { std::lock_guard registersGuard(_readDiscreteInputsMutex); - for (auto& element : _readDiscreteInputs) + for(auto& element : _readDiscreteInputs) { - if (startCoil >= element->start && (startCoil + count - 1) <= element->end) + if(startCoil >= element->start && (startCoil + count - 1) <= element->end) { element->nodes.emplace_back(info); } @@ -1140,19 +1147,27 @@ void Modbus::writeRegisters(uint32_t startRegister, uint32_t count, bool invertB if(startRegister >= element->start && (startRegister + count - 1) <= element->end) { element->newData = true; - if (invertRegisters) + if(invertRegisters) { for(uint32_t i = startRegister - element->start; i < (startRegister - element->start) + count; i++) { if(element->invert) { - if(invertBytes) element->buffer1[((startRegister - element->start) + count) - i - 1] = (((uint16_t)value[i * 2]) << 8) | value[i * 2 + 1]; - else element->buffer1[((startRegister - element->start) + count) - i - 1] = (((uint16_t)value[i * 2 + 1]) << 8) | value[i * 2]; + if(invertBytes) + element->buffer1[((startRegister - element->start) + count) - i - 1] = (((uint16_t) value[i * 2]) + << 8) | value[i * 2 + 1]; + else + element->buffer1[((startRegister - element->start) + count) - i - 1] = (((uint16_t) value[i * 2 + 1]) + << 8) | value[i * 2]; } else { - if(invertBytes) element->buffer1[((startRegister - element->start) + count) - i - 1] = (((uint16_t)value[i * 2 + 1]) << 8) | value[i * 2]; - else element->buffer1[((startRegister - element->start) + count) - i - 1] = (((uint16_t)value[i * 2]) << 8) | value[i * 2 + 1]; + if(invertBytes) + element->buffer1[((startRegister - element->start) + count) - i - 1] = (((uint16_t) value[i * 2 + 1]) + << 8) | value[i * 2]; + else + element->buffer1[((startRegister - element->start) + count) - i - 1] = (((uint16_t) value[i * 2]) + << 8) | value[i * 2 + 1]; } } } @@ -1162,13 +1177,13 @@ void Modbus::writeRegisters(uint32_t startRegister, uint32_t count, bool invertB { if(element->invert) { - if(invertBytes) element->buffer1[i] = (((uint16_t)value[i * 2]) << 8) | value[i * 2 + 1]; - else element->buffer1[i] = (((uint16_t)value[i * 2 + 1]) << 8) | value[i * 2]; + if(invertBytes) element->buffer1[i] = (((uint16_t) value[i * 2]) << 8) | value[i * 2 + 1]; + else element->buffer1[i] = (((uint16_t) value[i * 2 + 1]) << 8) | value[i * 2]; } else { - if(invertBytes) element->buffer1[i] = (((uint16_t)value[i * 2 + 1]) << 8) | value[i * 2]; - else element->buffer1[i] = (((uint16_t)value[i * 2]) << 8) | value[i * 2 + 1]; + if(invertBytes) element->buffer1[i] = (((uint16_t) value[i * 2 + 1]) << 8) | value[i * 2]; + else element->buffer1[i] = (((uint16_t) value[i * 2]) << 8) | value[i * 2 + 1]; } } } @@ -1232,4 +1247,6 @@ void Modbus::writeCoils(uint32_t startCoil, uint32_t count, bool retry, std::vec { _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); } +} + } \ No newline at end of file diff --git a/modbus/modbus-host/Modbus.h b/modbus/modbus-host/Modbus.h index 6507ed5a..10966f37 100644 --- a/modbus/modbus-host/Modbus.h +++ b/modbus/modbus-host/Modbus.h @@ -36,6 +36,9 @@ #include #include +namespace MyNode +{ + class Modbus { public: @@ -47,37 +50,44 @@ class Modbus tInputRegister = 3 }; - struct ModbusSettings - { - std::string server; - int32_t port = 502; - uint32_t interval = 100; + struct ModbusSettings + { + std::string server; + int32_t port = 502; + uint32_t interval = 100; uint32_t delay = 0; uint8_t slaveId = 255; bool debug = false; - std::vector> readRegisters; - std::vector> readInputRegisters; + std::vector> readRegisters; + std::vector> readInputRegisters; std::vector> writeRegisters; std::vector> readCoils; std::vector> writeCoils; std::vector> readDiscreteInputs; - }; + }; + + Modbus(std::shared_ptr bl, std::shared_ptr output, std::shared_ptr settings); + + virtual ~Modbus(); + + void start(); + + void stop(); + + void waitForStop(); + + void setInvoke(std::function value) { _invoke.swap(value); } - Modbus(std::shared_ptr bl, std::shared_ptr output, std::shared_ptr settings); - virtual ~Modbus(); + void registerNode(std::string& node, ModbusType type, uint32_t startRegister, uint32_t count, bool invertBytes, bool invertRegisters); - void start(); - void stop(); - void waitForStop(); + void registerNode(std::string& node, ModbusType type, uint32_t startCoil, uint32_t count); - void setInvoke(std::function value) { _invoke.swap(value); } + void writeRegisters(uint32_t startRegister, uint32_t count, bool invertBytes, bool invertRegisters, bool retry, std::vector& value); - void registerNode(std::string& node, ModbusType type, uint32_t startRegister, uint32_t count, bool invertBytes, bool invertRegisters); - void registerNode(std::string& node, ModbusType type, uint32_t startCoil, uint32_t count); - void writeRegisters(uint32_t startRegister, uint32_t count, bool invertBytes, bool invertRegisters, bool retry, std::vector& value); void writeCoils(uint32_t startCoil, uint32_t count, bool retry, std::vector& value); + private: - struct NodeInfo + struct NodeInfo { ModbusType type = ModbusType::tHoldingRegister; std::string id; @@ -106,7 +116,7 @@ class Modbus uint32_t start = 0; uint32_t end = 0; uint32_t count = 0; - uint32_t byteCount = 0; + uint32_t byteCount = 0; bool readOnConnect = false; std::list nodes; std::vector buffer1; @@ -133,17 +143,17 @@ class Modbus std::vector value; }; - std::shared_ptr _bl; - std::shared_ptr _out; - std::shared_ptr _settings; - std::function _invoke; + std::shared_ptr _bl; + std::shared_ptr _out; + std::shared_ptr _settings; + std::function _invoke; std::mutex _modbusMutex; - std::shared_ptr _modbus; + std::shared_ptr _modbus; std::atomic_bool _connected; - std::thread _listenThread; - std::atomic_bool _started; + std::thread _listenThread; + std::atomic_bool _started; std::mutex _readRegistersMutex; std::list> _readRegisters; std::mutex _writeRegistersMutex; @@ -161,14 +171,23 @@ class Modbus std::mutex _readDiscreteInputsMutex; std::list> _readDiscreteInputs; - Modbus(const Modbus&); - Modbus& operator=(const Modbus&); - void connect(); - void disconnect(); + Modbus(const Modbus&); + + Modbus& operator=(const Modbus&); + + void connect(); + + void disconnect(); + void setConnectionState(bool connected); - void listen(); + + void listen(); + void readWriteRegister(std::shared_ptr& info); + void readWriteCoil(std::shared_ptr& info); }; +} + #endif diff --git a/mqtt/mqtt-broker/Mqtt.cpp b/mqtt/mqtt-broker/Mqtt.cpp index 64fbe10a..3484ef01 100644 --- a/mqtt/mqtt-broker/Mqtt.cpp +++ b/mqtt/mqtt-broker/Mqtt.cpp @@ -29,7 +29,11 @@ #include "Mqtt.h" -Mqtt::Mqtt(std::shared_ptr bl, std::shared_ptr output, std::shared_ptr settings) : BaseLib::IQueue(bl.get(), 2, 1000) +namespace MyNode +{ + +Mqtt::Mqtt(std::shared_ptr bl, std::shared_ptr output, std::shared_ptr settings) + : BaseLib::IQueue(bl.get(), 2, 1000) { try { @@ -172,11 +176,11 @@ uint32_t Mqtt::getLength(std::vector packet, uint32_t& lengthBytes) if(pos >= packet.size()) return 0; encodedByte = packet[pos]; lengthBytes++; - value += ((uint32_t)(encodedByte & 127)) * multiplier; + value += ((uint32_t) (encodedByte & 127)) * multiplier; multiplier *= 128; pos++; if(multiplier > 128 * 128 * 128) return 0; - } while ((encodedByte & 128) != 0); + } while((encodedByte & 128) != 0); return value; } @@ -188,7 +192,7 @@ std::vector Mqtt::getLengthBytes(uint32_t length) { char byte = length % 128; length = length / 128; - if (length > 0) byte = byte | 128; + if(length > 0) byte = byte | 128; result.push_back(byte); } while(length > 0); return result; @@ -198,26 +202,26 @@ void Mqtt::printConnectionError(char resultCode) { switch(resultCode) { - case 0: //No error - break; - case 1: - _out->printError("Error: Connection refused. Unacceptable protocol version."); - break; - case 2: - _out->printError("Error: Connection refused. Client identifier rejected. Please change the client identifier in mqtt.conf."); - break; - case 3: - _out->printError("Error: Connection refused. Server unavailable."); - break; - case 4: - _out->printError("Error: Connection refused. Bad username or password."); - break; - case 5: - _out->printError("Error: Connection refused. Unauthorized."); - break; - default: - _out->printError("Error: Connection refused. Unknown error: " + std::to_string(resultCode)); - break; + case 0: //No error + break; + case 1: + _out->printError("Error: Connection refused. Unacceptable protocol version."); + break; + case 2: + _out->printError("Error: Connection refused. Client identifier rejected. Please change the client identifier in mqtt.conf."); + break; + case 3: + _out->printError("Error: Connection refused. Server unavailable."); + break; + case 4: + _out->printError("Error: Connection refused. Bad username or password."); + break; + case 5: + _out->printError("Error: Connection refused. Unauthorized."); + break; + default: + _out->printError("Error: Connection refused. Unknown error: " + std::to_string(resultCode)); + break; } } @@ -329,7 +333,7 @@ void Mqtt::ping() { try { - std::vector ping { (char)0xC0, 0 }; + std::vector ping{(char) 0xC0, 0}; std::vector pong(5); int32_t i = 0; while(_started) @@ -420,12 +424,12 @@ void Mqtt::listen() length = getLength(data, lengthBytes); dataLength = length + lengthBytes + 1; } - if(bytesReceived == (unsigned)bufferMax) + if(bytesReceived == (unsigned) bufferMax) { //Check if packet size is exactly a multiple of bufferMax if(data.size() == dataLength) break; } - } while(bytesReceived == (unsigned)bufferMax || dataLength > data.size()); + } while(bytesReceived == (unsigned) bufferMax || dataLength > data.size()); } catch(BaseLib::SocketClosedException& ex) { @@ -479,10 +483,12 @@ void Mqtt::processData(std::vector& data) { int16_t id = 0; uint8_t type = 0; - if(data.size() == 2 && data.at(0) == (char)0xD0 && data.at(1) == 0) type = 0xD0; + if(data.size() == 2 && data.at(0) == (char) 0xD0 && data.at(1) == 0) type = 0xD0; else if(data.size() == 4 && data[0] == 0x20 && data[1] == 2 && data[2] == 0 && data[3] == 0) type = 0x20; //CONNACK - else if(data.size() == 4 && data[0] == 0x40 && data[1] == 2) id = (((uint16_t)data[2]) << 8) + (uint8_t)data[3]; //PUBACK - else if(data.size() == 5 && data[0] == (char)0x90 && data[1] == 3) id = (((uint16_t)data[2]) << 8) + (uint8_t)data[3]; //SUBACK + else if(data.size() == 4 && data[0] == 0x40 && data[1] == 2) + id = (((uint16_t) data[2]) << 8) + (uint8_t) data[3]; //PUBACK + else if(data.size() == 5 && data[0] == (char) 0x90 && data[1] == 3) + id = (((uint16_t) data[2]) << 8) + (uint8_t) data[3]; //SUBACK if(type != 0) { _requestsByTypeMutex.lock(); @@ -509,7 +515,7 @@ void Mqtt::processData(std::vector& data) { std::shared_ptr request = requestIterator->second; _requestsMutex.unlock(); - if(data[0] == (char)request->getResponseControlByte()) + if(data[0] == (char) request->getResponseControlByte()) { request->response = data; { @@ -555,7 +561,8 @@ void Mqtt::processPublish(std::vector& data) } uint8_t qos = data[0] & 6; bool retain = data[0] & 1; - uint32_t topicLength = 1 + lengthBytes + 2 + (((uint16_t)data[1 + lengthBytes]) << 8) + (uint8_t)data[1 + lengthBytes + 1]; + uint32_t topicLength = 1 + lengthBytes + 2 + (((uint16_t) data[1 + lengthBytes]) + << 8) + (uint8_t) data[1 + lengthBytes + 1]; uint32_t payloadPos = (qos > 0) ? topicLength + 2 : topicLength; if(payloadPos >= data.size()) { @@ -568,7 +575,7 @@ void Mqtt::processPublish(std::vector& data) } else if(qos == 2) { - std::vector puback { 0x40, 2, data[topicLength], data[topicLength + 1] }; + std::vector puback{0x40, 2, data[topicLength], data[topicLength + 1]}; send(puback); } std::string topic(data.data() + (1 + lengthBytes + 2), topicLength - (1 + lengthBytes + 2)); @@ -759,18 +766,18 @@ void Mqtt::reconnectThread() _invoke(node, "setConnectionState", parameters, false); } } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } } void Mqtt::reconnect() @@ -784,18 +791,18 @@ void Mqtt::reconnect() _bl->threadManager.join(_reconnectThread); _bl->threadManager.start(_reconnectThread, true, &Mqtt::reconnectThread, this); } - catch(const std::exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(BaseLib::Exception& ex) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); - } - catch(...) - { - _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); - } + catch(const std::exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(BaseLib::Exception& ex) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__, ex.what()); + } + catch(...) + { + _out->printEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); + } } void Mqtt::connect() @@ -959,7 +966,7 @@ void Mqtt::disconnect() try { _connected = false; - std::vector disconnect = { (char)0xE0, 0 }; + std::vector disconnect = {(char) 0xE0, 0}; if(_socket->connected()) _socket->proofwrite(disconnect); _socket->close(); } @@ -1005,9 +1012,9 @@ void Mqtt::registerNode(std::string& node) std::lock_guard nodesGuard(_nodesMutex); _nodes.emplace(node); - Flows::PArray parameters = std::make_shared(); - parameters->push_back(std::make_shared(_socket && _socket->connected())); - _invoke(node, "setConnectionState", parameters, false); + Flows::PArray parameters = std::make_shared(); + parameters->push_back(std::make_shared(_socket && _socket->connected())); + _invoke(node, "setConnectionState", parameters, false); } catch(const std::exception& ex) { @@ -1214,3 +1221,5 @@ void Mqtt::processQueueEntry(int32_t index, std::shared_ptrprintEx(__FILE__, __LINE__, __PRETTY_FUNCTION__); } } + +} \ No newline at end of file diff --git a/mqtt/mqtt-broker/Mqtt.h b/mqtt/mqtt-broker/Mqtt.h index 43571777..b0f31c5a 100644 --- a/mqtt/mqtt-broker/Mqtt.h +++ b/mqtt/mqtt-broker/Mqtt.h @@ -37,6 +37,9 @@ #include #include +namespace MyNode +{ + class Mqtt : public BaseLib::IQueue { public: @@ -74,16 +77,21 @@ class Mqtt : public BaseLib::IQueue }; Mqtt(std::shared_ptr bl, std::shared_ptr output, std::shared_ptr settings); + virtual ~Mqtt(); void start(); + void stop(); + void waitForStop(); void setInvoke(std::function value) { _invoke.swap(value); } void registerNode(std::string& node); + void registerTopic(std::string& node, std::string& topic); + void unregisterTopic(std::string& node, std::string& topic); /** @@ -93,12 +101,15 @@ class Mqtt : public BaseLib::IQueue * @param payload The MQTT payload. */ void queueMessage(std::string& topic, std::string& payload, bool retain); + private: class QueueEntrySend : public BaseLib::IQueueEntry { public: QueueEntrySend() {} + QueueEntrySend(std::shared_ptr& message) { this->message = message; } + virtual ~QueueEntrySend() {} std::shared_ptr message; @@ -108,7 +119,9 @@ class Mqtt : public BaseLib::IQueue { public: QueueEntryReceived() {} + QueueEntryReceived(std::vector& data) { this->data = data; } + virtual ~QueueEntryReceived() {} std::vector data; @@ -121,9 +134,11 @@ class Mqtt : public BaseLib::IQueue std::condition_variable conditionVariable; bool mutexReady = false; std::vector response; + uint8_t getResponseControlByte() { return _responseControlByte; } Request(uint8_t responseControlByte) { _responseControlByte = responseControlByte; }; + virtual ~Request() {}; private: uint8_t _responseControlByte; @@ -138,6 +153,7 @@ class Mqtt : public BaseLib::IQueue std::vector response; RequestByType() {}; + virtual ~RequestByType() {}; }; @@ -167,15 +183,25 @@ class Mqtt : public BaseLib::IQueue std::map> _requestsByType; Mqtt(const Mqtt&); + Mqtt& operator=(const Mqtt&); + void connect(); + void reconnect(); + void reconnectThread(); + void disconnect(); + void processMessages(); + void processQueueEntry(int32_t index, std::shared_ptr& entry); + std::vector getLengthBytes(uint32_t length); + uint32_t getLength(std::vector packet, uint32_t& lengthBytes); + void printConnectionError(char resultCode); /** @@ -185,16 +211,28 @@ class Mqtt : public BaseLib::IQueue * @param data The data to publish. */ void publish(const std::string& topic, const std::vector& data, bool retain); + void ping(); + void getResponseByType(const std::vector& packet, std::vector& responseBuffer, uint8_t responseType, bool errors = true); + void getResponse(const std::vector& packet, std::vector& responseBuffer, uint8_t responseType, int16_t packetId, bool errors = true); + void listen(); + void processData(std::vector& data); + void processPublish(std::vector& data); + void subscribe(std::string& topic); + void unsubscribe(std::string& topic); + void send(const std::vector& data); + std::string& escapeTopic(std::string& topic); }; +} + #endif