From b5b5043d42b5d25ccf146b8ab422f72571f58301 Mon Sep 17 00:00:00 2001 From: Yubin Date: Sun, 10 Sep 2023 04:05:08 +0800 Subject: [PATCH] Support async_overflow_policy::discard_new (#2876) Reason for the discard_new policy: when there is an overflow, there is usually some unexpected issue (a bug, or some other unexpected stuff). And in case of unexpected issue, the first arrived log messages are usually more important than subsequent ones. For example, some application keep logging error messages in case of functionality failure, which, when using async_overflow_policy::overrun_oldest, will overrun the first arrived messages that may contain real reason for the failure. --- .gitignore | 1 + include/spdlog/async_logger.h | 7 +-- include/spdlog/details/mpmc_blocking_q.h | 54 ++++++++++++++++++++++++ include/spdlog/details/thread_pool-inl.h | 17 +++++++- include/spdlog/details/thread_pool.h | 2 + tests/test_async.cpp | 17 ++++++++ 6 files changed, 94 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 4d2aace3..1dc3436c 100644 --- a/.gitignore +++ b/.gitignore @@ -72,6 +72,7 @@ install_manifest.txt /tests/logs/* spdlogConfig.cmake spdlogConfigVersion.cmake +compile_commands.json # idea .idea/ diff --git a/include/spdlog/async_logger.h b/include/spdlog/async_logger.h index 91a93fcb..e979ead7 100644 --- a/include/spdlog/async_logger.h +++ b/include/spdlog/async_logger.h @@ -21,9 +21,10 @@ namespace spdlog { // Async overflow policy - block by default. enum class async_overflow_policy { - block, // Block until message can be enqueued - overrun_oldest // Discard oldest message in the queue if full when trying to - // add new item. + block, // Block until message can be enqueued + overrun_oldest, // Discard oldest message in the queue if full when trying to + // add new item. + discard_new // Discard new message if the queue is full when trying to add new item. }; namespace details { diff --git a/include/spdlog/details/mpmc_blocking_q.h b/include/spdlog/details/mpmc_blocking_q.h index 101ea8c0..deffa6eb 100644 --- a/include/spdlog/details/mpmc_blocking_q.h +++ b/include/spdlog/details/mpmc_blocking_q.h @@ -12,6 +12,7 @@ #include +#include #include #include @@ -49,6 +50,28 @@ public: push_cv_.notify_one(); } + void enqueue_if_have_room(T &&item) + { + bool pushed = false; + { + std::unique_lock lock(queue_mutex_); + if (!q_.full()) + { + q_.push_back(std::move(item)); + pushed = true; + } + } + + if (pushed) + { + push_cv_.notify_one(); + } + else + { + ++discard_counter_; + } + } + // dequeue with a timeout. // Return true, if succeeded dequeue item, false otherwise bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) @@ -99,6 +122,26 @@ public: push_cv_.notify_one(); } + void enqueue_if_have_room(T &&item) + { + bool pushed = false; + std::unique_lock lock(queue_mutex_); + if (!q_.full()) + { + q_.push_back(std::move(item)); + pushed = true; + } + + if (pushed) + { + push_cv_.notify_one(); + } + else + { + ++discard_counter_; + } + } + // dequeue with a timeout. // Return true, if succeeded dequeue item, false otherwise bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) @@ -132,6 +175,11 @@ public: return q_.overrun_counter(); } + size_t discard_counter() + { + return discard_counter_.load(std::memory_order_relaxed); + } + size_t size() { std::unique_lock lock(queue_mutex_); @@ -144,11 +192,17 @@ public: q_.reset_overrun_counter(); } + void reset_discard_counter() + { + discard_counter_.store(0, std::memory_order_relaxed); + } + private: std::mutex queue_mutex_; std::condition_variable push_cv_; std::condition_variable pop_cv_; spdlog::details::circular_q q_; + std::atomic discard_counter_{0}; }; } // namespace details } // namespace spdlog diff --git a/include/spdlog/details/thread_pool-inl.h b/include/spdlog/details/thread_pool-inl.h index dbd424ff..d3ae442f 100644 --- a/include/spdlog/details/thread_pool-inl.h +++ b/include/spdlog/details/thread_pool-inl.h @@ -80,6 +80,16 @@ void SPDLOG_INLINE thread_pool::reset_overrun_counter() q_.reset_overrun_counter(); } +size_t SPDLOG_INLINE thread_pool::discard_counter() +{ + return q_.discard_counter(); +} + +void SPDLOG_INLINE thread_pool::reset_discard_counter() +{ + q_.reset_discard_counter(); +} + size_t SPDLOG_INLINE thread_pool::queue_size() { return q_.size(); @@ -91,10 +101,15 @@ void SPDLOG_INLINE thread_pool::post_async_msg_(async_msg &&new_msg, async_overf { q_.enqueue(std::move(new_msg)); } - else + else if (overflow_policy == async_overflow_policy::overrun_oldest) { q_.enqueue_nowait(std::move(new_msg)); } + else + { + assert(overflow_policy == async_overflow_policy::discard_new); + q_.enqueue_if_have_room(std::move(new_msg)); + } } void SPDLOG_INLINE thread_pool::worker_loop_() diff --git a/include/spdlog/details/thread_pool.h b/include/spdlog/details/thread_pool.h index 52c569b8..3d0b2cb3 100644 --- a/include/spdlog/details/thread_pool.h +++ b/include/spdlog/details/thread_pool.h @@ -98,6 +98,8 @@ public: void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy); size_t overrun_counter(); void reset_overrun_counter(); + size_t discard_counter(); + void reset_discard_counter(); size_t queue_size(); private: diff --git a/tests/test_async.cpp b/tests/test_async.cpp index 06c5c921..c82040ba 100644 --- a/tests/test_async.cpp +++ b/tests/test_async.cpp @@ -43,6 +43,23 @@ TEST_CASE("discard policy ", "[async]") REQUIRE(tp->overrun_counter() > 0); } +TEST_CASE("discard policy discard_new ", "[async]") +{ + auto test_sink = std::make_shared(); + test_sink->set_delay(std::chrono::milliseconds(1)); + size_t queue_size = 4; + size_t messages = 1024; + + auto tp = std::make_shared(queue_size, 1); + auto logger = std::make_shared("as", test_sink, tp, spdlog::async_overflow_policy::discard_new); + for (size_t i = 0; i < messages; i++) + { + logger->info("Hello message"); + } + REQUIRE(test_sink->msg_counter() < messages); + REQUIRE(tp->discard_counter() > 0); +} + TEST_CASE("discard policy using factory ", "[async]") { size_t queue_size = 4;