Skip to content

File output_recorder.hpp

File List > backends > cxx > include > zmbt > model > output_recorder.hpp

Go to the documentation of this file

#ifndef ZMBT_ENV_OUTPUT_RECORDER_HPP_
#define ZMBT_ENV_OUTPUT_RECORDER_HPP_

#include <algorithm>
#include <atomic>
#include <bitset>
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>

#include <boost/json.hpp>
#define BOOST_UNORDERED_DISABLE_REENTRANCY_CHECK
#include <boost/unordered/concurrent_flat_map.hpp>

#include <boost/core/demangle.hpp>

#include "zmbt/application/log.hpp"
#include "zmbt/core.hpp"
#include "zmbt/reflect.hpp"

#include "channel_kind.hpp"
#include "global_flags.hpp"
#include "global_stats.hpp"
#include "error_or_return.hpp"


namespace zmbt {

/// Error type reported by OutputRecorder; inherits every constructor of base_error.
struct output_recorder_error : base_error
{
    using base_error::base_error;
};

/// Records the arguments, return values and exceptions observed on an
/// interface and serializes them into JSON frames.
///
/// The recording state lives in a Registry held through a shared_ptr, so
/// copies of an OutputRecorder refer to the same Registry.
class OutputRecorder
{
  public:

    /// A single captured value together with the timestamp it was pushed with.
    template <class T>
    struct Frame
    {
        /// Timestamp taken from get_ts() in push().
        std::size_t ts {0};
        /// Captured value; starts as the signal's initial value.
        T data {reflect::signal_traits<T>::init()};
    };

  private:
    struct Registry;


    /// Type index of the unqualified argument tuple of `Interface`,
    /// as computed by reflect::invocation and tuple_unqf_t.
    template <class Interface>
    static std::type_index get_args_typeid(Interface const&)
    {
        using reflection  = reflect::invocation<Interface const&>;
        using args_t      = typename reflection::args_t;
        using unqf_args_t = tuple_unqf_t<args_t>;
        return {typeid(unqf_args_t)};
    }

    /// Shared recording state of one interface: per-thread frame buffers,
    /// the frames already serialized to JSON, counters and category filters.
    struct Registry
    {
        /// Type-erased per-thread buffers. Each non-null pointer refers to a
        /// std::deque<Frame<X>> with X = ArgsTuple / Return / ErrorInfo; the
        /// concrete types are known only to push() and to the extract function.
        struct FrameBuffs
        {
            std::shared_ptr<void> args{};
            std::shared_ptr<void> ret{};
            std::shared_ptr<void> err{};
        };

        using FramesBuffMap = boost::concurrent_flat_map<std::thread::id, FrameBuffs, std::hash<std::thread::id>>;
        using consume_fn_t = std::function<void(Registry&)>;

        /// typeid of std::tuple<ArgsTuple, Return>; push() validates against it.
        std::type_index data_typeid;
        boost::json::string interface_name;
        /// Drains frame_buff_map into serialized_frames (built in Make()).
        consume_fn_t extract_fn;
        /// Replaced via std::atomic_exchange by extract_fn, therefore every
        /// other access has to go through the std::atomic_* shared_ptr functions.
        std::shared_ptr<FramesBuffMap> frame_buff_map;

        boost::json::array serialized_frames{};
        std::atomic<std::size_t> count{0};
        std::atomic<std::size_t> lost_count{0};

        /// One bit per ChannelKind below ChannelKind::Undefined.
        /// NOTE(review): plain bitset, read and reset from push() without
        /// synchronization - confirm that concurrent modification cannot happen.
        std::bitset<static_cast<unsigned>(ChannelKind::Undefined)> enable_categories_{};

        Registry(
            std::type_index typid,
            boost::json::string name,
            consume_fn_t efn,
            std::shared_ptr<FramesBuffMap> fbm
        )
            : data_typeid(typid)
            , interface_name(std::move(name))
            , extract_fn(std::move(efn))
            , frame_buff_map(std::move(fbm))
        {
        }


