File output_recorder.hpp¶
File List > backends > cxx > include > zmbt > model > output_recorder.hpp
Go to the documentation of this file
#ifndef ZMBT_ENV_OUTPUT_RECORDER_HPP_
#define ZMBT_ENV_OUTPUT_RECORDER_HPP_
#include <atomic>
#include <bitset>
#include <chrono>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <typeindex>
#include <typeinfo>
#include <boost/json.hpp>
#define BOOST_UNORDERED_DISABLE_REENTRANCY_CHECK
#include <boost/unordered/concurrent_flat_map.hpp>
#include <boost/core/demangle.hpp>
#include "zmbt/application/log.hpp"
#include "zmbt/core.hpp"
#include "zmbt/reflect.hpp"
#include "channel_kind.hpp"
#include "global_flags.hpp"
#include "global_stats.hpp"
#include "error_or_return.hpp"
namespace zmbt {
struct output_recorder_error : public base_error {
using base_error::base_error;
};
class OutputRecorder
{
public:
template <class T>
struct Frame
{
std::size_t ts;
T data {reflect::signal_traits<T>::init()};
};
private:
class Registry;
template <class Interface>
static std::type_index get_args_typeid(Interface const&)
{
using reflection = reflect::invocation<Interface const&>;
using args_t = typename reflection::args_t;
using unqf_args_t = tuple_unqf_t<args_t>;
return {typeid(unqf_args_t)};
}
struct Registry
{
struct FrameBuffs
{
std::shared_ptr<void> args{};
std::shared_ptr<void> ret{};
std::shared_ptr<void> err{};
};
using FramesBuffMap = boost::concurrent_flat_map<std::thread::id, FrameBuffs, std::hash<std::thread::id>>;
using consume_fn_t = std::function<void(Registry&)>;
std::type_index data_typeid;
boost::json::string interface_name;
consume_fn_t extract_fn;
std::shared_ptr<FramesBuffMap> frame_buff_map;
boost::json::array serialized_frames{};
std::atomic<std::size_t> count{0};
std::atomic<std::size_t> lost_count{0};
std::bitset<static_cast<unsigned>(ChannelKind::Undefined)> enable_categories_{};
Registry(
std::type_index typid,
boost::json::string name,
consume_fn_t efn,
std::shared_ptr<FramesBuffMap> fbm
)
: data_typeid(typid)
, interface_name{name}
, extract_fn(efn)
, frame_buff_map{fbm}
{
}
~Registry()
{
}
template <class Interface>
static std::shared_ptr<Registry> Make()
{
using ArgsTuple = reflect::invocation_args_unqf_t<Interface const&>;
using Return = reflect::invocation_ret_unqf_or_nullptr_t<Interface const&>;
std::type_index const typid {typeid(std::tuple<ArgsTuple, Return>)};
consume_fn_t extract_fn = [](Registry& registry) {
std::shared_ptr<Registry> expected {nullptr};
auto frame_buff_map = std::atomic_exchange(®istry.frame_buff_map, std::make_shared<Registry::FramesBuffMap>());
if (!frame_buff_map)
{
ZMBT_LOG(FATAL) << "corrupted output recorder registry on " << registry.interface_name;
return;
}
bool const tid_enabled = registry.enable_categories_[static_cast<unsigned>(ChannelKind::ThreadId)];
using FB = Registry::FrameBuffs;
using ThreadFrameBuffers = std::pair<std::thread::id, FB>;
// Snapshot thread buffers
std::vector<ThreadFrameBuffers> fbs;
fbs.reserve(frame_buff_map->size());
frame_buff_map->visit_all([&](auto& record){
fbs.push_back(record);
});
auto get_next_ts = [](FB& fb) -> std::size_t {
std::size_t ts = std::string::npos;
if (fb.args) {
auto& dq = *std::static_pointer_cast<std::deque<Frame<ArgsTuple>>>(fb.args);
if (!dq.empty()) ts = std::min(ts, dq.front().ts);
}
if (fb.ret) {
auto& dq = *std::static_pointer_cast<std::deque<Frame<Return>>>(fb.ret);
if (!dq.empty()) ts = std::min(ts, dq.front().ts);
}
if (fb.err) {
auto& dq = *std::static_pointer_cast<std::deque<Frame<ErrorInfo>>>(fb.err);
if (!dq.empty()) ts = std::min(ts, dq.front().ts);
}
return ts;
};
struct Cursor {
std::size_t ts {std::string::npos}; // next timestamp
ThreadFrameBuffers* buffs{nullptr};
};
// initialize candidates
std::vector<Cursor> cursors;
cursors.reserve(fbs.size());
for (auto& tfbs : fbs) {
cursors.push_back({get_next_ts(tfbs.second), &tfbs});
}
Cursor null{};
for (;;) {
// find candidate with smallest ts
Cursor* cursor = &null;
for (auto& candidate: cursors)
{
if (candidate.ts < cursor->ts) {
cursor = &candidate;
}
}
if (cursor == &null)
break; // all empty
boost::json::object json_frame{
{"ts", cursor->ts},
};
FB& fb = cursor->buffs->second;
if (tid_enabled)
{
json_frame["tid"] = tid2str(cursor->buffs->first);
}
// consume exactly one element of each deque matching this ts
if (fb.args) {
auto& dq = *std::static_pointer_cast<std::deque<Frame<ArgsTuple>>>(fb.args);
if (!dq.empty() && dq.front().ts == cursor->ts) {
json_frame["args"] = json_from(dq.front().data);
dq.pop_front();
}
}
if (fb.ret) {
auto& dq = *std::static_pointer_cast<std::deque<Frame<Return>>>(fb.ret);
if (!dq.empty() && dq.front().ts == cursor->ts) {
json_frame["return"] = json_from(dq.front().data);
dq.pop_front();
}
}
if (fb.err) {
auto& dq = *std::static_pointer_cast<std::deque<Frame<ErrorInfo>>>(fb.err);
if (!dq.empty() && dq.front().ts == cursor->ts) {
json_frame["exception"] = dq.front().data.to_json();
dq.pop_front();
}
}
registry.serialized_frames.push_back(std::move(json_frame));
// refresh this candidate’s ts
cursor->ts = get_next_ts(fb);
}
};
auto frame_buff_map = std::make_shared<FramesBuffMap>();
return std::make_shared<Registry>(typid, type_name<Interface>().c_str(), extract_fn, frame_buff_map);
}
};
template <ChannelKind ck>
bool check_filter()
{
return registry_->enable_categories_[static_cast<unsigned>(ck)];
}
void report_test_error(ErrorInfo const&) const;
public:
OutputRecorder(interface_id const& ifc_id, object_id const& obj_id);
~OutputRecorder();
OutputRecorder(OutputRecorder const&) = default;
OutputRecorder(OutputRecorder &&) = default;
OutputRecorder& operator=(OutputRecorder const&) = default;
OutputRecorder& operator=(OutputRecorder &&) = default;
void flush();
template <class Interface, class InterfacePointer = ifc_pointer_t<Interface>>
void setup_handlers()
{
// std::atomic_store(&(this->registry_), Registry::Make<InterfacePointer>());
if (registry_ != nullptr) return;
std::shared_ptr<Registry> expected {nullptr};
std::atomic_compare_exchange_weak(&(this->registry_), &expected, Registry::Make<InterfacePointer>());
}
template <class ArgsTuple, class Return>
void push(ArgsTuple const& args, ErrorOr<Return> const& return_or_error)
{
if (!ensure_registry())
{
ErrorInfo e;
e.type = type_name<output_recorder_error>();
e.what = "push to unregistered output recorder";
e.context = "OutputRecorder";
report_test_error(e);
return;
}
std::type_index const ti {typeid(std::tuple<ArgsTuple, Return>)};
if (ti != registry_->data_typeid)
{
ErrorInfo e;
e.type = type_name<output_recorder_error>();
e.what = format("invalid type on push - expected `%s`, got `%s`",
boost::core::demangle(registry_->data_typeid.name()),
boost::core::demangle(ti.name())
).c_str();
e.context = format("OutputRecorder[%s]", registry_->interface_name).c_str();
report_test_error(e);
return;
}
if (!flags::TestIsRunning::status())
{
registry_->lost_count++;
// TODO: redesign me
// lost call may come from SUT initialization
// before expectations are st, which may be ok to skip
return;
}
auto const start = std::chrono::steady_clock::now();
registry_->count++;
auto const ts = get_ts();
bool const args_en = registry_->enable_categories_[static_cast<unsigned>(ChannelKind::Args)];
bool const ret_en = registry_->enable_categories_[static_cast<unsigned>(ChannelKind::Return)];
bool const err_en = registry_->enable_categories_[static_cast<unsigned>(ChannelKind::Exception)];
bool const tid_en = registry_->enable_categories_[static_cast<unsigned>(ChannelKind::ThreadId)];
boost::optional<Registry::FrameBuffs> maybe_fb_ref{};
registry_->frame_buff_map->visit(std::this_thread::get_id(),
[&maybe_fb_ref](auto& record){
maybe_fb_ref = record.second;
});
if (!maybe_fb_ref.has_value())
{
Registry::FrameBuffs new_fb;
if (args_en) new_fb.args = std::make_shared<std::deque<Frame<ArgsTuple>>>();
if (ret_en) new_fb.ret = std::make_shared<std::deque<Frame<Return>>>();
if (err_en) new_fb.err = std::make_shared<std::deque<Frame<ErrorInfo>>>();
maybe_fb_ref = new_fb;
registry_->frame_buff_map->try_emplace_or_visit(std::this_thread::get_id(), new_fb,
[&maybe_fb_ref](auto& record){
maybe_fb_ref = record.second;
});
}
Registry::FrameBuffs& fb = maybe_fb_ref.value();
auto fb_args = std::static_pointer_cast<std::deque<Frame<ArgsTuple>>>(fb.args);
auto fb_ret = std::static_pointer_cast<std::deque<Frame<Return>>>(fb.ret);
auto fb_err = std::static_pointer_cast<std::deque<Frame<ErrorInfo>>>(fb.err);
if (args_en) fb_args->push_back({ts, args});
if (ret_en && return_or_error.is_return()) fb_ret->push_back({ts, return_or_error.as_return()});
if (return_or_error.is_error())
{
if (err_en)
{
fb_err->push_back({ts, return_or_error.as_error()});
}
else
{
report_test_error(return_or_error.as_error());
registry_->enable_categories_.reset();
}
}
flags::RecordingTime::add(std::chrono::steady_clock::now() - start);
}
std::size_t count() const
{
return registry_ ? registry_->count.load(std::memory_order_relaxed) : 0UL;
}
std::size_t lost_count() const
{
return registry_ ? registry_->lost_count.load(std::memory_order_relaxed) : 0UL;
}
bool is_registered() const
{
return registry_ != nullptr;
}
boost::json::string_view interface_name() const
{
constexpr char* nil {"unregistered"};
return registry_ ? boost::json::string_view(registry_->interface_name) : boost::json::string_view(nil);
}
boost::json::array const& data_frames() const;
void clear();
void enable_category(ChannelKind const ck);
private:
interface_id ifc_id_;
object_id obj_id_;
std::shared_ptr<Registry> registry_;
bool ensure_registry();
};
}
#endif