diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4a7ebb5c4..6d89d35a6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -245,6 +245,7 @@ jobs: shell: bash run: | export CTEST_OUTPUT_ON_FAILURE=TRUE + export MACOSX_DEPLOYMENT_TARGET=15.0 conan create . --build=missing -pr conan/profiles/macos -o '&:shared=${{ matrix.shared }}' -o '&:with_docs=False' - name: Clean conan cache diff --git a/CMakeLists.txt b/CMakeLists.txt index c1590e5f5..89a419cd5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ set(AGENT_VERSION_MAJOR 2) set(AGENT_VERSION_MINOR 7) set(AGENT_VERSION_PATCH 0) -set(AGENT_VERSION_BUILD 7) +set(AGENT_VERSION_BUILD 8) set(AGENT_VERSION_RC "") # This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent diff --git a/agent_lib/CMakeLists.txt b/agent_lib/CMakeLists.txt index 4a921e32d..7c9dca66c 100644 --- a/agent_lib/CMakeLists.txt +++ b/agent_lib/CMakeLists.txt @@ -171,9 +171,11 @@ set(AGENT_SOURCES # src/parser HEADER_FILE_ONLY "${SOURCE_DIR}/parser/xml_parser.hpp" + "${SOURCE_DIR}/parser/json_parser.hpp" # src/parser SOURCE_FILES_ONLY + "${SOURCE_DIR}/parser/json_parser.cpp" "${SOURCE_DIR}/parser/xml_parser.cpp" # src/pipeline HEADER_FILE_ONLY @@ -328,7 +330,6 @@ find_package(Boost REQUIRED) find_package(LibXml2 REQUIRED) find_package(date REQUIRED) find_package(OpenSSL REQUIRED) -find_package(nlohmann_json REQUIRED) find_package(mqtt_cpp REQUIRED) find_package(RapidJSON REQUIRED) @@ -403,7 +404,7 @@ target_link_libraries( agent_lib PUBLIC boost::boost LibXml2::LibXml2 date::date openssl::openssl - nlohmann_json::nlohmann_json mqtt_cpp::mqtt_cpp + mqtt_cpp::mqtt_cpp rapidjson BZip2::BZip2 $<$:pthread> diff --git a/src/mtconnect/agent.cpp b/src/mtconnect/agent.cpp index f34205c1a..1f21e2ec3 100644 --- a/src/mtconnect/agent.cpp +++ b/src/mtconnect/agent.cpp @@ -325,7 +325,6 @@ namespace mtconnect { } } - std::lock_guard lock(m_circularBuffer); if (m_circularBuffer.addToBuffer(observation) != 0) { for (auto &sink : m_sinks) diff --git a/src/mtconnect/buffer/checkpoint.cpp b/src/mtconnect/buffer/checkpoint.cpp index 9e39341d2..ebdc1c15f 100644 --- a/src/mtconnect/buffer/checkpoint.cpp +++ b/src/mtconnect/buffer/checkpoint.cpp @@ -154,7 +154,7 @@ namespace mtconnect { } else { - m_observations[id] = dynamic_pointer_cast(obs->getptr()); + m_observations[id] = obs; } } @@ -170,7 +170,7 @@ namespace mtconnect { for (const auto &event : checkpoint.m_observations) { if (!m_filter || m_filter->count(event.first) > 0) - m_observations[event.first] = dynamic_pointer_cast(event.second->getptr()); + m_observations[event.first] = event.second; } } diff --git a/src/mtconnect/buffer/circular_buffer.hpp b/src/mtconnect/buffer/circular_buffer.hpp index 3ac0d475a..171d12923 100644 --- a/src/mtconnect/buffer/circular_buffer.hpp +++ b/src/mtconnect/buffer/circular_buffer.hpp @@ -126,36 +126,43 @@ namespace mtconnect::buffer { if (observation->isOrphan()) return 0; - std::lock_guard lock(m_sequenceLock); - auto dataItem = observation->getDataItem(); - auto seq = m_sequence; - - observation->setSequence(seq); - m_slidingBuffer.push_back(observation); - m_latest.addObservation(observation); + SequenceNumber_t seq; + DataItemPtr dataItem; - // Special case for the first event in the series to prime the first checkpoint. - if (seq == 1) - m_first.addObservation(observation); - else if (m_slidingBuffer.full()) { - observation::ObservationPtr old = m_slidingBuffer.front(); - m_first.addObservation(old); - if (old->getSequence() > 1) - m_firstSequence++; - // assert(old->getSequence() == m_firstSequence); - } + std::lock_guard lock(m_sequenceLock); + dataItem = observation->getDataItem(); + seq = m_sequence; + + observation->setSequence(seq); + m_slidingBuffer.push_back(observation); + m_latest.addObservation(observation); + + // Special case for the first event in the series to prime the first checkpoint. + if (seq == 1) + m_first.addObservation(observation); + else if (m_slidingBuffer.full()) + { + observation::ObservationPtr old = m_slidingBuffer.front(); + m_first.addObservation(old); + if (old->getSequence() > 1) + m_firstSequence++; + // assert(old->getSequence() == m_firstSequence); + } - // Checkpoint management - if (m_checkpointCount > 0 && (seq % m_checkpointFreq) == 0) - { - // Copy the checkpoint from the current into the slot - m_checkpoints.push_back(std::make_unique(m_latest)); - } + // Checkpoint management + if (m_checkpointCount > 0 && (seq % m_checkpointFreq) == 0) + { + // Copy the checkpoint from the current into the slot + m_checkpoints.push_back(std::make_unique(m_latest)); + } - dataItem->signalObservers(m_sequence); + m_sequence++; + } - m_sequence++; + // Signal observers outside the lock to avoid three-deep lock nesting + // (m_sequenceLock -> m_observerMutex -> observer m_mutex) on every write + dataItem->signalObservers(seq); return seq; } diff --git a/src/mtconnect/configuration/agent_config.cpp b/src/mtconnect/configuration/agent_config.cpp index 2bcafde47..0cd97b3a0 100644 --- a/src/mtconnect/configuration/agent_config.cpp +++ b/src/mtconnect/configuration/agent_config.cpp @@ -796,12 +796,12 @@ namespace mtconnect::configuration { catch (boost::property_tree::json_parser::json_parser_error &e) { cerr << "json file error: " << e.what() << " on line " << e.line() << endl; - throw e; + throw; } - catch (std::exception e) + catch (const std::exception &e) { cerr << "could not load config file: " << e.what() << endl; - throw e; + throw; } if (m_logChannels.empty()) diff --git a/src/mtconnect/entity/entity.hpp b/src/mtconnect/entity/entity.hpp index e04bf24b0..8f0699cfc 100644 --- a/src/mtconnect/entity/entity.hpp +++ b/src/mtconnect/entity/entity.hpp @@ -44,12 +44,12 @@ namespace mtconnect { PropertyKey(const char *s) : QName(s) {} /// @brief clears marks for this property - void clearMark() const { const_cast(this)->m_mark = false; } + void clearMark() const { m_mark = false; } /// @brief sets the mark for this property - void setMark() const { const_cast(this)->m_mark = true; } + void setMark() const { m_mark = true; } /// @brief allows factory to track required properties - bool m_mark {false}; + mutable bool m_mark {false}; }; /// @brief properties are a map of PropertyKey to Value @@ -209,7 +209,8 @@ namespace mtconnect { /// @return `VALUE` property Value &getValue() { - static Value null; + thread_local Value null; + null = std::monostate {}; auto p = m_properties.find("VALUE"); if (p != m_properties.end()) return p->second; @@ -260,7 +261,8 @@ namespace mtconnect { /// @returns a reference to the entity list. Returns an empty list if it does not exist. EntityList &getListProperty() { - static EntityList null; + thread_local EntityList null; + null.clear(); auto p = m_properties.find("LIST"); if (p != m_properties.end()) @@ -451,7 +453,8 @@ namespace mtconnect { Value &getProperty_(const std::string &name) { - static Value noValue {std::monostate()}; + thread_local Value noValue {std::monostate()}; + noValue = std::monostate {}; auto it = m_properties.find(name); if (it == m_properties.end()) return noValue; @@ -714,6 +717,11 @@ namespace mtconnect { } } + for (auto &key : removed) + { + m_properties.erase(key); + } + return changed; } diff --git a/src/mtconnect/entity/json_parser.cpp b/src/mtconnect/entity/json_parser.cpp index 638924118..458770074 100644 --- a/src/mtconnect/entity/json_parser.cpp +++ b/src/mtconnect/entity/json_parser.cpp @@ -1,5 +1,5 @@ // -// Copyright Copyright 2009-2025, AMT The Association For Manufacturing Technology (AMT) +// Copyright Copyright 2009-2026, AMT – The Association For Manufacturing Technology ("AMT") // All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,17 +17,18 @@ #include "mtconnect/entity/json_parser.hpp" -#include +#include +#include #include "mtconnect/logging.hpp" using namespace std; -using json = nlohmann::json; +namespace rj = ::rapidjson; namespace mtconnect { namespace entity { - static EntityPtr parseJson(FactoryPtr factory, string entity_name, json jNode, - ErrorList& errors) + static EntityPtr parseJson(FactoryPtr factory, string entity_name, const rj::Value& jNode, + ErrorList& errors, uint32_t version = 1) { auto ef = factory->factoryFor(entity_name); @@ -36,68 +37,116 @@ namespace mtconnect { Properties properties; EntityList* l {nullptr}; - if (ef->isList() && jNode.size() > 0) + auto nodeSize = jNode.IsArray() ? jNode.Size() : jNode.IsObject() ? jNode.MemberCount() : 0; + if (ef->isList() && nodeSize > 0) { l = &properties["LIST"].emplace(); } - for (auto& [key, value] : jNode.items()) + if (jNode.IsObject()) { - string property_key = key; + for (auto it = jNode.MemberBegin(); it != jNode.MemberEnd(); ++it) + { + string property_key = it->name.GetString(); + const auto& value = it->value; - if (key == "value" && !ef->hasRaw()) - property_key = "VALUE"; - else if (key == "value" && ef->hasRaw()) - continue; + if (property_key == "value" && !ef->hasRaw()) + property_key = "VALUE"; + else if (property_key == "value" && ef->hasRaw()) + continue; - if (value.is_string()) - { - properties.insert({property_key, string {value}}); + if (value.IsString()) + { + properties.insert({property_key, string(value.GetString(), value.GetStringLength())}); + } + else if (value.IsNumber()) + { + if (value.IsInt64()) + properties.insert({property_key, value.GetInt64()}); + else if (value.IsUint64()) + properties.insert({property_key, static_cast(value.GetUint64())}); + else + properties.insert({property_key, value.GetDouble()}); + } + else if (value.IsBool()) + { + properties.insert({property_key, value.GetBool()}); + } } - else if (value.is_number()) + + if (ef->hasRaw()) { - if (jNode[key].get() == jNode[key].get()) - properties.insert({property_key, int64_t(value)}); - else - properties.insert({property_key, double(value)}); + auto rawIt = jNode.FindMember("value"); + if (rawIt != jNode.MemberEnd() && rawIt->value.IsString()) + { + properties.insert( + {"RAW", string(rawIt->value.GetString(), rawIt->value.GetStringLength())}); + } } - else if (value.is_boolean()) + else if (version == 2 && l != nullptr) { - properties.insert({property_key, bool(value)}); - } - } - - if (ef->hasRaw()) - { - properties.insert({"RAW", string {jNode["value"]}}); - } - else - { - if (jNode.is_object()) - { - for (auto& [key, value] : jNode.items()) + // V2 format: entity lists are objects with type-name keys containing arrays + // e.g. {"Electric": [{"id":"e1"}], "Heating": [{"id":"h1"}]} + for (auto it = jNode.MemberBegin(); it != jNode.MemberEnd(); ++it) { - auto ent = parseJson(ef, key, value, errors); - if (ent) + string key(it->name.GetString(), it->name.GetStringLength()); + const auto& value = it->value; + + if (value.IsArray()) { - if (ef->isPropertySet(ent->getName())) + for (rj::SizeType i = 0; i < value.Size(); ++i) { - auto res = properties.try_emplace(ent->getName(), EntityList {}); - get(res.first->second).emplace_back(ent); + auto ent = parseJson(ef, key, value[i], errors, version); + if (ent) + { + l->emplace_back(ent); + } + else + { + LOG(debug) << "Unexpected element: " << key; + errors.emplace_back( + new EntityError("Invalid element '" + key + "'", entity_name)); + } } - else + } + } + } + else + { + for (auto it = jNode.MemberBegin(); it != jNode.MemberEnd(); ++it) + { + string key(it->name.GetString(), it->name.GetStringLength()); + const auto& value = it->value; + + if (value.IsObject() || value.IsArray()) + { + auto ent = parseJson(ef, key, value, errors, version); + if (ent) { - properties.insert({ent->getName(), ent}); + if (ef->isPropertySet(ent->getName())) + { + auto res = properties.try_emplace(ent->getName(), EntityList {}); + get(res.first->second).emplace_back(ent); + } + else + { + properties.insert({ent->getName(), ent}); + } } } } } - else if (jNode.is_array() && jNode.size() > 0) + } + else if (jNode.IsArray() && jNode.Size() > 0) + { + for (rj::SizeType i = 0; i < jNode.Size(); ++i) { - for (auto const& i : jNode) + const auto& item = jNode[i]; + if (item.IsObject() && item.MemberCount() > 0) { - auto it = i.begin(); - auto ent = parseJson(ef, it.key(), it.value(), errors); + auto it = item.MemberBegin(); + string key(it->name.GetString(), it->name.GetStringLength()); + auto ent = parseJson(ef, key, it->value, errors, version); if (ent) { if (l != nullptr) @@ -107,13 +156,13 @@ namespace mtconnect { } else { - LOG(debug) << "Unexpected element: " << it.key(); - errors.emplace_back( - new EntityError("Invalid element '" + it.key() + "'", entity_name)); + LOG(debug) << "Unexpected element: " << key; + errors.emplace_back(new EntityError("Invalid element '" + key + "'", entity_name)); } } } } + try { auto entity = ef->make(entity_name, properties, errors); @@ -133,17 +182,27 @@ namespace mtconnect { { NAMED_SCOPE("entity.json_parser"); EntityPtr entity; - auto jsonObj = json::parse(document.c_str()); - auto entity_name = jsonObj.begin().key(); + rj::Document jsonDoc; + jsonDoc.Parse(document.c_str()); - if (jsonObj.size() == 1) + if (jsonDoc.HasParseError()) { - entity = parseJson(factory, entity_name, jsonObj[entity_name], errors); + LOG(error) << "JSON parse error: " << rj::GetParseError_En(jsonDoc.GetParseError()) + << " at offset " << jsonDoc.GetErrorOffset(); + errors.emplace_back(new EntityError("Cannot Parse Document.")); + return nullptr; } - else + + if (!jsonDoc.IsObject() || jsonDoc.MemberCount() != 1) { errors.emplace_back(new EntityError("Cannot Parse Document.")); + return entity; } + + auto it = jsonDoc.MemberBegin(); + string entity_name(it->name.GetString(), it->name.GetStringLength()); + entity = parseJson(factory, entity_name, it->value, errors, m_version); + return entity; } } // namespace entity diff --git a/src/mtconnect/entity/json_parser.hpp b/src/mtconnect/entity/json_parser.hpp index 0e8b117d1..db8d224d9 100644 --- a/src/mtconnect/entity/json_parser.hpp +++ b/src/mtconnect/entity/json_parser.hpp @@ -1,5 +1,5 @@ // -// Copyright Copyright 2009-2025, AMT � The Association For Manufacturing Technology (�AMT�) +// Copyright Copyright 2009-2026, AMT � The Association For Manufacturing Technology (�AMT�) // All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/mtconnect/entity/requirement.hpp b/src/mtconnect/entity/requirement.hpp index ebacbf824..6e4067028 100644 --- a/src/mtconnect/entity/requirement.hpp +++ b/src/mtconnect/entity/requirement.hpp @@ -267,22 +267,9 @@ namespace mtconnect::entity { Requirement() = default; Requirement(const Requirement &o) = default; + Requirement &operator=(const Requirement &o) = default; ~Requirement() = default; - /// @brief Check if two requires are the same - /// @param o another requirement - /// @return `true` if they are equal - Requirement &operator=(const Requirement &o) - { - m_type = o.m_type; - m_lowerMultiplicity = o.m_lowerMultiplicity; - m_upperMultiplicity = o.m_upperMultiplicity; - m_factory = o.m_factory; - m_matcher = o.m_matcher; - m_size = o.m_size; - return *this; - } - /// @brief gets required state /// @return `true` if property is required bool isRequired() const { return m_lowerMultiplicity > 0; } @@ -340,7 +327,7 @@ namespace mtconnect::entity { catch (PropertyError &e) { e.setProperty(m_name); - throw e; + throw; } return false; } diff --git a/src/mtconnect/entity/xml_parser.cpp b/src/mtconnect/entity/xml_parser.cpp index a07877fd4..60b95c880 100644 --- a/src/mtconnect/entity/xml_parser.cpp +++ b/src/mtconnect/entity/xml_parser.cpp @@ -351,14 +351,14 @@ namespace mtconnect::entity { errors.emplace_back(new EntityError("Cannot parse document")); } - catch (EntityError e) + catch (const EntityError &e) { LOG(error) << "Cannot parse XML document: " << e.what(); errors.emplace_back(e.dup()); entity.reset(); } - catch (XmlError e) + catch (const XmlError &e) { LOG(error) << "Cannot parse XML document: " << e.what(); errors.emplace_back(new EntityError(e.what())); diff --git a/src/mtconnect/entity/xml_printer.cpp b/src/mtconnect/entity/xml_printer.cpp index 66d461886..d8b757b7c 100644 --- a/src/mtconnect/entity/xml_printer.cpp +++ b/src/mtconnect/entity/xml_printer.cpp @@ -42,40 +42,6 @@ namespace mtconnect { return name; } - static inline void addAttributes(xmlTextWriterPtr writer, - const std::map &attributes) - { - for (const auto &attr : attributes) - { - if (!attr.second.empty()) - { - THROW_IF_XML2_ERROR(xmlTextWriterWriteAttribute(writer, BAD_CAST attr.first.c_str(), - BAD_CAST attr.second.c_str())); - } - } - } - - static void addSimpleElement(xmlTextWriterPtr writer, const string &element, const string &body, - const map &attributes = {}, bool raw = false) - { - AutoElement ele(writer, element); - - if (!attributes.empty()) - addAttributes(writer, attributes); - - if (!body.empty()) - { - xmlChar *text = nullptr; - if (!raw) - text = xmlEncodeEntitiesReentrant(nullptr, BAD_CAST body.c_str()); - else - text = BAD_CAST body.c_str(); - THROW_IF_XML2_ERROR(xmlTextWriterWriteRaw(writer, text)); - if (!raw) - xmlFree(text); - } - } - void printDataSet(xmlTextWriterPtr writer, const std::string &name, const DataSet &set) { AutoElement ele(writer); diff --git a/src/mtconnect/observation/change_observer.cpp b/src/mtconnect/observation/change_observer.cpp index 522ec9707..6ee9e2f74 100644 --- a/src/mtconnect/observation/change_observer.cpp +++ b/src/mtconnect/observation/change_observer.cpp @@ -1,5 +1,5 @@ // -// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology (“AMT”) +// Copyright Copyright 2009-2026, AMT – The Association For Manufacturing Technology ("AMT") // All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,109 +17,11 @@ #include "change_observer.hpp" -#include - -#include -#include - -#include "mtconnect/sink/sink.hpp" +#include "mtconnect/buffer/circular_buffer.hpp" using namespace std; namespace mtconnect::observation { - ChangeObserver::~ChangeObserver() - { - std::lock_guard scopedLock(m_mutex); - clear(); - } - - void ChangeObserver::clear() - { - std::unique_lock lock(m_mutex); - m_timer.cancel(); - m_handler.clear(); - for (const auto signaler : m_signalers) - signaler->removeObserver(this); - m_signalers.clear(); - } - - void ChangeObserver::addSignaler(ChangeSignaler *sig) { m_signalers.emplace_back(sig); } - - bool ChangeObserver::removeSignaler(ChangeSignaler *sig) - { - std::lock_guard scopedLock(m_mutex); - auto newEndPos = std::remove(m_signalers.begin(), m_signalers.end(), sig); - if (newEndPos == m_signalers.end()) - return false; - - m_signalers.erase(newEndPos); - return true; - } - - void ChangeObserver::handler(boost::system::error_code ec) - { - if (m_handler) - boost::asio::post(m_strand, boost::bind(m_handler, ec)); - } - - // Signaler Management - ChangeSignaler::~ChangeSignaler() - { - std::lock_guard lock(m_observerMutex); - - for (const auto observer : m_observers) - observer->removeSignaler(this); - } - - void ChangeSignaler::addObserver(ChangeObserver *observer) - { - std::lock_guard lock(m_observerMutex); - - m_observers.emplace_back(observer); - observer->addSignaler(this); - } - - bool ChangeSignaler::removeObserver(ChangeObserver *observer) - { - std::lock_guard lock(m_observerMutex); - - auto newEndPos = std::remove(m_observers.begin(), m_observers.end(), observer); - if (newEndPos == m_observers.end()) - return false; - - m_observers.erase(newEndPos); - return true; - } - - bool ChangeSignaler::hasObserver(ChangeObserver *observer) const - { - std::lock_guard lock(m_observerMutex); - - auto foundPos = std::find(m_observers.begin(), m_observers.end(), observer); - return foundPos != m_observers.end(); - } - - void ChangeSignaler::signalObservers(uint64_t sequence) const - { - std::lock_guard lock(m_observerMutex); - - for (const auto observer : m_observers) - observer->signal(sequence); - } - - AsyncObserver::AsyncObserver(boost::asio::io_context::strand &strand, - buffer::CircularBuffer &buffer, FilterSet &&filter, - std::chrono::milliseconds interval, - std::chrono::milliseconds heartbeat) - : AsyncResponse(interval), - m_heartbeat(heartbeat), - m_last(std::chrono::system_clock::now()), - m_filter(std::move(filter)), - m_strand(strand), - m_observer(strand), - m_buffer(buffer) - {} - void AsyncObserver::observe(const std::optional &from, Resolver resolver) { using std::placeholders::_1; @@ -155,23 +57,6 @@ namespace mtconnect::observation { m_endOfBuffer = from >= next; } - void AsyncObserver::handlerCompleted() - { - NAMED_SCOPE("AsyncObserver::handlerCompleted"); - - m_last = std::chrono::system_clock::now(); - if (m_endOfBuffer) - { - LOG(trace) << "End of buffer"; - using std::placeholders::_1; - m_observer.waitForSignal(m_heartbeat); - } - else - { - AsyncObserver::handleSignal(boost::system::error_code {}); - } - } - void AsyncObserver::handleSignal(boost::system::error_code ec) { NAMED_SCOPE("AsyncObserver::handleSignal"); diff --git a/src/mtconnect/observation/change_observer.hpp b/src/mtconnect/observation/change_observer.hpp index deb6fc950..0196dbfd4 100644 --- a/src/mtconnect/observation/change_observer.hpp +++ b/src/mtconnect/observation/change_observer.hpp @@ -1,5 +1,5 @@ // -// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology (“AMT”) +// Copyright Copyright 2009-2026, AMT – The Association For Manufacturing Technology ("AMT") // All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,11 +21,14 @@ #include #include +#include +#include #include #include #include #include "mtconnect/config.hpp" +#include "mtconnect/logging.hpp" #include "mtconnect/utilities.hpp" namespace mtconnect::buffer { @@ -45,19 +48,23 @@ namespace mtconnect::observation { : m_strand(strand), m_timer(strand.context()) {} - virtual ~ChangeObserver(); + // Destructor and clear() defined after ChangeSignaler + ~ChangeObserver(); /// @brief dispatch handler /// /// this is only necessary becase of issue with windows DLLs /// /// @param ec the error code from the callback - void handler(boost::system::error_code ec); + void handler(boost::system::error_code ec) + { + if (m_handler) + boost::asio::post(m_strand, boost::bind(m_handler, ec)); + } /// @brief wait for a signal to occur asynchronously. If it is already signaled, call the /// callback immediately. /// @param duration the duration to wait - /// @param handler the handler to call back /// @return `true` if successful bool waitForSignal(std::chrono::milliseconds duration) { @@ -81,12 +88,12 @@ namespace mtconnect::observation { return true; } - /// @brief wait a period of time where signals will not cancle the timer + /// @brief wait a period of time where signals will not cancel the timer /// @param duration the duration to wait - /// @param handler the handler to call back /// @return `true` if successful bool waitFor(std::chrono::milliseconds duration) { + std::lock_guard lock(m_mutex); m_noCancelOnSignal = true; m_timer.expires_after(duration); using std::placeholders::_1; @@ -96,7 +103,7 @@ namespace mtconnect::observation { return true; } - /// @brief single all waiting observers if this sequence number is greater than the last + /// @brief signal all waiting observers if this sequence number is greater than the last /// /// also cancel the timer /// @param[in] sequence the sequence number of the observation @@ -143,7 +150,7 @@ namespace mtconnect::observation { auto try_lock() { return m_mutex.try_lock(); } ///@} - /// @brief clear the observer information. + /// @brief clear the observer information. Defined after ChangeSignaler. void clear(); private: @@ -151,14 +158,22 @@ namespace mtconnect::observation { mutable std::recursive_mutex m_mutex; boost::asio::steady_timer m_timer; - std::list m_signalers; - volatile uint64_t m_sequence = UINT64_MAX; + std::vector m_signalers; + std::atomic m_sequence {UINT64_MAX}; bool m_noCancelOnSignal {false}; protected: friend class ChangeSignaler; - void addSignaler(ChangeSignaler *sig); - bool removeSignaler(ChangeSignaler *sig); + void addSignaler(ChangeSignaler *sig) + { + std::unique_lock lock(m_mutex); + m_signalers.emplace_back(sig); + } + bool removeSignaler(ChangeSignaler *sig) + { + std::lock_guard lock(m_mutex); + return std::erase(m_signalers, sig) > 0; + } }; /// @brief A signaler of waiting observers @@ -167,28 +182,75 @@ namespace mtconnect::observation { public: /// @brief add an observer to the list /// @param[in] observer an observer - void addObserver(ChangeObserver *observer); + void addObserver(ChangeObserver *observer) + { + std::lock_guard lock(m_observerMutex); + m_observers.emplace_back(observer); + observer->addSignaler(this); + } /// @brief remove an observer /// @param[in] observer an observer /// @return `true` if the observer was removed - bool removeObserver(ChangeObserver *observer); + bool removeObserver(ChangeObserver *observer) + { + std::lock_guard lock(m_observerMutex); + std::erase(m_observers, observer); + return true; + } /// @brief check if an observer is in the list /// @param[in] observer an observer /// @return `true` if the observer is in the list - bool hasObserver(ChangeObserver *observer) const; - /// @brief singal observers with a sequence number + bool hasObserver(ChangeObserver *observer) const + { + std::lock_guard lock(m_observerMutex); + auto foundPos = std::find(m_observers.begin(), m_observers.end(), observer); + return foundPos != m_observers.end(); + } + /// @brief signal observers with a sequence number /// @param[in] sequence the sequence number - void signalObservers(uint64_t sequence) const; + void signalObservers(uint64_t sequence) const + { + std::lock_guard lock(m_observerMutex); + for (const auto observer : m_observers) + observer->signal(sequence); + } - virtual ~ChangeSignaler(); + ~ChangeSignaler() + { + std::lock_guard lock(m_observerMutex); + for (const auto observer : m_observers) + observer->removeSignaler(this); + } protected: // Observer Lists mutable std::recursive_mutex m_observerMutex; - std::list m_observers; + std::vector m_observers; }; - /// @brief Abstract class for things asynchronouos timers + // -- Deferred ChangeObserver method definitions (need complete ChangeSignaler) -- + + inline ChangeObserver::~ChangeObserver() + { + std::lock_guard scopedLock(m_mutex); + clear(); + } + + inline void ChangeObserver::clear() + { + m_timer.cancel(); + decltype(m_signalers) signalers; + { + std::unique_lock lock(m_mutex); + signalers = m_signalers; + } + for (const auto signaler : signalers) + signaler->removeObserver(this); + std::unique_lock lock(m_mutex); + m_signalers.clear(); + } + + /// @brief Abstract class for asynchronous timers class AGENT_LIB_API AsyncResponse : public std::enable_shared_from_this { public: @@ -203,7 +265,7 @@ namespace mtconnect::observation { /// @brief get the request id for webservices const auto &getRequestId() const { return m_requestId; } - /// @brief sets the optonal request id for webservices. + /// @brief sets the optional request id for webservices. void setRequestId(const std::optional &id) { m_requestId = id; } /// @brief Get the interval @@ -211,15 +273,15 @@ namespace mtconnect::observation { protected: std::chrono::milliseconds m_interval { - 0}; //! the minimum amout of time to wait before calling the handler + 0}; //! the minimum amount of time to wait before calling the handler std::optional m_requestId; //! request id }; - /// @brief Asyncronous change context for waiting for changes + /// @brief Asynchronous change context for waiting for changes /// /// This class must be subclassed and provide a fail and isRunning method. /// The caller first calls observe to resolve the filter ids to the signalers. This must be done - /// before the first handlerComplete is called asyncronously. The observer handles calling the + /// before the first handlerComplete is called asynchronously. The observer handles calling the /// handler whenever a new observation is available or the heartbeat has timed out keeping track /// of the sequence number of the last signaled observation or if the observer is still at the end /// of the buffer and nothing is signaled. @@ -236,17 +298,27 @@ namespace mtconnect::observation { using Resolver = std::function; /// @brief create async observer to manage data item callbacks - /// @param contract the sink contract to use to get the buffer information /// @param strand the strand to handle the async actions + /// @param buffer the circular buffer + /// @param filter the data items to observe /// @param interval minimum amount of time to wait for observations /// @param heartbeat maximum amount of time to wait before sending a heartbeat AsyncObserver(boost::asio::io_context::strand &strand, mtconnect::buffer::CircularBuffer &buffer, FilterSet &&filter, - std::chrono::milliseconds interval, std::chrono::milliseconds heartbeat); + std::chrono::milliseconds interval, std::chrono::milliseconds heartbeat) + : AsyncResponse(interval), + m_heartbeat(heartbeat), + m_last(std::chrono::system_clock::now()), + m_filter(std::move(filter)), + m_strand(strand), + m_observer(strand), + m_buffer(buffer) + {} + /// @brief default destructor virtual ~AsyncObserver() = default; - /// @brief Get a shared pointed + /// @brief Get a shared pointer auto getptr() { return std::dynamic_pointer_cast(shared_from_this()); } /// @brief sets up the `ChangeObserver` using the filter and initializes the references to the @@ -260,7 +332,22 @@ namespace mtconnect::observation { /// /// Bound as the completion routine for async writes and actions. Proceeds to the next /// handleObservations. - void handlerCompleted(); + void handlerCompleted() + { + NAMED_SCOPE("AsyncObserver::handlerCompleted"); + + m_last = std::chrono::system_clock::now(); + if (m_endOfBuffer) + { + LOG(trace) << "End of buffer"; + using std::placeholders::_1; + m_observer.waitForSignal(m_heartbeat); + } + else + { + AsyncObserver::handleSignal(boost::system::error_code {}); + } + } /// @brief abstract call to failure handler virtual void fail(boost::beast::http::status status, const std::string &message) = 0; @@ -273,9 +360,6 @@ namespace mtconnect::observation { } /// @brief handler callback when an action needs to be taken - /// - /// @tparam AsyncObserver shared point to this - /// @returns the ending sequence number Handler m_handler; ///@{ @@ -302,7 +386,7 @@ namespace mtconnect::observation { 0}; //! the maximum amount of time to wait before sending a heartbeat std::chrono::system_clock::time_point m_last; //! the last time the handler completed FilterSet m_filter; //! The data items to be observed - boost::asio::io_context::strand m_strand; //! Strand to use for aync dispatch + boost::asio::io_context::strand m_strand; //! Strand to use for async dispatch ChangeObserver m_observer; //! the change observer mtconnect::buffer::CircularBuffer &m_buffer; //! reference to the circular buffer diff --git a/src/mtconnect/observation/observation.hpp b/src/mtconnect/observation/observation.hpp index 9aa52a07c..c55f916ab 100644 --- a/src/mtconnect/observation/observation.hpp +++ b/src/mtconnect/observation/observation.hpp @@ -36,7 +36,7 @@ namespace mtconnect::observation { class Observation; using ObservationPtr = std::shared_ptr; using ConstObservationPtr = std::shared_ptr; - using ObservationList = std::list; + using ObservationList = std::vector; /// @brief Abstract observation class AGENT_LIB_API Observation : public entity::Entity diff --git a/src/mtconnect/parser/json_parser.cpp b/src/mtconnect/parser/json_parser.cpp new file mode 100644 index 000000000..01297fe5e --- /dev/null +++ b/src/mtconnect/parser/json_parser.cpp @@ -0,0 +1,209 @@ +// +// Copyright Copyright 2009-2026, AMT – The Association For Manufacturing Technology ("AMT") +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "json_parser.hpp" + +#include +#include +#include +#include +#include +#include + +#include "mtconnect/device_model/device.hpp" +#include "mtconnect/entity/json_parser.hpp" +#include "mtconnect/logging.hpp" + +using namespace std; +namespace rj = ::rapidjson; + +namespace mtconnect::parser { + using namespace device_model; + + list JsonParser::parseFile(const string &filePath) + { + NAMED_SCOPE("json.parser"); + + ifstream ifs(filePath); + if (!ifs.is_open()) + { + LOG(fatal) << "Cannot open JSON file: " << filePath; + throw FatalException("Cannot open JSON file: " + filePath); + } + + stringstream buffer; + buffer << ifs.rdbuf(); + return parseDocument(buffer.str()); + } + + list JsonParser::parseDocument(const string &jsonDoc) + { + NAMED_SCOPE("json.parser"); + + list deviceList; + + rj::Document doc; + doc.Parse(jsonDoc.c_str()); + + if (doc.HasParseError()) + { + LOG(fatal) << "JSON parse error: " << rj::GetParseError_En(doc.GetParseError()) + << " at offset " << doc.GetErrorOffset(); + throw FatalException("Cannot parse JSON document"); + } + + if (!doc.IsObject()) + { + LOG(fatal) << "JSON document root is not an object"; + throw FatalException("JSON document root is not an object"); + } + + // Navigate to MTConnectDevices + auto mtcIt = doc.FindMember("MTConnectDevices"); + if (mtcIt == doc.MemberEnd() || !mtcIt->value.IsObject()) + { + LOG(fatal) << "JSON document does not contain MTConnectDevices"; + throw FatalException("JSON document does not contain MTConnectDevices"); + } + + const auto &mtcDevices = mtcIt->value; + + // Use document jsonVersion if present, otherwise fall back to the default + uint32_t version = m_version; + auto jsonVersionIt = mtcDevices.FindMember("jsonVersion"); + if (jsonVersionIt != mtcDevices.MemberEnd() && jsonVersionIt->value.IsUint()) + { + version = jsonVersionIt->value.GetUint(); + } + + // Extract schema version if present + auto schemaIt = mtcDevices.FindMember("schemaVersion"); + if (schemaIt != mtcDevices.MemberEnd() && schemaIt->value.IsString()) + { + m_schemaVersion.emplace(schemaIt->value.GetString(), schemaIt->value.GetStringLength()); + } + + // Navigate to Devices + auto devicesIt = mtcDevices.FindMember("Devices"); + if (devicesIt == mtcDevices.MemberEnd()) + { + LOG(warning) << "No Devices in JSON document – expecting dynamic configuration"; + return deviceList; + } + + const auto &devices = devicesIt->value; + + if (version == 1) + { + // V1: Devices is an array of objects like [{"Device": {...}}, ...] + if (!devices.IsArray()) + { + LOG(fatal) << "V1 Devices is not an array"; + throw FatalException("V1 Devices is not an array"); + } + + for (rj::SizeType i = 0; i < devices.Size(); ++i) + { + const auto &item = devices[i]; + if (item.IsObject() && item.MemberCount() > 0) + { + // Re-serialize the single device wrapper object for the entity parser + rj::StringBuffer sb; + rj::Writer writer(sb); + item.Accept(writer); + + auto device = parseDevice(string(sb.GetString(), sb.GetLength()), version); + if (device) + deviceList.emplace_back(device); + else + LOG(error) << "Failed to parse device from JSON array element " << i; + } + } + } + else if (version == 2) + { + // V2: Devices is an object like {"Device": [{...}, ...]} + if (!devices.IsObject()) + { + LOG(fatal) << "V2 Devices is not an object"; + throw FatalException("V2 Devices is not an object"); + } + + auto deviceArrayIt = devices.FindMember("Device"); + if (deviceArrayIt != devices.MemberEnd() && deviceArrayIt->value.IsArray()) + { + for (rj::SizeType i = 0; i < deviceArrayIt->value.Size(); ++i) + { + const auto &deviceObj = deviceArrayIt->value[i]; + // Wrap as {"Device": {...}} for the entity parser + rj::StringBuffer sb; + rj::Writer writer(sb); + writer.StartObject(); + writer.Key("Device"); + deviceObj.Accept(writer); + writer.EndObject(); + + auto device = parseDevice(string(sb.GetString(), sb.GetLength()), version); + if (device) + deviceList.emplace_back(device); + else + LOG(error) << "Failed to parse device from JSON v2 array element " << i; + } + } + } + + return deviceList; + } + + DevicePtr JsonParser::parseDevice(const std::string &jsonDoc, uint32_t version) + { + NAMED_SCOPE("json.parser"); + + DevicePtr device; + + try + { + entity::ErrorList errors; + entity::JsonParser parser(version); + auto entity = parser.parse(Device::getRoot(), jsonDoc, "2.3", errors); + + if (!errors.empty()) + { + for (auto &e : errors) + { + LOG(warning) << "Error parsing JSON Device: " << e->what(); + } + } + + if (entity) + { + device = dynamic_pointer_cast(entity); + } + else + { + LOG(error) << "Failed to parse JSON device document"; + } + } + catch (const exception &e) + { + LOG(fatal) << "Cannot parse JSON document: " << e.what(); + throw FatalException(e.what()); + } + + return device; + } +} // namespace mtconnect::parser diff --git a/src/mtconnect/parser/json_parser.hpp b/src/mtconnect/parser/json_parser.hpp new file mode 100644 index 000000000..a9c61bf0f --- /dev/null +++ b/src/mtconnect/parser/json_parser.hpp @@ -0,0 +1,72 @@ +// +// Copyright Copyright 2009-2026, AMT – The Association For Manufacturing Technology ("AMT") +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include +#include +#include + +#include "mtconnect/config.hpp" +#include "mtconnect/device_model/device.hpp" + +/// @brief MTConnect Device parser namespace +namespace mtconnect::parser { + /// @brief parse a JSON document and create a list of devices + class AGENT_LIB_API JsonParser + { + public: + /// @brief Constructor + /// @param version the JSON serialization version (1 or 2) + JsonParser(uint32_t version = 1) : m_version(version) {} + virtual ~JsonParser() = default; + + /// @brief Parses a JSON file containing an MTConnectDevices document and returns a list of + /// devices + /// @param[in] filePath the path to the JSON file + /// @returns a list of device pointers + std::list parseFile(const std::string &filePath); + + /// @brief Parses a JSON string containing an MTConnectDevices document and returns a list of + /// devices. Navigates MTConnectDevices/Devices to find device nodes. + /// @param[in] jsonDoc the JSON document string + /// @returns a list of device pointers + std::list parseDocument(const std::string &jsonDoc); + + /// @brief Parses a JSON string containing a single device and returns the device + /// @param[in] jsonDoc the JSON document string wrapping a single Device + /// @returns a shared device pointer if successful + device_model::DevicePtr parseDevice(const std::string &jsonDoc) + { + return parseDevice(jsonDoc, m_version); + } + + /// @brief Parses a JSON string containing a single device and returns the device + /// @param[in] jsonDoc the JSON document string wrapping a single Device + /// @param[in] version the JSON serialization version to use + /// @returns a shared device pointer if successful + device_model::DevicePtr parseDevice(const std::string &jsonDoc, uint32_t version); + + /// @brief get the schema version parsed from the document + /// @return the version if found + const auto &getSchemaVersion() const { return m_schemaVersion; } + + protected: + uint32_t m_version; + std::optional m_schemaVersion; + }; +} // namespace mtconnect::parser diff --git a/src/mtconnect/parser/xml_parser.cpp b/src/mtconnect/parser/xml_parser.cpp index 3b2511c20..5617e3da2 100644 --- a/src/mtconnect/parser/xml_parser.cpp +++ b/src/mtconnect/parser/xml_parser.cpp @@ -230,7 +230,7 @@ namespace mtconnect::parser { xmlXPathFreeObject(devices); xmlXPathFreeContext(xpathCtx); } - catch (string e) + catch (const string &e) { if (devices) xmlXPathFreeObject(devices); @@ -281,7 +281,7 @@ namespace mtconnect::parser { device = dynamic_pointer_cast(entity); } } - catch (string e) + catch (const string &e) { LOG(fatal) << "Cannot parse XML document: " << e; throw FatalException(); @@ -320,10 +320,10 @@ namespace mtconnect::parser { nullptr, XML_PARSE_NOBLANKS)); } - catch (string e) + catch (const string &e) { LOG(fatal) << "Cannot parse XML document: " << e; - throw FatalException(); + throw FatalException("Cannot parse XML document: " + e); } } diff --git a/src/mtconnect/pipeline/transform.hpp b/src/mtconnect/pipeline/transform.hpp index 691e6ff20..e6f9cde45 100644 --- a/src/mtconnect/pipeline/transform.hpp +++ b/src/mtconnect/pipeline/transform.hpp @@ -41,7 +41,7 @@ namespace mtconnect { class Transform; using TransformPtr = std::shared_ptr; - using TransformList = std::list; + using TransformList = std::vector; using ApplyDataItem = std::function; using EachDataItem = std::function; @@ -219,7 +219,7 @@ namespace mtconnect { } /// @brief Binds to the first position in the next list /// @param xform the transform - void firstAfter(TransformPtr xform) { m_next.emplace_front(xform); } + void firstAfter(TransformPtr xform) { m_next.insert(m_next.begin(), xform); } /// @brief Replace one transform with another /// /// Rebinds the new transform replacing the old transform diff --git a/src/mtconnect/printer/json_printer.cpp b/src/mtconnect/printer/json_printer.cpp index c7e699a44..1c9eaf4e3 100644 --- a/src/mtconnect/printer/json_printer.cpp +++ b/src/mtconnect/printer/json_printer.cpp @@ -67,7 +67,7 @@ namespace mtconnect::printer { obj.AddPairs("version", version, "creationTime", getCurrentTime(GMT), "testIndicator", false, "instanceId", instanceId, "sender", hostname, "schemaVersion", schemaVersion); - if (schemaVersion >= "1.7") + if (IntSchemaVersion(schemaVersion) >= SCHEMA_VERSION(1, 7)) obj.AddPairs("deviceModelChangeTime", modelChangeTime); if (bufferSize > 0) obj.AddPairs("bufferSize", bufferSize); diff --git a/src/mtconnect/printer/xml_printer.cpp b/src/mtconnect/printer/xml_printer.cpp index f4b5bafa2..810c5a994 100644 --- a/src/mtconnect/printer/xml_printer.cpp +++ b/src/mtconnect/printer/xml_printer.cpp @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -36,18 +37,7 @@ #include "mtconnect/sink/rest_sink/error.hpp" #include "mtconnect/version.h" #include "xml_printer.hpp" - -#define strfy(line) #line -#define THROW_IF_XML2_ERROR(expr) \ - if ((expr) < 0) \ - { \ - throw string("XML Error at " __FILE__ "(" strfy(__LINE__) "): " #expr); \ - } -#define THROW_IF_XML2_NULL(expr) \ - if (!(expr)) \ - { \ - throw string("XML Error at " __FILE__ "(" strfy(__LINE__) "): " #expr); \ - } +#include "xml_printer_helper.hpp" using namespace std; @@ -56,52 +46,6 @@ namespace mtconnect::printer { using namespace asset; using namespace device_model::configuration; - class AGENT_LIB_API XmlWriter - { - public: - XmlWriter(bool pretty) : m_writer(nullptr), m_buf(nullptr) - { - THROW_IF_XML2_NULL(m_buf = xmlBufferCreate()); - THROW_IF_XML2_NULL(m_writer = xmlNewTextWriterMemory(m_buf, 0)); - if (pretty) - { - THROW_IF_XML2_ERROR(xmlTextWriterSetIndent(m_writer, 1)); - THROW_IF_XML2_ERROR(xmlTextWriterSetIndentString(m_writer, BAD_CAST " ")); - } - } - - ~XmlWriter() - { - if (m_writer != nullptr) - { - xmlFreeTextWriter(m_writer); - m_writer = nullptr; - } - if (m_buf != nullptr) - { - xmlBufferFree(m_buf); - m_buf = nullptr; - } - } - - operator xmlTextWriterPtr() { return m_writer; } - - string getContent() - { - if (m_writer != nullptr) - { - THROW_IF_XML2_ERROR(xmlTextWriterEndDocument(m_writer)); - xmlFreeTextWriter(m_writer); - m_writer = nullptr; - } - return std::string((char *)xmlBufferContent(m_buf), xmlBufferLength(m_buf)); - } - - protected: - xmlTextWriterPtr m_writer; - xmlBufferPtr m_buf; - }; - XmlPrinter::XmlPrinter(bool pretty, bool validation) : Printer(pretty, validation) { NAMED_SCOPE("xml.printer"); @@ -247,103 +191,6 @@ namespace mtconnect::printer { void XmlPrinter::setAssetsStyle(const std::string &style) { m_assetStyle = style; } - static inline void addAttribute(xmlTextWriterPtr writer, const char *key, - const std::string &value) - { - if (!value.empty()) - THROW_IF_XML2_ERROR( - xmlTextWriterWriteAttribute(writer, BAD_CAST key, BAD_CAST value.c_str())); - } - - void addAttributes(xmlTextWriterPtr writer, const std::map &attributes) - { - for (const auto &attr : attributes) - { - if (!attr.second.empty()) - { - THROW_IF_XML2_ERROR(xmlTextWriterWriteAttribute(writer, BAD_CAST attr.first.c_str(), - BAD_CAST attr.second.c_str())); - } - } - } - - static inline void openElement(xmlTextWriterPtr writer, const char *name) - { - THROW_IF_XML2_ERROR(xmlTextWriterStartElement(writer, BAD_CAST name)); - } - - static inline void closeElement(xmlTextWriterPtr writer) - { - THROW_IF_XML2_ERROR(xmlTextWriterEndElement(writer)); - } - - class AGENT_LIB_API AutoElement - { - public: - AutoElement(xmlTextWriterPtr writer) : m_writer(writer) {} - AutoElement(xmlTextWriterPtr writer, const char *name, string key = "") - : m_writer(writer), m_name(name), m_key(std::move(key)) - { - openElement(writer, name); - } - AutoElement(xmlTextWriterPtr writer, const string &name, string key = "") - : m_writer(writer), m_name(name), m_key(std::move(key)) - { - openElement(writer, name.c_str()); - } - bool reset(const string &name, const string &key = "") - { - if (name != m_name || m_key != key) - { - if (!m_name.empty()) - closeElement(m_writer); - if (!name.empty()) - openElement(m_writer, name.c_str()); - m_name = name; - m_key = key; - return true; - } - else - { - return false; - } - } - ~AutoElement() - { - if (!m_name.empty()) - xmlTextWriterEndElement(m_writer); - } - - const string &key() const { return m_key; } - const string &name() const { return m_name; } - - protected: - xmlTextWriterPtr m_writer; - string m_name; - string m_key; - }; - - void addSimpleElement(xmlTextWriterPtr writer, const string &element, const string &body, - const map &attributes = {}, bool raw = false) - { - AutoElement ele(writer, element); - - if (!attributes.empty()) - addAttributes(writer, attributes); - - if (!body.empty()) - { - xmlChar *text = nullptr; - if (!raw) - text = xmlEncodeEntitiesReentrant(nullptr, BAD_CAST body.c_str()); - else - text = BAD_CAST body.c_str(); - THROW_IF_XML2_ERROR(xmlTextWriterWriteRaw(writer, text)); - if (!raw) - xmlFree(text); - } - } - std::string XmlPrinter::printErrors(const uint64_t instanceId, const unsigned int bufferSize, const uint64_t nextSeq, const entity::EntityList &list, bool pretty, const std::optional requestId) const @@ -378,9 +225,9 @@ namespace mtconnect::printer { // Cleanup ret = writer.getContent(); } - catch (string error) + catch (const XmlError &error) { - LOG(error) << "printError: " << error; + LOG(error) << "printError: " << error.what(); } catch (...) { @@ -416,9 +263,9 @@ namespace mtconnect::printer { ret = writer.getContent(); } - catch (string error) + catch (const XmlError &error) { - LOG(error) << "printProbe: " << error; + LOG(error) << "printProbe: " << error.what(); } catch (...) { @@ -440,9 +287,9 @@ namespace mtconnect::printer { ret = writer.getContent(); } - catch (string error) + catch (const XmlError &error) { - LOG(error) << "printProbe: " << error; + LOG(error) << "printProbe: " << error.what(); } catch (...) { @@ -471,7 +318,7 @@ namespace mtconnect::printer { // Sort the vector by category. if (observations.size() > 0) { - observations.sort(ObservationCompare); + std::sort(observations.begin(), observations.end(), ObservationCompare); AutoElement deviceElement(writer); { @@ -522,9 +369,9 @@ namespace mtconnect::printer { ret = writer.getContent(); } - catch (string error) + catch (const XmlError &error) { - LOG(error) << "printSample: " << error; + LOG(error) << "printSample: " << error.what(); } catch (...) { @@ -557,9 +404,9 @@ namespace mtconnect::printer { ret = writer.getContent(); } - catch (string error) + catch (const XmlError &error) { - LOG(error) << "printAssets: " << error; + LOG(error) << "printAssets: " << error.what(); } catch (...) { @@ -618,15 +465,21 @@ namespace mtconnect::printer { if (!style.empty()) { - string pi = R"(xml-stylesheet type="text/xsl" href=")" + style + '"'; + string pi; + pi.reserve(42 + style.size()); + pi.append(R"(xml-stylesheet type="text/xsl" href=")").append(style).append("\""); THROW_IF_XML2_ERROR(xmlTextWriterStartPI(writer, BAD_CAST pi.c_str())); THROW_IF_XML2_ERROR(xmlTextWriterEndPI(writer)); } - string rootName = "MTConnect" + xmlType; + string rootName; + rootName.reserve(10 + xmlType.size()); + rootName.append("MTConnect").append(xmlType); if (!m_schemaVersion) defaultSchemaVersion(); - string xmlns = "urn:mtconnect.org:" + rootName + ":" + *m_schemaVersion; + string xmlns; + xmlns.reserve(19 + rootName.size() + m_schemaVersion->size()); + xmlns.append("urn:mtconnect.org:").append(rootName).append(":").append(*m_schemaVersion); string location; openElement(writer, rootName.c_str()); @@ -646,19 +499,23 @@ namespace mtconnect::printer { // Skip the mtconnect ns (always m) if (ns.first != "m") { - string attr = "xmlns:" + ns.first; + string attr; + attr.reserve(6 + ns.first.size()); + attr.append("xmlns:").append(ns.first); addAttribute(writer, attr.c_str(), ns.second.mUrn); if (location.empty() && !ns.second.mSchemaLocation.empty()) { // Always take the first location. There should only be one location! - location = ns.second.mUrn + " " + ns.second.mSchemaLocation; + location.reserve(ns.second.mUrn.size() + 1 + ns.second.mSchemaLocation.size()); + location.append(ns.second.mUrn).append(" ").append(ns.second.mSchemaLocation); } } else if (!ns.second.mSchemaLocation.empty()) { // This is the mtconnect namespace - mtcLocation = xmlns + " " + ns.second.mSchemaLocation; + mtcLocation.reserve(xmlns.size() + 1 + ns.second.mSchemaLocation.size()); + mtcLocation.append(xmlns).append(" ").append(ns.second.mSchemaLocation); } } @@ -666,8 +523,15 @@ namespace mtconnect::printer { if (location.empty() && !mtcLocation.empty()) location = mtcLocation; else if (location.empty()) - location = xmlns + " http://schemas.mtconnect.org/schemas/" + rootName + "_" + - *m_schemaVersion + ".xsd"; + { + location.reserve(xmlns.size() + 38 + rootName.size() + 1 + m_schemaVersion->size() + 4); + location.append(xmlns) + .append(" http://schemas.mtconnect.org/schemas/") + .append(rootName) + .append("_") + .append(*m_schemaVersion) + .append(".xsd"); + } addAttribute(writer, "xsi:schemaLocation", location); @@ -677,7 +541,7 @@ namespace mtconnect::printer { addAttribute(writer, "creationTime", getCurrentTime(GMT)); addAttribute(writer, "sender", m_senderName); - addAttribute(writer, "instanceId", to_string(instanceId)); + addAttribute(writer, "instanceId", instanceId); if (m_validation) addAttribute(writer, "validation", "true"s); @@ -699,21 +563,21 @@ namespace mtconnect::printer { if (aType == eASSETS || aType == eDEVICES) { - addAttribute(writer, "assetBufferSize", to_string(assetBufferSize)); - addAttribute(writer, "assetCount", to_string(assetCount)); + addAttribute(writer, "assetBufferSize", assetBufferSize); + addAttribute(writer, "assetCount", assetCount); } if (aType == eDEVICES || aType == eERROR || aType == eSTREAMS) { - addAttribute(writer, "bufferSize", to_string(bufferSize)); + addAttribute(writer, "bufferSize", bufferSize); } if (aType == eSTREAMS) { // Add additional attribtues for streams - addAttribute(writer, "nextSequence", to_string(nextSeq)); - addAttribute(writer, "firstSequence", to_string(firstSeq)); - addAttribute(writer, "lastSequence", to_string(lastSeq)); + addAttribute(writer, "nextSequence", nextSeq); + addAttribute(writer, "firstSequence", firstSeq); + addAttribute(writer, "lastSequence", lastSeq); } if (schemaVersion < SCHEMA_VERSION(2, 0) && aType == eDEVICES && count && !count->empty()) diff --git a/src/mtconnect/printer/xml_printer_helper.hpp b/src/mtconnect/printer/xml_printer_helper.hpp index 74a45cf24..68a0bc627 100644 --- a/src/mtconnect/printer/xml_printer_helper.hpp +++ b/src/mtconnect/printer/xml_printer_helper.hpp @@ -17,6 +17,12 @@ #pragma once +#include +#include +#include +#include + +#include #include #include "mtconnect/config.hpp" @@ -156,4 +162,72 @@ namespace mtconnect::printer { std::string m_name; std::string m_key; }; + /// @brief Add a string attribute to the current element + /// @param writer the writer + /// @param key the attribute name + /// @param value the attribute value (empty strings are skipped) + static inline void addAttribute(xmlTextWriterPtr writer, const char *key, + const std::string &value) + { + if (!value.empty()) + THROW_IF_XML2_ERROR( + xmlTextWriterWriteAttribute(writer, BAD_CAST key, BAD_CAST value.c_str())); + } + + /// @brief Add an integral attribute to the current element + /// @param writer the writer + /// @param key the attribute name + /// @param value the integral value + template + requires std::integral static inline void addAttribute(xmlTextWriterPtr writer, + const char *key, T value) + { + auto str = std::format("{}", value); + THROW_IF_XML2_ERROR(xmlTextWriterWriteAttribute(writer, BAD_CAST key, BAD_CAST str.c_str())); + } + + /// @brief Add a map of string attributes to the current element + /// @param writer the writer + /// @param attributes map of key-value attribute pairs + static inline void addAttributes(xmlTextWriterPtr writer, + const std::map &attributes) + { + for (const auto &attr : attributes) + { + if (!attr.second.empty()) + { + THROW_IF_XML2_ERROR(xmlTextWriterWriteAttribute(writer, BAD_CAST attr.first.c_str(), + BAD_CAST attr.second.c_str())); + } + } + } + + /// @brief Write a simple element with optional body text and attributes + /// @param writer the writer + /// @param element name of the element + /// @param body text content of the element + /// @param attributes optional map of attributes + /// @param raw if true, body is written without XML encoding + static inline void addSimpleElement(xmlTextWriterPtr writer, const std::string &element, + const std::string &body, + const std::map &attributes = {}, + bool raw = false) + { + AutoElement ele(writer, element); + + if (!attributes.empty()) + addAttributes(writer, attributes); + + if (!body.empty()) + { + xmlChar *text = nullptr; + if (!raw) + text = xmlEncodeEntitiesReentrant(nullptr, BAD_CAST body.c_str()); + else + text = BAD_CAST body.c_str(); + THROW_IF_XML2_ERROR(xmlTextWriterWriteRaw(writer, text)); + if (!raw) + xmlFree(text); + } + } } // namespace mtconnect::printer diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp index 6dbdd23fc..267efa1ba 100644 --- a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp @@ -18,17 +18,20 @@ #include "mqtt_entity_sink.hpp" #include +#include +#include #include -#include - #include "mtconnect/configuration/config_options.hpp" #include "mtconnect/entity/entity.hpp" #include "mtconnect/mqtt/mqtt_client_impl.hpp" #include "mtconnect/observation/observation.hpp" +using namespace std; +using namespace mtconnect; +using namespace mtconnect::mqtt_client; using ptree = boost::property_tree::ptree; -using json = nlohmann::json; +namespace rj = ::rapidjson; using namespace std; using namespace mtconnect; @@ -211,7 +214,7 @@ namespace mtconnect { std::lock_guard lock(m_queueMutex); if (m_queuedObservations.size() >= MAX_QUEUE_SIZE) { - m_queuedObservations.erase(m_queuedObservations.begin()); + m_queuedObservations.pop_front(); } m_queuedObservations.push_back(obsCopy); obsCount++; @@ -270,25 +273,29 @@ namespace mtconnect { else if (holds_alternative(value)) { // For DataSet, return as JSON string - json j = json::object(); + rj::StringBuffer buf; + rj::Writer w(buf); + w.StartObject(); auto& ds = get(value); for (auto& entry : ds) { - // Convert the variant value to appropriate JSON type + w.Key(entry.m_key.c_str(), rj::SizeType(entry.m_key.size())); if (holds_alternative(entry.m_value)) { - j[entry.m_key] = get(entry.m_value); + auto& s = get(entry.m_value); + w.String(s.c_str(), rj::SizeType(s.size())); } else if (holds_alternative(entry.m_value)) { - j[entry.m_key] = get(entry.m_value); + w.Int64(get(entry.m_value)); } else if (holds_alternative(entry.m_value)) { - j[entry.m_key] = get(entry.m_value); + w.Double(get(entry.m_value)); } } - return j.dump(); + w.EndObject(); + return buf.GetString(); } return "UNAVAILABLE"; @@ -297,8 +304,6 @@ namespace mtconnect { std::string MqttEntitySink::formatObservationJson( const observation::ObservationPtr& observation) { - json j; - try { auto dataItem = observation->getDataItem(); @@ -308,46 +313,58 @@ namespace mtconnect { return "{}"; } - j["dataItemId"] = dataItem->getId(); + rj::StringBuffer buf; + rj::Writer w(buf); + w.StartObject(); + + w.Key("dataItemId"); + w.String(dataItem->getId().c_str()); const auto& name = dataItem->getName(); if (name && !name->empty()) { - j["name"] = *name; + w.Key("name"); + w.String(name->c_str()); } - j["type"] = dataItem->getType(); + w.Key("type"); + w.String(dataItem->getType().c_str()); auto subType = dataItem->maybeGet("subType"); if (subType && !subType->empty()) { - j["subType"] = *subType; + w.Key("subType"); + w.String(subType->c_str()); } - j["timestamp"] = formatTimestamp(observation->getTimestamp()); + w.Key("timestamp"); + auto ts = formatTimestamp(observation->getTimestamp()); + w.String(ts.c_str()); // Get the category auto category = dataItem->getCategory(); + w.Key("category"); if (category == device_model::data_item::DataItem::SAMPLE) - { - j["category"] = "SAMPLE"; - } + w.String("SAMPLE"); else if (category == device_model::data_item::DataItem::EVENT) - { - j["category"] = "EVENT"; - } + w.String("EVENT"); else if (category == device_model::data_item::DataItem::CONDITION) - { - j["category"] = "CONDITION"; - } + w.String("CONDITION"); + else + w.String("UNKNOWN"); // Add the result/value - j["result"] = getObservationValue(observation); + w.Key("result"); + auto val = getObservationValue(observation); + w.String(val.c_str()); // Add sequence number - j["sequence"] = observation->getSequence(); + w.Key("sequence"); + w.Int64(observation->getSequence()); - auto result = j.dump(); + w.EndObject(); + + auto result = string(buf.GetString(), buf.GetSize()); LOG(trace) << "Formatted observation JSON: " << result; return result; } @@ -360,72 +377,90 @@ namespace mtconnect { std::string MqttEntitySink::formatConditionJson(const observation::ConditionPtr& condition) { - json j; - auto dataItem = condition->getDataItem(); if (!dataItem) { return "{}"; } - j["dataItemId"] = dataItem->getId(); + rj::StringBuffer buf; + rj::Writer w(buf); + w.StartObject(); + + w.Key("dataItemId"); + w.String(dataItem->getId().c_str()); const auto& name = dataItem->getName(); if (name && !name->empty()) { - j["name"] = *name; + w.Key("name"); + w.String(name->c_str()); } - j["type"] = dataItem->getType(); + w.Key("type"); + w.String(dataItem->getType().c_str()); auto subType = dataItem->maybeGet("subType"); if (subType && !subType->empty()) { - j["subType"] = *subType; + w.Key("subType"); + w.String(subType->c_str()); } - j["timestamp"] = formatTimestamp(condition->getTimestamp()); - j["category"] = "CONDITION"; + w.Key("timestamp"); + auto ts = formatTimestamp(condition->getTimestamp()); + w.String(ts.c_str()); + + w.Key("category"); + w.String("CONDITION"); // Add condition-specific fields + w.Key("level"); switch (condition->getLevel()) { case Condition::NORMAL: - j["level"] = "NORMAL"; + w.String("NORMAL"); break; case Condition::WARNING: - j["level"] = "WARNING"; + w.String("WARNING"); break; case Condition::FAULT: - j["level"] = "FAULT"; + w.String("FAULT"); break; case Condition::UNAVAILABLE: - j["level"] = "UNAVAILABLE"; + w.String("UNAVAILABLE"); break; } // Add native code if present if (condition->hasProperty("nativeCode")) { - j["nativeCode"] = condition->get("nativeCode"); + w.Key("nativeCode"); + auto nc = condition->get("nativeCode"); + w.String(nc.c_str()); } // Add condition ID if present if (!condition->getCode().empty()) { - j["conditionId"] = condition->getCode(); + w.Key("conditionId"); + w.String(condition->getCode().c_str()); } // Add message/value if present if (condition->hasValue()) { - j["message"] = getObservationValue(condition); + w.Key("message"); + auto msg = getObservationValue(condition); + w.String(msg.c_str()); } // Add sequence number - j["sequence"] = condition->getSequence(); + w.Key("sequence"); + w.Int64(condition->getSequence()); - return j.dump(); + w.EndObject(); + return string(buf.GetString(), buf.GetSize()); } std::string MqttEntitySink::getObservationTopic( @@ -469,7 +504,7 @@ namespace mtconnect { LOG(warning) << "MqttEntitySink::publish: Observation queue full (" << MAX_QUEUE_SIZE << "), dropping oldest observation for " << m_queuedObservations.front()->getDataItem()->getId(); - m_queuedObservations.erase(m_queuedObservations.begin()); + m_queuedObservations.pop_front(); } LOG(debug) << "MqttEntitySink::publish: Client not connected, queuing observation for " << dataItem->getId(); diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp index db71cf0ad..f1af0e55a 100644 --- a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp @@ -20,7 +20,7 @@ #include "boost/asio/io_context.hpp" #include -#include +#include #include "mtconnect/buffer/checkpoint.hpp" #include "mtconnect/config.hpp" @@ -30,11 +30,6 @@ #include "mtconnect/sink/sink.hpp" #include "mtconnect/utilities.hpp" -using namespace std; -using namespace mtconnect; -using namespace mtconnect::mqtt_client; -using json = nlohmann::json; - namespace mtconnect { namespace sink { namespace mqtt_entity_sink { @@ -81,7 +76,7 @@ namespace mtconnect { /// @brief Gets the MQTT Client /// @return MqttClient - std::shared_ptr getClient() { return m_client; } + std::shared_ptr getClient() { return m_client; } /// @brief Check if MQTT Client is Connected /// @return `true` when the client is connected @@ -129,8 +124,8 @@ namespace mtconnect { ConfigOptions m_options; - std::shared_ptr m_client; - std::vector m_queuedObservations; + std::shared_ptr m_client; + std::deque m_queuedObservations; std::mutex m_queueMutex; }; } // namespace mqtt_entity_sink diff --git a/src/mtconnect/sink/mqtt_sink/mqtt_service.cpp b/src/mtconnect/sink/mqtt_sink/mqtt_service.cpp index 4d8fa5261..c6264d348 100644 --- a/src/mtconnect/sink/mqtt_sink/mqtt_service.cpp +++ b/src/mtconnect/sink/mqtt_sink/mqtt_service.cpp @@ -17,8 +17,6 @@ #include "mqtt_service.hpp" -#include - #include "mtconnect/configuration/config_options.hpp" #include "mtconnect/entity/entity.hpp" #include "mtconnect/entity/factory.hpp" @@ -26,8 +24,11 @@ #include "mtconnect/mqtt/mqtt_client_impl.hpp" #include "mtconnect/printer/json_printer.hpp" +using namespace std; +using namespace mtconnect; +using namespace mtconnect::entity; +using namespace mtconnect::mqtt_client; using ptree = boost::property_tree::ptree; -using json = nlohmann::json; using namespace std; using namespace mtconnect; diff --git a/src/mtconnect/sink/mqtt_sink/mqtt_service.hpp b/src/mtconnect/sink/mqtt_sink/mqtt_service.hpp index 7a6228152..ad0140425 100644 --- a/src/mtconnect/sink/mqtt_sink/mqtt_service.hpp +++ b/src/mtconnect/sink/mqtt_sink/mqtt_service.hpp @@ -20,8 +20,6 @@ #include "boost/asio/io_context.hpp" #include -#include - #include "mtconnect/buffer/checkpoint.hpp" #include "mtconnect/config.hpp" #include "mtconnect/configuration/agent_config.hpp" @@ -34,13 +32,6 @@ #include "mtconnect/sink/sink.hpp" #include "mtconnect/utilities.hpp" -using namespace std; -using namespace mtconnect; -using namespace mtconnect::entity; -using namespace mtconnect::mqtt_client; - -using json = nlohmann::json; - namespace mtconnect { class XmlPrinter; @@ -107,7 +98,7 @@ namespace mtconnect { /// @brief gets a Mqtt Client /// @return MqttClient - std::shared_ptr getClient() { return m_client; } + std::shared_ptr getClient() { return m_client; } /// @brief Mqtt Client is Connected or not /// @return `true` when the client was connected @@ -135,8 +126,8 @@ namespace mtconnect { std::string formatTopic(const std::string &topic, const DevicePtr device, const std::string defaultUuid = "Unknown") { - string uuid; - string formatted {topic}; + std::string uuid; + std::string formatted {topic}; if (!device) uuid = defaultUuid; else @@ -163,7 +154,7 @@ namespace mtconnect { std::string getTopic(const std::string &option, int maxTopicDepth) { - auto topic {get(m_options[option])}; + auto topic {std::get(m_options[option])}; auto depth = std::count(topic.begin(), topic.end(), '/'); if (depth > maxTopicDepth) @@ -190,10 +181,10 @@ namespace mtconnect { ConfigOptions m_options; - std::unique_ptr m_jsonPrinter; + std::unique_ptr m_jsonPrinter; std::unique_ptr m_printer; - std::shared_ptr m_client; + std::shared_ptr m_client; boost::asio::steady_timer m_currentTimer; int m_sampleCount; //! Timer for current requests diff --git a/src/mtconnect/source/adapter/agent_adapter/agent_adapter.hpp b/src/mtconnect/source/adapter/agent_adapter/agent_adapter.hpp index 44e17c790..308dc4b91 100644 --- a/src/mtconnect/source/adapter/agent_adapter/agent_adapter.hpp +++ b/src/mtconnect/source/adapter/agent_adapter/agent_adapter.hpp @@ -30,8 +30,6 @@ namespace boost::asio::ssl { /// @brief @brief the agent adapter namespace namespace mtconnect::source::adapter::agent_adapter { - using namespace mtconnect; - using namespace source::adapter; /// @brief The Agent adapter pipeline class AGENT_LIB_API AgentAdapterPipeline : public AdapterPipeline diff --git a/src/mtconnect/source/adapter/agent_adapter/session.hpp b/src/mtconnect/source/adapter/agent_adapter/session.hpp index abb69db4b..05c424b6a 100644 --- a/src/mtconnect/source/adapter/agent_adapter/session.hpp +++ b/src/mtconnect/source/adapter/agent_adapter/session.hpp @@ -112,13 +112,13 @@ namespace mtconnect::source::adapter::agent_adapter { ///@} protected: - Handler *m_handler = nullptr; ///< Pipeline handler for processing data - std::string m_identity; ///< Unique identity hash for this session - Failure m_failed; ///< Callback invoked on connection failure - UpdateAssets m_updateAssets; ///< Callback to trigger asset updates - bool m_closeConnectionAfterResponse = false; ///< Close connection after each response + Handler *m_handler = nullptr; ///< Pipeline handler for processing data + std::string m_identity; ///< Unique identity hash for this session + Failure m_failed; ///< Callback invoked on connection failure + UpdateAssets m_updateAssets; ///< Callback to trigger asset updates + bool m_closeConnectionAfterResponse = false; ///< Close connection after each response std::chrono::milliseconds m_timeout = std::chrono::milliseconds(30000); ///< I/O timeout - bool m_closeOnRead = false; ///< Close after read (HTTP 1.0 or Connection: close) + bool m_closeOnRead = false; ///< Close after read (HTTP 1.0 or Connection: close) }; } // namespace mtconnect::source::adapter::agent_adapter diff --git a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp index 305bab57a..8f75d6965 100644 --- a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp +++ b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp @@ -47,6 +47,7 @@ namespace mtconnect { using namespace entity; using namespace pipeline; using namespace source::adapter; + using namespace mqtt_client; namespace source::adapter::mqtt_adapter { diff --git a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.hpp b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.hpp index e75211701..157b76327 100644 --- a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.hpp +++ b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.hpp @@ -22,8 +22,6 @@ #include "mtconnect/source/adapter/adapter.hpp" #include "mtconnect/source/adapter/adapter_pipeline.hpp" -using namespace mtconnect::mqtt_client; - /// @brief @brief the Mqtt adapter namespace namespace mtconnect::source::adapter::mqtt_adapter { /// @brief The Mqtt adapter pipeline @@ -100,6 +98,6 @@ namespace mtconnect::source::adapter::mqtt_adapter { MqttPipeline m_pipeline; - std::shared_ptr m_client; + std::shared_ptr m_client; }; } // namespace mtconnect::source::adapter::mqtt_adapter diff --git a/src/mtconnect/source/adapter/shdr/connector.cpp b/src/mtconnect/source/adapter/shdr/connector.cpp index be774fe60..abed232c5 100644 --- a/src/mtconnect/source/adapter/shdr/connector.cpp +++ b/src/mtconnect/source/adapter/shdr/connector.cpp @@ -71,9 +71,6 @@ namespace mtconnect::source::adapter::shdr { m_socket.cancel(); m_socket.close(); } - - if (m_reconnectInterval < 500ms) - m_reconnectInterval = 500ms; } bool Connector::start() { return resolve(); } @@ -171,10 +168,8 @@ namespace mtconnect::source::adapter::shdr { { NAMED_SCOPE("Connector::reconnect"); - static mutex s_reconnectLock; - { - lock_guard guard(s_reconnectLock); + lock_guard guard(m_reconnectLock); if (m_disconnecting || !m_connected) { LOG(warning) << "Already disconnecting. returning"; diff --git a/src/mtconnect/source/adapter/shdr/connector.hpp b/src/mtconnect/source/adapter/shdr/connector.hpp index 66d4ff3c4..9fb7721be 100644 --- a/src/mtconnect/source/adapter/shdr/connector.hpp +++ b/src/mtconnect/source/adapter/shdr/connector.hpp @@ -27,7 +27,7 @@ #include "mtconnect/config.hpp" #include "mtconnect/utilities.hpp" -#define HEARTBEAT_FREQ 60000 +inline constexpr int HEARTBEAT_FREQ = 60000; namespace mtconnect::source::adapter::shdr { /// @brief Connection to an adapter socket @@ -118,8 +118,8 @@ namespace mtconnect::source::adapter::shdr { // Name of the server to connect to std::string m_server; - // Connection - boost::asio::io_context::strand m_strand; + // Connection – reference to the owning Source's strand (not a copy) + boost::asio::io_context::strand &m_strand; boost::asio::ip::tcp::socket m_socket; boost::asio::ip::tcp::endpoint m_endpoint; boost::asio::ip::tcp::resolver::results_type m_results; @@ -154,5 +154,6 @@ namespace mtconnect::source::adapter::shdr { std::chrono::milliseconds m_legacyTimeout; std::chrono::milliseconds m_reconnectInterval; std::chrono::milliseconds m_receiveTimeLimit; + std::mutex m_reconnectLock; }; } // namespace mtconnect::source::adapter::shdr diff --git a/src/mtconnect/source/adapter/shdr/shdr_adapter.cpp b/src/mtconnect/source/adapter/shdr/shdr_adapter.cpp index 0a614b6d3..89c4f372b 100644 --- a/src/mtconnect/source/adapter/shdr/shdr_adapter.cpp +++ b/src/mtconnect/source/adapter/shdr/shdr_adapter.cpp @@ -205,7 +205,7 @@ namespace mtconnect::source::adapter::shdr { options[configuration::Device] = value; else if (command == "shdrversion") options[configuration::ShdrVersion] = stringToInt(value, 1); - else if (command == "messsage") + else if (command == "message") { LOG(info) << '[' << getIdentity() << "] Adapter message: " << value; return; diff --git a/test_package/CMakeLists.txt b/test_package/CMakeLists.txt index e26e3f04d..19f81b328 100644 --- a/test_package/CMakeLists.txt +++ b/test_package/CMakeLists.txt @@ -265,6 +265,7 @@ add_agent_test(json_printer_error TRUE json) add_agent_test(json_printer_probe TRUE json) add_agent_test(json_printer_stream TRUE json) +add_agent_test(json_device_parser TRUE json) add_agent_test(xml_parser TRUE xml) add_agent_test(xml_printer TRUE xml) diff --git a/test_package/agent_test.cpp b/test_package/agent_test.cpp index 25679675a..a23e3fcd8 100644 --- a/test_package/agent_test.cpp +++ b/test_package/agent_test.cpp @@ -2626,6 +2626,8 @@ TEST_F(AgentTest, should_initialize_observaton_to_initial_value_when_available) TEST_F(AgentTest, should_handle_japanese_characters) { + using namespace nlohmann; + addAdapter({{configuration::FilterDuplicates, true}}); auto agent = m_agentTestHelper->getAgent(); auto logic = agent->getDataItemForDevice("LinuxCNC", "lp"); @@ -2649,7 +2651,7 @@ TEST_F(AgentTest, should_handle_japanese_characters) PARSE_JSON_RESPONSE("/current"); json streams = doc.at("/MTConnectStreams/Streams/DeviceStream/0/ComponentStream"_json_pointer); ASSERT_TRUE(streams.is_array()); - auto controller = std::find_if(streams.begin(), streams.end(), [](const json &comp) { + auto controller = std::find_if(streams.begin(), streams.end(), [](const nlohmann::json &comp) { return comp.at("/component"_json_pointer).get() == "Controller"; }); ASSERT_NE(streams.end(), controller); diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index 70f34dc06..b689ab21a 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -498,11 +498,11 @@ class AgentTestHelper } template - bool waitFor(const chrono::duration &time, function pred) + bool waitFor(const std::chrono::duration &time, std::function pred) { std::decay_t run = time / 2; - if (run > 500ms) - run = 500ms; + if (run > std::chrono::milliseconds(500)) + run = std::chrono::milliseconds(500); boost::asio::steady_timer timer(m_ioContext); timer.expires_after(time); @@ -524,7 +524,7 @@ class AgentTestHelper } template - bool waitForResponseSent(const chrono::duration &time, const std::string &id) + bool waitForResponseSent(const std::chrono::duration &time, const std::string &id) { uint32_t initial = m_websocketSession->m_responsesSent[id]; return waitFor(time, [this, initial, id]() -> bool { diff --git a/test_package/connector_test.cpp b/test_package/connector_test.cpp index 016b8cbdf..1da1fac85 100644 --- a/test_package/connector_test.cpp +++ b/test_package/connector_test.cpp @@ -112,10 +112,11 @@ class TestConnector : public Connector class ConnectorTest : public testing::Test { protected: + ConnectorTest() : m_strand(m_context) {} + void SetUp() override { - boost::asio::io_context::strand strand(m_context); - m_connector = std::make_unique(strand, "127.0.0.1", m_port); + m_connector = std::make_unique(m_strand, "127.0.0.1", m_port); m_connector->m_disconnected = true; m_connected = false; } @@ -204,6 +205,7 @@ class ConnectorTest : public testing::Test unsigned short m_port {0}; boost::asio::io_context m_context; + boost::asio::io_context::strand m_strand; std::unique_ptr m_server; std::unique_ptr m_acceptor; bool m_connected; diff --git a/test_package/json_device_parser_test.cpp b/test_package/json_device_parser_test.cpp new file mode 100644 index 000000000..0fc41d4a8 --- /dev/null +++ b/test_package/json_device_parser_test.cpp @@ -0,0 +1,654 @@ +// +// Copyright Copyright 2009-2026, AMT – The Association For Manufacturing Technology ("AMT") +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Ensure that gtest is the first header otherwise Windows raises an error +#include +// Keep this comment to keep gtest.h above. (clang-format off/on is not working here!) + +#include +#include +#include + +#include "mtconnect/device_model/component.hpp" +#include "mtconnect/device_model/data_item/data_item.hpp" +#include "mtconnect/device_model/device.hpp" +#include "mtconnect/parser/json_parser.hpp" + +using namespace std; +using namespace mtconnect; +using namespace mtconnect::device_model; +using namespace mtconnect::device_model::data_item; + +// main +int main(int argc, char *argv[]) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +class JsonDeviceParserTest : public testing::Test +{ +protected: + void SetUp() override {} + void TearDown() override {} +}; + +TEST_F(JsonDeviceParserTest, should_parse_v1_mtconnect_devices_document) +{ + auto doc = R"( +{ + "MTConnectDevices": { + "schemaVersion": "2.3", + "Header": {"instanceId": 123, "bufferSize": 9999, "sender": "test"}, + "Devices": [ + {"Device": {"id": "d1", "name": "Machine1", "uuid": "aaa-111", + "Components": [ + {"Axes": {"id": "a1"}} + ] + }}, + {"Device": {"id": "d2", "name": "Machine2", "uuid": "bbb-222"}} + ] + } +} +)"; + + parser::JsonParser parser(1); + auto devices = parser.parseDocument(doc); + ASSERT_EQ(2, devices.size()); + + ASSERT_EQ("2.3", *parser.getSchemaVersion()); + + auto it = devices.begin(); + ASSERT_EQ("d1", (*it)->get("id")); + ASSERT_EQ("Machine1", (*it)->get("name")); + + auto components = (*it)->getList("Components"); + ASSERT_TRUE(components); + ASSERT_EQ(1, components->size()); + ASSERT_EQ("Axes", string(components->front()->getName())); + + it++; + ASSERT_EQ("d2", (*it)->get("id")); + ASSERT_EQ("Machine2", (*it)->get("name")); +} + +TEST_F(JsonDeviceParserTest, should_parse_v2_mtconnect_devices_document) +{ + auto doc = R"( +{ + "MTConnectDevices": { + "schemaVersion": "2.3", + "Header": {"instanceId": 123, "bufferSize": 9999, "sender": "test"}, + "Devices": { + "Device": [ + {"id": "d1", "name": "Machine1", "uuid": "aaa-111", + "Components": { + "Axes": [{"id": "a1"}] + } + }, + {"id": "d2", "name": "Machine2", "uuid": "bbb-222"} + ] + } + } +} +)"; + + parser::JsonParser parser(2); + auto devices = parser.parseDocument(doc); + ASSERT_EQ(2, devices.size()); + + ASSERT_EQ("2.3", *parser.getSchemaVersion()); + + auto it = devices.begin(); + ASSERT_EQ("d1", (*it)->get("id")); + ASSERT_EQ("Machine1", (*it)->get("name")); + + auto components = (*it)->getList("Components"); + ASSERT_TRUE(components); + ASSERT_EQ(1, components->size()); + ASSERT_EQ("Axes", string(components->front()->getName())); + + it++; + ASSERT_EQ("d2", (*it)->get("id")); + ASSERT_EQ("Machine2", (*it)->get("name")); +} + +TEST_F(JsonDeviceParserTest, should_detect_json_version_from_document) +{ + // V2 document with jsonVersion field — parser should auto-detect version 2 + auto doc = R"( +{ + "MTConnectDevices": { + "jsonVersion": 2, + "schemaVersion": "2.3", + "Header": {"instanceId": 123}, + "Devices": { + "Device": [ + {"id": "d1", "name": "Machine1", "uuid": "aaa-111", + "Components": { + "Axes": [{"id": "a1"}] + } + } + ] + } + } +} +)"; + + // Default version is 1, but jsonVersion in the document overrides it + parser::JsonParser parser; + auto devices = parser.parseDocument(doc); + ASSERT_EQ(1, devices.size()); + + auto device = devices.front(); + ASSERT_EQ("d1", device->get("id")); + + auto components = device->getList("Components"); + ASSERT_TRUE(components); + ASSERT_EQ(1, components->size()); + ASSERT_EQ("Axes", string(components->front()->getName())); +} + +TEST_F(JsonDeviceParserTest, should_default_to_v1_when_json_version_absent) +{ + auto doc = R"( +{ + "MTConnectDevices": { + "schemaVersion": "2.3", + "Header": {"instanceId": 123}, + "Devices": [ + {"Device": {"id": "d1", "name": "Machine1", "uuid": "aaa-111"}} + ] + } +} +)"; + + // No jsonVersion in document, should use default (v1) + parser::JsonParser parser; + auto devices = parser.parseDocument(doc); + ASSERT_EQ(1, devices.size()); + ASSERT_EQ("d1", devices.front()->get("id")); +} + +TEST_F(JsonDeviceParserTest, should_override_v1_default_with_v2_from_document) +{ + // Parser defaults to v1, but the document specifies jsonVersion 2 + auto doc = R"( +{ + "MTConnectDevices": { + "jsonVersion": 2, + "Devices": { + "Device": [ + {"id": "d1", "name": "Machine1", "uuid": "aaa-111", + "Components": { + "Axes": [{"id": "a1"}] + } + } + ] + } + } +} +)"; + + parser::JsonParser parser(1); + auto devices = parser.parseDocument(doc); + ASSERT_EQ(1, devices.size()); + + auto device = devices.front(); + ASSERT_EQ("d1", device->get("id")); + + auto components = device->getList("Components"); + ASSERT_TRUE(components); + ASSERT_EQ(1, components->size()); + ASSERT_EQ("Axes", string(components->front()->getName())); +} + +TEST_F(JsonDeviceParserTest, should_override_v2_default_with_v1_from_document) +{ + // Parser defaults to v2, but the document specifies jsonVersion 1 + auto doc = R"( +{ + "MTConnectDevices": { + "jsonVersion": 1, + "Devices": [ + {"Device": {"id": "d1", "name": "Machine1", "uuid": "aaa-111", + "Components": [ + {"Axes": {"id": "a1"}} + ] + }} + ] + } +} +)"; + + parser::JsonParser parser(2); + auto devices = parser.parseDocument(doc); + ASSERT_EQ(1, devices.size()); + + auto device = devices.front(); + ASSERT_EQ("d1", device->get("id")); + + auto components = device->getList("Components"); + ASSERT_TRUE(components); + ASSERT_EQ(1, components->size()); + ASSERT_EQ("Axes", string(components->front()->getName())); +} + +TEST_F(JsonDeviceParserTest, should_not_mutate_default_version_after_override) +{ + // First parse a v2 document that overrides the default + auto v2Doc = R"( +{ + "MTConnectDevices": { + "jsonVersion": 2, + "Devices": { + "Device": [{"id": "d1", "name": "M1", "uuid": "aaa"}] + } + } +} +)"; + + parser::JsonParser parser(1); + auto devices = parser.parseDocument(v2Doc); + ASSERT_EQ(1, devices.size()); + + // Now parse a v1 document without jsonVersion — should still use the original default (v1) + auto v1Doc = R"( +{ + "MTConnectDevices": { + "Devices": [ + {"Device": {"id": "d2", "name": "M2", "uuid": "bbb"}} + ] + } +} +)"; + + devices = parser.parseDocument(v1Doc); + ASSERT_EQ(1, devices.size()); + ASSERT_EQ("d2", devices.front()->get("id")); +} + +TEST_F(JsonDeviceParserTest, should_throw_when_document_missing_mtconnect_devices) +{ + auto doc = R"({"SomethingElse": {}})"; + + parser::JsonParser parser(1); + ASSERT_THROW(parser.parseDocument(doc), FatalException); +} + +TEST_F(JsonDeviceParserTest, should_return_empty_list_when_no_devices) +{ + auto doc = R"( +{ + "MTConnectDevices": { + "Header": {"instanceId": 123} + } +} +)"; + + parser::JsonParser parser(1); + auto devices = parser.parseDocument(doc); + ASSERT_TRUE(devices.empty()); +} + +TEST_F(JsonDeviceParserTest, should_parse_v1_device_with_components) +{ + auto doc = R"( +{ + "Device": { + "id": "d1", "name": "MyDevice", "uuid": "xxx-yyy-zzz", + "Components": [ + {"Systems": {"id": "s1", + "Components": [ + {"Electric": {"id": "e1"}}, + {"Hydraulic": {"id": "h1"}} + ] + }} + ] + } +} +)"; + + parser::JsonParser parser(1); + auto device = parser.parseDevice(doc); + ASSERT_TRUE(device); + ASSERT_EQ("Device", string(device->getName())); + ASSERT_EQ("d1", device->get("id")); + ASSERT_EQ("MyDevice", device->get("name")); + ASSERT_EQ("xxx-yyy-zzz", device->get("uuid")); + + auto components = device->getList("Components"); + ASSERT_TRUE(components); + ASSERT_EQ(1, components->size()); + + auto systems = dynamic_pointer_cast(components->front()); + ASSERT_TRUE(systems); + ASSERT_EQ("Systems", string(systems->getName())); + ASSERT_EQ("s1", systems->get("id")); + + auto subComponents = systems->getList("Components"); + ASSERT_TRUE(subComponents); + ASSERT_EQ(2, subComponents->size()); + + auto it = subComponents->begin(); + ASSERT_EQ("Electric", string((*it)->getName())); + ASSERT_EQ("e1", (*it)->get("id")); + + it++; + ASSERT_EQ("Hydraulic", string((*it)->getName())); + ASSERT_EQ("h1", (*it)->get("id")); +} + +TEST_F(JsonDeviceParserTest, should_parse_v2_device_with_components) +{ + auto doc = R"( +{ + "Device": { + "id": "d1", "name": "MyDevice", "uuid": "xxx-yyy-zzz", + "Components": { + "Systems": [{"id": "s1", + "Components": { + "Electric": [{"id": "e1"}], + "Hydraulic": [{"id": "h1"}] + } + }] + } + } +} +)"; + + parser::JsonParser parser(2); + auto device = parser.parseDevice(doc); + ASSERT_TRUE(device); + ASSERT_EQ("Device", string(device->getName())); + ASSERT_EQ("d1", device->get("id")); + ASSERT_EQ("MyDevice", device->get("name")); + ASSERT_EQ("xxx-yyy-zzz", device->get("uuid")); + + auto components = device->getList("Components"); + ASSERT_TRUE(components); + ASSERT_EQ(1, components->size()); + + auto systems = dynamic_pointer_cast(components->front()); + ASSERT_TRUE(systems); + ASSERT_EQ("Systems", string(systems->getName())); + ASSERT_EQ("s1", systems->get("id")); + + auto subComponents = systems->getList("Components"); + ASSERT_TRUE(subComponents); + ASSERT_EQ(2, subComponents->size()); + + auto it = subComponents->begin(); + ASSERT_EQ("Electric", string((*it)->getName())); + ASSERT_EQ("e1", (*it)->get("id")); + + it++; + ASSERT_EQ("Hydraulic", string((*it)->getName())); + ASSERT_EQ("h1", (*it)->get("id")); +} + +TEST_F(JsonDeviceParserTest, should_parse_v1_device_with_data_items) +{ + auto doc = R"( +{ + "Device": { + "id": "d1", "name": "MyDevice", "uuid": "xxx-yyy-zzz", + "DataItems": [ + {"DataItem": {"id": "avail", "type": "AVAILABILITY", "category": "EVENT"}}, + {"DataItem": {"id": "xpos", "type": "POSITION", "category": "SAMPLE", + "subType": "ACTUAL", "units": "MILLIMETER"}} + ] + } +} +)"; + + parser::JsonParser parser(1); + auto device = parser.parseDevice(doc); + ASSERT_TRUE(device); + ASSERT_EQ("d1", device->get("id")); + + auto dataItems = device->getList("DataItems"); + ASSERT_TRUE(dataItems); + ASSERT_EQ(2, dataItems->size()); + + auto it = dataItems->begin(); + auto di = dynamic_pointer_cast(*it); + ASSERT_TRUE(di); + ASSERT_EQ("avail", di->getId()); + ASSERT_EQ("AVAILABILITY", di->getType()); + + it++; + di = dynamic_pointer_cast(*it); + ASSERT_TRUE(di); + ASSERT_EQ("xpos", di->getId()); + ASSERT_EQ("POSITION", di->getType()); + ASSERT_EQ("MILLIMETER", di->get("units")); +} + +TEST_F(JsonDeviceParserTest, should_parse_v2_device_with_data_items) +{ + auto doc = R"( +{ + "Device": { + "id": "d1", "name": "MyDevice", "uuid": "xxx-yyy-zzz", + "DataItems": { + "DataItem": [ + {"id": "avail", "type": "AVAILABILITY", "category": "EVENT"}, + {"id": "xpos", "type": "POSITION", "category": "SAMPLE", + "subType": "ACTUAL", "units": "MILLIMETER"} + ] + } + } +} +)"; + + parser::JsonParser parser(2); + auto device = parser.parseDevice(doc); + ASSERT_TRUE(device); + ASSERT_EQ("d1", device->get("id")); + + auto dataItems = device->getList("DataItems"); + ASSERT_TRUE(dataItems); + ASSERT_EQ(2, dataItems->size()); + + auto it = dataItems->begin(); + auto di = dynamic_pointer_cast(*it); + ASSERT_TRUE(di); + ASSERT_EQ("avail", di->getId()); + ASSERT_EQ("AVAILABILITY", di->getType()); + + it++; + di = dynamic_pointer_cast(*it); + ASSERT_TRUE(di); + ASSERT_EQ("xpos", di->getId()); + ASSERT_EQ("POSITION", di->getType()); + ASSERT_EQ("MILLIMETER", di->get("units")); +} + +TEST_F(JsonDeviceParserTest, should_parse_v1_device_with_description) +{ + auto doc = R"( +{ + "Device": { + "id": "d1", "name": "MyDevice", "uuid": "xxx-yyy-zzz", + "Description": {"manufacturer": "ACME", "model": "X100", + "value": "A test device"} + } +} +)"; + + parser::JsonParser parser(1); + auto device = parser.parseDevice(doc); + ASSERT_TRUE(device); + + auto &description = device->get("Description"); + ASSERT_TRUE(description); + ASSERT_EQ("ACME", description->get("manufacturer")); + ASSERT_EQ("X100", description->get("model")); + ASSERT_EQ("A test device", description->getValue()); +} + +TEST_F(JsonDeviceParserTest, should_fail_when_required_properties_missing) +{ + // Missing uuid and name which are required for Device + auto doc = R"( +{ + "Device": { + "id": "d1" + } +} +)"; + + parser::JsonParser parser(1); + auto device = parser.parseDevice(doc); + ASSERT_FALSE(device); +} + +TEST_F(JsonDeviceParserTest, should_parse_v2_device_with_multiple_component_types) +{ + auto doc = R"( +{ + "Device": { + "id": "d1", "name": "MyDevice", "uuid": "xxx-yyy-zzz", + "Components": { + "Axes": [{"id": "a1", + "Components": { + "Linear": [ + {"id": "x1", "name": "X"}, + {"id": "y1", "name": "Y"} + ], + "Rotary": [ + {"id": "c1", "name": "C"} + ] + } + }], + "Controller": [{"id": "ct1"}] + } + } +} +)"; + + parser::JsonParser parser(2); + auto device = parser.parseDevice(doc); + ASSERT_TRUE(device); + + auto components = device->getList("Components"); + ASSERT_TRUE(components); + ASSERT_EQ(2, components->size()); + + auto it = components->begin(); + auto axes = dynamic_pointer_cast(*it); + ASSERT_TRUE(axes); + ASSERT_EQ("Axes", string(axes->getName())); + + auto axesChildren = axes->getList("Components"); + ASSERT_TRUE(axesChildren); + ASSERT_EQ(3, axesChildren->size()); + + auto cit = axesChildren->begin(); + ASSERT_EQ("Linear", string((*cit)->getName())); + ASSERT_EQ("x1", (*cit)->get("id")); + + cit++; + ASSERT_EQ("Linear", string((*cit)->getName())); + ASSERT_EQ("y1", (*cit)->get("id")); + + cit++; + ASSERT_EQ("Rotary", string((*cit)->getName())); + ASSERT_EQ("c1", (*cit)->get("id")); + + it++; + auto controller = dynamic_pointer_cast(*it); + ASSERT_TRUE(controller); + ASSERT_EQ("Controller", string(controller->getName())); + ASSERT_EQ("ct1", controller->get("id")); +} + +TEST_F(JsonDeviceParserTest, should_parse_v1_and_v2_produce_same_device_structure) +{ + auto v1Doc = R"( +{ + "Device": { + "id": "d1", "name": "Machine", "uuid": "aaa-bbb", + "Components": [ + {"Axes": {"id": "a1", + "Components": [ + {"Linear": {"id": "x1", "name": "X"}}, + {"Linear": {"id": "y1", "name": "Y"}} + ] + }} + ] + } +} +)"; + + auto v2Doc = R"( +{ + "Device": { + "id": "d1", "name": "Machine", "uuid": "aaa-bbb", + "Components": { + "Axes": [{"id": "a1", + "Components": { + "Linear": [ + {"id": "x1", "name": "X"}, + {"id": "y1", "name": "Y"} + ] + } + }] + } + } +} +)"; + + parser::JsonParser parserV1(1); + auto deviceV1 = parserV1.parseDevice(v1Doc); + ASSERT_TRUE(deviceV1); + + parser::JsonParser parserV2(2); + auto deviceV2 = parserV2.parseDevice(v2Doc); + ASSERT_TRUE(deviceV2); + + // Both should produce the same device structure + ASSERT_EQ(deviceV1->get("id"), deviceV2->get("id")); + ASSERT_EQ(deviceV1->get("name"), deviceV2->get("name")); + ASSERT_EQ(deviceV1->get("uuid"), deviceV2->get("uuid")); + + auto compsV1 = deviceV1->getList("Components"); + auto compsV2 = deviceV2->getList("Components"); + ASSERT_TRUE(compsV1); + ASSERT_TRUE(compsV2); + ASSERT_EQ(compsV1->size(), compsV2->size()); + + auto axesV1 = compsV1->front(); + auto axesV2 = compsV2->front(); + ASSERT_EQ(string(axesV1->getName()), string(axesV2->getName())); + + auto childrenV1 = axesV1->getList("Components"); + auto childrenV2 = axesV2->getList("Components"); + ASSERT_TRUE(childrenV1); + ASSERT_TRUE(childrenV2); + ASSERT_EQ(childrenV1->size(), childrenV2->size()); + + auto itV1 = childrenV1->begin(); + auto itV2 = childrenV2->begin(); + for (; itV1 != childrenV1->end(); ++itV1, ++itV2) + { + ASSERT_EQ(string((*itV1)->getName()), string((*itV2)->getName())); + ASSERT_EQ((*itV1)->get("id"), (*itV2)->get("id")); + ASSERT_EQ((*itV1)->get("name"), (*itV2)->get("name")); + } +} diff --git a/test_package/json_parser_test.cpp b/test_package/json_parser_test.cpp index 8bbd86d56..fb8a3f968 100644 --- a/test_package/json_parser_test.cpp +++ b/test_package/json_parser_test.cpp @@ -1,5 +1,5 @@ // -// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology (“AMT”) +// Copyright Copyright 2009-2026, AMT – The Association For Manufacturing Technology (“AMT”) // All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -81,7 +81,7 @@ class JsonParserTest : public testing::Test } }; -TEST_F(JsonParserTest, TestParseSimpleDocument) +TEST_F(JsonParserTest, should_parse_v1_entity_with_properties_and_entity_list) { auto fileProperty = make_shared(Requirements({Requirement("name", true), Requirement("VALUE", true)})); @@ -148,7 +148,7 @@ TEST_F(JsonParserTest, TestParseSimpleDocument) ASSERT_EQ("Flat", get((*it)->getProperty("VALUE"))); } -TEST_F(JsonParserTest, TestRecursiveEntityLists) +TEST_F(JsonParserTest, should_parse_v1_recursive_entity_lists) { auto root = components(); @@ -199,7 +199,7 @@ TEST_F(JsonParserTest, TestRecursiveEntityLists) ASSERT_EQ("h1", get((*sli)->getProperty("id"))); } -TEST_F(JsonParserTest, TestRecursiveEntityListFailure) +TEST_F(JsonParserTest, should_fail_v1_when_required_property_missing) { auto root = components(); @@ -227,7 +227,7 @@ TEST_F(JsonParserTest, TestRecursiveEntityListFailure) errors.front()->what()); } -TEST_F(JsonParserTest, TestRecursiveEntityListMissingComponents) +TEST_F(JsonParserTest, should_error_v1_when_entity_list_is_empty) { auto root = components(); @@ -268,7 +268,233 @@ TEST_F(JsonParserTest, TestRecursiveEntityListMissingComponents) ASSERT_FALSE(sl); } -TEST_F(JsonParserTest, TestRawContent) +TEST_F(JsonParserTest, should_parse_v2_entity_with_properties_and_typed_entity_list) +{ + auto fileProperty = + make_shared(Requirements({Requirement("name", true), Requirement("VALUE", true)})); + + auto fileProperties = make_shared(Requirements( + {Requirement("FileProperty", ValueType::ENTITY, fileProperty, 1, Requirement::Infinite)})); + fileProperties->registerMatchers(); + + auto fileComment = make_shared( + Requirements({Requirement("timestamp", true), Requirement("VALUE", true)})); + + auto fileComments = make_shared(Requirements( + {Requirement("FileComment", ValueType::ENTITY, fileComment, 1, Requirement::Infinite)})); + fileComments->registerMatchers(); + + auto fileArchetype = make_shared(Requirements { + Requirement("assetId", true), Requirement("deviceUuid", true), Requirement("timestamp", true), + Requirement("removed", false), Requirement("name", true), Requirement("mediaType", true), + Requirement("applicationCategory", true), Requirement("applicationType", true), + Requirement("FileComments", ValueType::ENTITY_LIST, fileComments, false), + Requirement("FileProperties", ValueType::ENTITY_LIST, fileProperties, false)}); + + auto root = make_shared( + Requirements {Requirement("FileArchetype", ValueType::ENTITY, fileArchetype)}); + + // V2 format: entity lists are objects with type-name keys containing arrays + auto doc = R"( + { + "FileArchetype" : { + "name":"xxxx", "assetId":"uuid", "deviceUuid":"duid", "timestamp":"2020-12-01T10:00Z", + "mediaType":"json", "applicationCategory":"ASSEMBLY", "applicationType":"DATA", + "FileProperties":{ + "FileProperty":[ + {"name":"one", "value":"Round"}, + {"name":"two", "value":"Flat"}]} + } + } + )"; + + ErrorList errors; + entity::JsonParser parser(2); + + auto entity = parser.parse(root, doc, "1.7", errors); + ASSERT_EQ(0, errors.size()); + + ASSERT_EQ("FileArchetype", entity->getName()); + ASSERT_EQ("xxxx", get(entity->getProperty("name"))); + ASSERT_EQ("uuid", get(entity->getProperty("assetId"))); + ASSERT_EQ("2020-12-01T10:00Z", get(entity->getProperty("timestamp"))); + ASSERT_EQ("json", get(entity->getProperty("mediaType"))); + ASSERT_EQ("ASSEMBLY", get(entity->getProperty("applicationCategory"))); + ASSERT_EQ("DATA", get(entity->getProperty("applicationType"))); + + auto fps = entity->getList("FileProperties"); + ASSERT_TRUE(fps); + ASSERT_EQ(2, fps->size()); + + auto it = fps->begin(); + ASSERT_EQ("FileProperty", (*it)->getName()); + ASSERT_EQ("one", get((*it)->getProperty("name"))); + ASSERT_EQ("Round", get((*it)->getProperty("VALUE"))); + + it++; + ASSERT_EQ("FileProperty", (*it)->getName()); + ASSERT_EQ("two", get((*it)->getProperty("name"))); + ASSERT_EQ("Flat", get((*it)->getProperty("VALUE"))); +} + +TEST_F(JsonParserTest, should_parse_v2_recursive_entity_lists_grouped_by_type) +{ + auto root = components(); + + // V2 format: Components is an object with type-name keys containing arrays + auto doc = R"( +{ + "Device" : { "id":"d1", "name":"foo", "uuid":"xxx", + "Components":{ + "Systems":[{"id":"s1", + "Components":{ + "Electric":[{"id":"e1"}], + "Heating":[{"id":"h1"}]} + }] + } + } +} +)"; + + ErrorList errors; + entity::JsonParser parser(2); + + auto entity = parser.parse(root, doc, "1.7", errors); + ASSERT_EQ(0, errors.size()); + + ASSERT_EQ("Device", entity->getName()); + ASSERT_EQ("d1", get(entity->getProperty("id"))); + ASSERT_EQ("foo", get(entity->getProperty("name"))); + ASSERT_EQ("xxx", get(entity->getProperty("uuid"))); + + auto l = entity->getList("Components"); + ASSERT_TRUE(l); + ASSERT_EQ(1, l->size()); + + auto systems = l->front(); + ASSERT_EQ("Systems", systems->getName()); + ASSERT_EQ("s1", get(systems->getProperty("id"))); + + auto sl = systems->getList("Components"); + ASSERT_TRUE(sl); + ASSERT_EQ(2, sl->size()); + + auto sli = sl->begin(); + + ASSERT_EQ("Electric", (*sli)->getName()); + ASSERT_EQ("e1", get((*sli)->getProperty("id"))); + + sli++; + ASSERT_EQ("Heating", (*sli)->getName()); + ASSERT_EQ("h1", get((*sli)->getProperty("id"))); +} + +TEST_F(JsonParserTest, should_fail_v2_when_required_property_missing) +{ + auto root = components(); + + auto doc = R"( +{ + "Device" : { "id":"d1", "name":"foo", + "Components":{ + "Systems":[{"id":"s1", + "Components":{ + "Electric":[{"id":"e1"}], + "Heating":[{"id":"h1"}]} + }] + } + } +} +)"; + + ErrorList errors; + entity::JsonParser parser(2); + + auto entity = parser.parse(root, doc, "1.7", errors); + ASSERT_EQ(1, errors.size()); + ASSERT_FALSE(entity); + ASSERT_EQ(string("Device(uuid): Property uuid is required and not provided"), + errors.front()->what()); +} + +TEST_F(JsonParserTest, should_error_v2_when_entity_list_is_empty) +{ + auto root = components(); + + auto doc = R"( +{ + "Device" : { "id":"d1", "name":"foo", "uuid":"xxx", + "Components":{ + "Systems":[{"id":"s1", + "Components":{} + }] + } + } +} +)"; + + ErrorList errors; + entity::JsonParser parser(2); + + auto entity = parser.parse(root, doc, "1.7", errors); + ASSERT_EQ(1, errors.size()); + ASSERT_TRUE(entity); + ASSERT_EQ(string("Components(Component): Property Component is required and not provided"), + errors.front()->what()); + ASSERT_EQ("Device", entity->getName()); + ASSERT_EQ("d1", get(entity->getProperty("id"))); + ASSERT_EQ("foo", get(entity->getProperty("name"))); + ASSERT_EQ("xxx", get(entity->getProperty("uuid"))); + + auto l = entity->getList("Components"); + ASSERT_TRUE(l); + ASSERT_EQ(1, l->size()); + + auto systems = l->front(); + ASSERT_EQ("Systems", systems->getName()); + ASSERT_EQ("s1", get(systems->getProperty("id"))); + + auto sl = systems->getList("Components"); + ASSERT_FALSE(sl); +} + +TEST_F(JsonParserTest, should_parse_v2_multiple_entities_of_same_type_in_one_array) +{ + auto root = components(); + + // V2 format with multiple entities of the same type in one array + auto doc = R"( +{ + "Device" : { "id":"d1", "name":"foo", "uuid":"xxx", + "Components":{ + "Systems":[ + {"id":"s1"}, + {"id":"s2"}] + } + } +} +)"; + + ErrorList errors; + entity::JsonParser parser(2); + + auto entity = parser.parse(root, doc, "1.7", errors); + ASSERT_EQ(0, errors.size()); + + auto l = entity->getList("Components"); + ASSERT_TRUE(l); + ASSERT_EQ(2, l->size()); + + auto it = l->begin(); + ASSERT_EQ("Systems", (*it)->getName()); + ASSERT_EQ("s1", get((*it)->getProperty("id"))); + + it++; + ASSERT_EQ("Systems", (*it)->getName()); + ASSERT_EQ("s2", get((*it)->getProperty("id"))); +} + +TEST_F(JsonParserTest, should_parse_raw_content_without_xml_encoding) { auto definition = make_shared(Requirements({Requirement("format", false), Requirement("RAW", true)})); diff --git a/test_package/mqtt_entity_sink_test.cpp b/test_package/mqtt_entity_sink_test.cpp index 4cc274038..e1605d1b2 100644 --- a/test_package/mqtt_entity_sink_test.cpp +++ b/test_package/mqtt_entity_sink_test.cpp @@ -42,6 +42,7 @@ using namespace mtconnect::device_model::data_item; using namespace mtconnect::sink::mqtt_entity_sink; using namespace mtconnect::asset; using namespace mtconnect::configuration; +using namespace mtconnect::mqtt_client; // main int main(int argc, char* argv[]) diff --git a/test_package/mqtt_isolated_test.cpp b/test_package/mqtt_isolated_test.cpp index fb802105f..015a49f20 100644 --- a/test_package/mqtt_isolated_test.cpp +++ b/test_package/mqtt_isolated_test.cpp @@ -38,6 +38,7 @@ using namespace mtconnect::configuration; using namespace mtconnect::device_model::data_item; using namespace mtconnect::sink::mqtt_sink; using namespace mtconnect::sink::rest_sink; +using namespace mtconnect::mqtt_client; using json = nlohmann::json; diff --git a/test_package/mqtt_sink_test.cpp b/test_package/mqtt_sink_test.cpp index d544b579c..011a67ad9 100644 --- a/test_package/mqtt_sink_test.cpp +++ b/test_package/mqtt_sink_test.cpp @@ -40,6 +40,7 @@ using namespace mtconnect::device_model::data_item; using namespace mtconnect::sink::mqtt_sink; using namespace mtconnect::asset; using namespace mtconnect::configuration; +using namespace mtconnect::mqtt_client; // main int main(int argc, char *argv[])