thread_pool and refactoring async

This commit is contained in:
gabime
2018-04-14 03:34:57 +03:00
parent 5e08950ed2
commit 6f4cd8d397
18 changed files with 575 additions and 657 deletions

View File

@@ -10,33 +10,57 @@
// If user requests a non existing logger, nullptr will be returned
// This class is thread safe
#include "../async_logger.h"
#include "../common.h"
#include "../details/null_mutex.h"
#include "../logger.h"
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
namespace spdlog {
namespace details {
class thread_pool;
template<class Mutex>
class registry_t
{
public:
using MutexT = Mutex;
registry_t<Mutex>(const registry_t<Mutex> &) = delete;
registry_t<Mutex> &operator=(const registry_t<Mutex> &) = delete;
void register_logger(std::shared_ptr<logger> logger)
void register_logger(std::shared_ptr<logger> new_logger)
{
std::lock_guard<Mutex> lock(_mutex);
auto logger_name = logger->name();
auto logger_name = new_logger->name();
throw_if_exists(logger_name);
_loggers[logger_name] = logger;
_loggers[logger_name] = new_logger;
}
void register_and_init(std::shared_ptr<logger> new_logger)
{
std::lock_guard<Mutex> lock(_mutex);
auto logger_name = new_logger->name();
throw_if_exists(logger_name);
if (_formatter)
{
new_logger->set_formatter(_formatter);
}
if (_err_handler)
{
new_logger->set_error_handler(_err_handler);
}
new_logger->set_level(_level);
new_logger->flush_on(_flush_level);
// Add to registry
_loggers[logger_name] = new_logger;
}
std::shared_ptr<logger> get(const std::string &logger_name)
@@ -46,116 +70,19 @@ public:
return found == _loggers.end() ? nullptr : found->second;
}
template<class It>
std::shared_ptr<logger> create(const std::string &logger_name, const It &sinks_begin, const It &sinks_end)
void set_thread_pool(std::shared_ptr<thread_pool> tp)
{
std::lock_guard<Mutex> lock(_mutex);
throw_if_exists(logger_name);
std::shared_ptr<logger> new_logger;
if (_async_mode)
{
new_logger = std::make_shared<async_logger>(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy,
_worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb);
}
else
{
new_logger = std::make_shared<logger>(logger_name, sinks_begin, sinks_end);
}
if (_formatter)
{
new_logger->set_formatter(_formatter);
}
if (_err_handler)
{
new_logger->set_error_handler(_err_handler);
}
new_logger->set_level(_level);
new_logger->flush_on(_flush_level);
// Add to registry
_loggers[logger_name] = new_logger;
return new_logger;
_tp = std::move(tp);
}
template<class It>
std::shared_ptr<async_logger> create_async(const std::string &logger_name, size_t queue_size,
const async_overflow_policy overflow_policy, const std::function<void()> &worker_warmup_cb,
const std::chrono::milliseconds &flush_interval_ms, const std::function<void()> &worker_teardown_cb, const It &sinks_begin,
const It &sinks_end)
std::shared_ptr<thread_pool> get_thread_pool()
{
std::lock_guard<Mutex> lock(_mutex);
throw_if_exists(logger_name);
auto new_logger = std::make_shared<async_logger>(
logger_name, sinks_begin, sinks_end, queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb);
if (_formatter)
{
new_logger->set_formatter(_formatter);
}
if (_err_handler)
{
new_logger->set_error_handler(_err_handler);
}
new_logger->set_level(_level);
new_logger->flush_on(_flush_level);
// Add to registry
_loggers[logger_name] = new_logger;
return new_logger;
return _tp;
}
void apply_all(std::function<void(std::shared_ptr<logger>)> fun)
{
std::lock_guard<Mutex> lock(_mutex);
for (auto &l : _loggers)
{
fun(l.second);
}
}
void drop(const std::string &logger_name)
{
std::lock_guard<Mutex> lock(_mutex);
_loggers.erase(logger_name);
}
void drop_all()
{
std::lock_guard<Mutex> lock(_mutex);
_loggers.clear();
}
std::shared_ptr<logger> create(const std::string &logger_name, sinks_init_list sinks)
{
return create(logger_name, sinks.begin(), sinks.end());
}
std::shared_ptr<logger> create(const std::string &logger_name, sink_ptr sink)
{
return create(logger_name, {sink});
}
std::shared_ptr<async_logger> create_async(const std::string &logger_name, size_t queue_size,
const async_overflow_policy overflow_policy, const std::function<void()> &worker_warmup_cb,
const std::chrono::milliseconds &flush_interval_ms, const std::function<void()> &worker_teardown_cb, sinks_init_list sinks)
{
return create_async(
logger_name, queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb, sinks.begin(), sinks.end());
}
std::shared_ptr<async_logger> create_async(const std::string &logger_name, size_t queue_size,
const async_overflow_policy overflow_policy, const std::function<void()> &worker_warmup_cb,
const std::chrono::milliseconds &flush_interval_ms, const std::function<void()> &worker_teardown_cb, sink_ptr sink)
{
return create_async(logger_name, queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms, worker_teardown_cb, {sink});
}
void formatter(formatter_ptr f)
void set_formatter(formatter_ptr f)
{
std::lock_guard<Mutex> lock(_mutex);
_formatter = f;
@@ -204,22 +131,30 @@ public:
_err_handler = handler;
}
void set_async_mode(size_t q_size, const async_overflow_policy overflow_policy, const std::function<void()> &worker_warmup_cb,
const std::chrono::milliseconds &flush_interval_ms, const std::function<void()> &worker_teardown_cb)
void apply_all(std::function<void(std::shared_ptr<logger>)> fun)
{
std::lock_guard<Mutex> lock(_mutex);
_async_mode = true;
_async_q_size = q_size;
_overflow_policy = overflow_policy;
_worker_warmup_cb = worker_warmup_cb;
_flush_interval_ms = flush_interval_ms;
_worker_teardown_cb = worker_teardown_cb;
for (auto &l : _loggers)
{
fun(l.second);
}
}
void set_sync_mode()
void drop(const std::string &logger_name)
{
std::lock_guard<Mutex> lock(_mutex);
_async_mode = false;
_loggers.erase(logger_name);
}
void drop_all()
{
std::lock_guard<Mutex> lock(_mutex);
_loggers.clear();
}
Mutex &tp_mutex()
{
return _tp_mutex;
}
static registry_t<Mutex> &instance()
@@ -240,24 +175,22 @@ private:
}
Mutex _mutex;
Mutex _tp_mutex;
std::unordered_map<std::string, std::shared_ptr<logger>> _loggers;
formatter_ptr _formatter;
level::level_enum _level = level::info;
level::level_enum _flush_level = level::off;
log_err_handler _err_handler;
bool _async_mode = false;
size_t _async_q_size = 0;
async_overflow_policy _overflow_policy = async_overflow_policy::block_retry;
std::function<void()> _worker_warmup_cb;
std::chrono::milliseconds _flush_interval_ms{std::chrono::milliseconds::zero()};
std::function<void()> _worker_teardown_cb;
std::shared_ptr<thread_pool> _tp;
};
#ifdef SPDLOG_NO_REGISTRY_MUTEX
#include "../details/null_mutex.h"
using registry = registry_t<spdlog::details::null_mutex>;
#else
#include <mutex>
using registry = registry_t<std::mutex>;
#endif
} // namespace details
} // namespace spdlog
} // namespace spdlog