        /// Creates a Registry whose extract function knows the concrete
        /// argument and return types of `Interface`.
        template <class Interface>
        static std::shared_ptr<Registry> Make()
        {
            using ArgsTuple = reflect::invocation_args_unqf_t<Interface const&>;
            using Return = reflect::invocation_ret_unqf_or_nullptr_t<Interface const&>;

            std::type_index const typid {typeid(std::tuple<ArgsTuple, Return>)};

            // Moves all buffered frames into registry.serialized_frames,
            // merging the per-thread buffers in ascending timestamp order.
            consume_fn_t extract_fn = [](Registry& registry) {

                // Take ownership of the current buffers and install a fresh map,
                // so that subsequent pushes land in new buffers.
                auto frame_buff_map = std::atomic_exchange(&registry.frame_buff_map, std::make_shared<Registry::FramesBuffMap>());
                if (!frame_buff_map)
                {
                    ZMBT_LOG(FATAL) << "corrupted output recorder registry on " << registry.interface_name;
                    return;
                }

                bool const tid_enabled = registry.enable_categories_[static_cast<unsigned>(ChannelKind::ThreadId)];

                using FB = Registry::FrameBuffs;
                using ThreadFrameBuffers = std::pair<std::thread::id, FB>;

                // Snapshot thread buffers (copies the shared_ptrs, not the deques)
                std::vector<ThreadFrameBuffers> fbs;
                fbs.reserve(frame_buff_map->size());
                frame_buff_map->visit_all([&](auto& record){
                    fbs.push_back(record);
                });

                // Smallest front timestamp over the three deques of one thread;
                // std::string::npos (max size_t) serves as the "all empty" sentinel.
                auto get_next_ts = [](FB& fb) -> std::size_t {
                    std::size_t ts = std::string::npos;
                    if (fb.args) {
                        auto& dq = *std::static_pointer_cast<std::deque<Frame<ArgsTuple>>>(fb.args);
                        if (!dq.empty()) ts = std::min(ts, dq.front().ts);
                    }
                    if (fb.ret) {
                        auto& dq = *std::static_pointer_cast<std::deque<Frame<Return>>>(fb.ret);
                        if (!dq.empty()) ts = std::min(ts, dq.front().ts);
                    }
                    if (fb.err) {
                        auto& dq = *std::static_pointer_cast<std::deque<Frame<ErrorInfo>>>(fb.err);
                        if (!dq.empty()) ts = std::min(ts, dq.front().ts);
                    }
                    return ts;
                };

                struct Cursor {
                    std::size_t ts {std::string::npos}; // next timestamp
                    ThreadFrameBuffers* buffs{nullptr};
                };

                // initialize candidates: one cursor per thread
                std::vector<Cursor> cursors;
                cursors.reserve(fbs.size());
                for (auto& tfbs : fbs) {
                    cursors.push_back({get_next_ts(tfbs.second), &tfbs});
                }

                Cursor null{};
                for (;;) {
                    // find candidate with smallest ts
                    // (on a tie the earlier cursor in the snapshot wins)
                    Cursor* cursor = &null;

                    for (auto& candidate: cursors)
                    {
                        if (candidate.ts < cursor->ts) {
                            cursor = &candidate;
                        }
                    }
                    if (cursor == &null)
                        break; // all empty

                    boost::json::object json_frame{
                        {"ts",  cursor->ts},
                    };

                    FB& fb = cursor->buffs->second;

                    if (tid_enabled)
                    {
                        json_frame["tid"] = tid2str(cursor->buffs->first);
                    }

                    // consume exactly one element of each deque matching this ts
                    if (fb.args) {
                        auto& dq = *std::static_pointer_cast<std::deque<Frame<ArgsTuple>>>(fb.args);
                        if (!dq.empty() && dq.front().ts == cursor->ts) {
                            json_frame["args"] = json_from(dq.front().data);
                            dq.pop_front();
                        }
                    }
                    if (fb.ret) {
                        auto& dq = *std::static_pointer_cast<std::deque<Frame<Return>>>(fb.ret);
                        if (!dq.empty() && dq.front().ts == cursor->ts) {
                            json_frame["return"] = json_from(dq.front().data);
                            dq.pop_front();
                        }
                    }
                    if (fb.err) {
                        auto& dq = *std::static_pointer_cast<std::deque<Frame<ErrorInfo>>>(fb.err);
                        if (!dq.empty() && dq.front().ts == cursor->ts) {
                            json_frame["exception"] = dq.front().data.to_json();
                            dq.pop_front();
                        }
                    }

                    registry.serialized_frames.push_back(std::move(json_frame));

                    // refresh this candidate's ts
                    cursor->ts = get_next_ts(fb);
                }
            };

            auto frame_buff_map = std::make_shared<FramesBuffMap>();

            return std::make_shared<Registry>(typid, type_name<Interface>().c_str(), extract_fn, frame_buff_map);
        }
    };

    /// True if recording of channel kind `ck` is enabled.
    /// Requires a non-null registry_.
    template <ChannelKind ck>
    bool check_filter() const
    {
        return registry_->enable_categories_[static_cast<unsigned>(ck)];
    }

    void report_test_error(ErrorInfo const&) const;


  public:


    OutputRecorder(interface_id const& ifc_id, object_id const& obj_id);
    ~OutputRecorder();

    OutputRecorder(OutputRecorder const&) = default;
    OutputRecorder(OutputRecorder &&) = default;
    OutputRecorder& operator=(OutputRecorder const&) = default;
    OutputRecorder& operator=(OutputRecorder &&) = default;

    void flush();

    /// Installs a Registry for `Interface` unless one is already present.
    ///
    /// The strong compare-exchange is required here: the weak form may fail
    /// spuriously and, without a retry loop, would leave the registry unset.
    /// If another caller installs a registry first, that one is kept.
    template <class Interface, class InterfacePointer = ifc_pointer_t<Interface>>
    void setup_handlers()
    {
        if (std::atomic_load(&(this->registry_)) != nullptr) return;
        std::shared_ptr<Registry> expected {nullptr};
        std::atomic_compare_exchange_strong(&(this->registry_), &expected, Registry::Make<InterfacePointer>());
    }


    /// Records one call on the interface.
    ///
    /// @param args             call arguments, stored when the Args category is enabled
    /// @param return_or_error  call outcome; a return value is stored when the Return
    ///                         category is enabled, an error when the Exception category
    ///                         is enabled
    ///
    /// Errors are passed to report_test_error() and nothing is recorded when the
    /// recorder has no registry or when <ArgsTuple, Return> does not match the
    /// registered types. Calls made while no test is running only increment
    /// lost_count(). An error arriving while the Exception category is disabled is
    /// reported as a test error and disables all categories.
    template <class ArgsTuple, class Return>
    void push(ArgsTuple const& args, ErrorOr<Return> const& return_or_error)
    {
        if (!ensure_registry())
        {
            ErrorInfo e;
            e.type = type_name<output_recorder_error>();
            e.what = "push to unregistered output recorder";
            e.context = "OutputRecorder";
            report_test_error(e);
            return;
        }

        std::type_index const ti {typeid(std::tuple<ArgsTuple, Return>)};

        if (ti != registry_->data_typeid)
        {
            ErrorInfo e;
            e.type = type_name<output_recorder_error>();
            e.what = format("invalid type on push - expected `%s`, got `%s`",
                boost::core::demangle(registry_->data_typeid.name()),
                boost::core::demangle(ti.name())
            ).c_str();
            e.context = format("OutputRecorder[%s]", registry_->interface_name).c_str();
            report_test_error(e);
            return;
        }

        if (!flags::TestIsRunning::status())
        {
            registry_->lost_count++;
            // TODO: redesign me
            // lost call may come from SUT initialization
            // before expectations are set, which may be ok to skip
            return;
        }

        auto const start = std::chrono::steady_clock::now();
        registry_->count++;

        auto const ts = get_ts();

        bool const args_en = registry_->enable_categories_[static_cast<unsigned>(ChannelKind::Args)];
        bool const ret_en = registry_->enable_categories_[static_cast<unsigned>(ChannelKind::Return)];
        bool const err_en = registry_->enable_categories_[static_cast<unsigned>(ChannelKind::Exception)];

        using ArgsDeque = std::deque<Frame<ArgsTuple>>;
        using RetDeque  = std::deque<Frame<Return>>;
        using ErrDeque  = std::deque<Frame<ErrorInfo>>;

        // The extract function swaps the map with std::atomic_exchange, so it must
        // be read atomically, and only once, to keep both map operations below on
        // the same map instance.
        auto const frame_buffs = std::atomic_load(&registry_->frame_buff_map);
        auto const tid = std::this_thread::get_id();

        // Local copy of this thread's buffer handles.
        Registry::FrameBuffs fb;

        // Buffers may have been created before a category was enabled; create the
        // missing deques in place so the pushes below never dereference null.
        auto const sync_buffers = [&](auto& record) {
            Registry::FrameBuffs& stored = record.second;
            if (args_en && !stored.args) stored.args = std::make_shared<ArgsDeque>();
            if (ret_en  && !stored.ret)  stored.ret  = std::make_shared<RetDeque>();
            if (err_en  && !stored.err)  stored.err  = std::make_shared<ErrDeque>();
            fb = stored;
        };

        if (0 == frame_buffs->visit(tid, sync_buffers))
        {
            // first push of this thread into the current map
            Registry::FrameBuffs new_fb;
            if (args_en) new_fb.args = std::make_shared<ArgsDeque>();
            if (ret_en)  new_fb.ret  = std::make_shared<RetDeque>();
            if (err_en)  new_fb.err  = std::make_shared<ErrDeque>();
            fb = new_fb;
            frame_buffs->try_emplace_or_visit(tid, new_fb, sync_buffers);
        }

        // NOTE(review): the deques themselves are not synchronized; presumably the
        // extract function never runs while pushes are in flight - confirm.
        if (args_en)
        {
            std::static_pointer_cast<ArgsDeque>(fb.args)->push_back({ts, args});
        }
        if (ret_en && return_or_error.is_return())
        {
            std::static_pointer_cast<RetDeque>(fb.ret)->push_back({ts, return_or_error.as_return()});
        }

        if (return_or_error.is_error())
        {
            if (err_en)
            {
                std::static_pointer_cast<ErrDeque>(fb.err)->push_back({ts, return_or_error.as_error()});
            }
            else
            {
                // unexpected exception: report it and stop recording on this interface
                report_test_error(return_or_error.as_error());
                registry_->enable_categories_.reset();
            }
        }

        flags::RecordingTime::add(std::chrono::steady_clock::now() - start);
    }

    /// Number of calls recorded while a test was running; 0 without a registry.
    std::size_t count() const
    {
        return registry_ ? registry_->count.load(std::memory_order_relaxed) : std::size_t{0};
    }

    /// Number of calls skipped because no test was running; 0 without a registry.
    std::size_t lost_count() const
    {
        return registry_ ? registry_->lost_count.load(std::memory_order_relaxed) : std::size_t{0};
    }

    /// True once a registry is attached to this recorder.
    bool is_registered() const
    {
        return registry_ != nullptr;
    }

    /// Name stored in the registry, or "unregistered" when there is none.
    /// The returned view refers to the registry's string and is valid only
    /// while that registry is alive.
    boost::json::string_view interface_name() const
    {
        // string literals are const: a plain `char*` must not point at them
        constexpr char const* nil {"unregistered"};
        return registry_ ? boost::json::string_view(registry_->interface_name) : boost::json::string_view(nil);
    }


    boost::json::array const& data_frames() const;

    void clear();

    void enable_category(ChannelKind const ck);

  private:

    interface_id ifc_id_;
    object_id obj_id_;
    std::shared_ptr<Registry> registry_;
    bool ensure_registry();
};

}

#endif