return std::make_shared<StatisticsImpl>(nullptr, false);
}
-StatisticsImpl::StatisticsImpl(
- std::shared_ptr<Statistics> stats,
- bool enable_internal_stats)
- : stats_shared_(stats),
- stats_(stats.get()),
- enable_internal_stats_(enable_internal_stats) {
-}
+StatisticsImpl::StatisticsImpl(std::shared_ptr<Statistics> stats,
+ bool enable_internal_stats)
+ : stats_(std::move(stats)), enable_internal_stats_(enable_internal_stats) {}
StatisticsImpl::~StatisticsImpl() {}
enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX);
- uint64_t thread_local_sum = 0;
- tickers_[tickerType].thread_value->Fold(
- [](void* curr_ptr, void* res) {
- auto* sum_ptr = static_cast<uint64_t*>(res);
- *sum_ptr += static_cast<std::atomic_uint_fast64_t*>(curr_ptr)->load(
- std::memory_order_relaxed);
- },
- &thread_local_sum);
- return thread_local_sum +
- tickers_[tickerType].merged_sum.load(std::memory_order_relaxed);
-}
-
-std::unique_ptr<HistogramImpl>
-StatisticsImpl::HistogramInfo::getMergedHistogram() const {
- std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
- {
- MutexLock lock(&merge_lock);
- res_hist->Merge(merged_hist);
+ uint64_t res = 0;
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
}
- thread_value->Fold(
- [](void* curr_ptr, void* res) {
- auto tmp_res_hist = static_cast<HistogramImpl*>(res);
- auto curr_hist = static_cast<HistogramImpl*>(curr_ptr);
- tmp_res_hist->Merge(*curr_hist);
- },
- res_hist.get());
- return res_hist;
+ return res;
}
void StatisticsImpl::histogramData(uint32_t histogramType,
HistogramData* const data) const {
MutexLock lock(&aggregate_lock_);
- histogramDataLocked(histogramType, data);
+ getHistogramImplLocked(histogramType)->Data(data);
}
-void StatisticsImpl::histogramDataLocked(uint32_t histogramType,
- HistogramData* const data) const {
+std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked(
+ uint32_t histogramType) const {
assert(
enable_internal_stats_ ?
histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
histogramType < HISTOGRAM_ENUM_MAX);
- histograms_[histogramType].getMergedHistogram()->Data(data);
+ std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ res_hist->Merge(
+ per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);
+ }
+ return res_hist;
}
std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const {
MutexLock lock(&aggregate_lock_);
- assert(enable_internal_stats_ ? histogramType < INTERNAL_HISTOGRAM_ENUM_MAX
- : histogramType < HISTOGRAM_ENUM_MAX);
- return histograms_[histogramType].getMergedHistogram()->ToString();
-}
-
-StatisticsImpl::ThreadTickerInfo* StatisticsImpl::getThreadTickerInfo(
- uint32_t tickerType) {
- auto info_ptr =
- static_cast<ThreadTickerInfo*>(tickers_[tickerType].thread_value->Get());
- if (info_ptr == nullptr) {
- info_ptr =
- new ThreadTickerInfo(0 /* value */, &tickers_[tickerType].merged_sum);
- tickers_[tickerType].thread_value->Reset(info_ptr);
- }
- return info_ptr;
-}
-
-StatisticsImpl::ThreadHistogramInfo* StatisticsImpl::getThreadHistogramInfo(
- uint32_t histogram_type) {
- auto info_ptr = static_cast<ThreadHistogramInfo*>(
- histograms_[histogram_type].thread_value->Get());
- if (info_ptr == nullptr) {
- info_ptr = new ThreadHistogramInfo(&histograms_[histogram_type].merged_hist,
- &histograms_[histogram_type].merge_lock);
- histograms_[histogram_type].thread_value->Reset(info_ptr);
- }
- return info_ptr;
+ return getHistogramImplLocked(histogramType)->ToString();
}
void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) {
assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
: tickerType < TICKER_ENUM_MAX);
- if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
- tickers_[tickerType].thread_value->Fold(
- [](void* curr_ptr, void* res) {
- static_cast<std::atomic<uint64_t>*>(curr_ptr)->store(
- 0, std::memory_order_relaxed);
- },
- nullptr /* res */);
- tickers_[tickerType].merged_sum.store(count, std::memory_order_relaxed);
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ if (core_idx == 0) {
+ per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
+ } else {
+ per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0;
+ }
}
}
MutexLock lock(&aggregate_lock_);
assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
: tickerType < TICKER_ENUM_MAX);
- if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
- tickers_[tickerType].thread_value->Fold(
- [](void* curr_ptr, void* res) {
- auto* sum_ptr = static_cast<uint64_t*>(res);
- *sum_ptr += static_cast<std::atomic<uint64_t>*>(curr_ptr)->exchange(
- 0, std::memory_order_relaxed);
- },
- &sum);
- sum += tickers_[tickerType].merged_sum.exchange(
- 0, std::memory_order_relaxed);
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ sum +=
+ per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
+ 0, std::memory_order_relaxed);
}
}
if (stats_ && tickerType < TICKER_ENUM_MAX) {
enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX);
- if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
- auto info_ptr = getThreadTickerInfo(tickerType);
- info_ptr->value.fetch_add(count, std::memory_order_relaxed);
- }
+ per_core_stats_.Access()->tickers_[tickerType].fetch_add(
+ count, std::memory_order_relaxed);
if (stats_ && tickerType < TICKER_ENUM_MAX) {
stats_->recordTick(tickerType, count);
}
enable_internal_stats_ ?
histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
histogramType < HISTOGRAM_ENUM_MAX);
- if (histogramType < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
- getThreadHistogramInfo(histogramType)->value.Add(value);
- }
+ per_core_stats_.Access()->histograms_[histogramType].Add(value);
if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) {
stats_->measureTime(histogramType, value);
}
setTickerCountLocked(i, 0);
}
for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) {
- histograms_[i].thread_value->Fold(
- [](void* curr_ptr, void* res) {
- static_cast<HistogramImpl*>(curr_ptr)->Clear();
- },
- nullptr /* res */);
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
+ }
}
return Status::OK();
}
if (h.first < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
char buffer[kTmpStrBufferSize];
HistogramData hData;
- histogramDataLocked(h.first, &hData);
+ getHistogramImplLocked(h.first)->Data(&hData);
snprintf(
buffer, kTmpStrBufferSize,
"%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f 100 : %f\n",
#include "monitoring/histogram.h"
#include "port/likely.h"
#include "port/port.h"
+#include "util/core_local.h"
#include "util/mutexlock.h"
-#include "util/thread_local.h"
+
+#ifdef __clang__
+#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
+#else
+#define ROCKSDB_FIELD_UNUSED
+#endif // __clang__
namespace rocksdb {
virtual bool HistEnabledForType(uint32_t type) const override;
private:
- std::shared_ptr<Statistics> stats_shared_;
- Statistics* stats_;
+ // If non-nullptr, forwards updates to the object pointed to by `stats_`.
+ std::shared_ptr<Statistics> stats_;
+ // TODO(ajkr): clean this up since there are no internal stats anymore
bool enable_internal_stats_;
- // Synchronizes anything that operates on other threads' thread-specific data
+ // Synchronizes anything that operates across other cores' local data,
// such that operations like Reset() can be performed atomically.
mutable port::Mutex aggregate_lock_;
- // Holds data maintained by each thread for implementing tickers.
- struct ThreadTickerInfo {
- std::atomic_uint_fast64_t value;
- // During teardown, value will be summed into *merged_sum.
- std::atomic_uint_fast64_t* merged_sum;
-
- ThreadTickerInfo(uint_fast64_t _value,
- std::atomic_uint_fast64_t* _merged_sum)
- : value(_value), merged_sum(_merged_sum) {}
+ // The ticker/histogram data are stored in this structure, which we will store
+ // per-core. It is cache-aligned, so tickers/histograms belonging to different
+ // cores can never share the same cache line.
+ //
+ // Alignment attributes expand to nothing depending on the platform
+ struct StatisticsData {
+ std::atomic_uint_fast64_t tickers_[INTERNAL_TICKER_ENUM_MAX] = {{0}};
+ HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
+ char
+ padding[(CACHE_LINE_SIZE -
+ (INTERNAL_TICKER_ENUM_MAX * sizeof(std::atomic_uint_fast64_t) +
+ INTERNAL_HISTOGRAM_ENUM_MAX * sizeof(HistogramImpl)) %
+ CACHE_LINE_SIZE) %
+ CACHE_LINE_SIZE] ROCKSDB_FIELD_UNUSED;
};
- // Holds data maintained by each thread for implementing histograms.
- struct ThreadHistogramInfo {
- HistogramImpl value;
- // During teardown, value will be merged into *merged_hist while holding
- // *merge_lock, which also syncs with the merges necessary for reads.
- HistogramImpl* merged_hist;
- port::Mutex* merge_lock;
+ static_assert(sizeof(StatisticsData) % 64 == 0, "Expected 64-byte aligned");
- ThreadHistogramInfo(HistogramImpl* _merged_hist, port::Mutex* _merge_lock)
- : value(), merged_hist(_merged_hist), merge_lock(_merge_lock) {}
- };
-
- // Holds global data for implementing tickers.
- struct TickerInfo {
- TickerInfo()
- : thread_value(new ThreadLocalPtr(&mergeThreadValue)), merged_sum(0) {}
- // Holds thread-specific pointer to ThreadTickerInfo
- std::unique_ptr<ThreadLocalPtr> thread_value;
- // Sum of thread-specific values for tickers that have been reset due to
- // thread termination or ThreadLocalPtr destruction. Also, this is used by
- // setTickerCount() to conveniently change the global value by setting this
- // while simultaneously zeroing all thread-local values.
- std::atomic_uint_fast64_t merged_sum;
-
- static void mergeThreadValue(void* ptr) {
- auto info_ptr = static_cast<ThreadTickerInfo*>(ptr);
- *info_ptr->merged_sum += info_ptr->value;
- delete info_ptr;
- }
- };
-
- // Holds global data for implementing histograms.
- struct HistogramInfo {
- HistogramInfo()
- : merged_hist(),
- merge_lock(),
- thread_value(new ThreadLocalPtr(&mergeThreadValue)) {}
- // Merged thread-specific values for histograms that have been reset due to
- // thread termination or ThreadLocalPtr destruction. Note these must be
- // destroyed after thread_value since its destructor accesses them.
- HistogramImpl merged_hist;
- mutable port::Mutex merge_lock;
- // Holds thread-specific pointer to ThreadHistogramInfo
- std::unique_ptr<ThreadLocalPtr> thread_value;
-
- static void mergeThreadValue(void* ptr) {
- auto info_ptr = static_cast<ThreadHistogramInfo*>(ptr);
- {
- MutexLock lock(info_ptr->merge_lock);
- info_ptr->merged_hist->Merge(info_ptr->value);
- }
- delete info_ptr;
- }
-
- // Returns a histogram that merges all histograms (thread-specific and
- // previously merged ones).
- std::unique_ptr<HistogramImpl> getMergedHistogram() const;
- };
+ CoreLocalArray<StatisticsData> per_core_stats_;
uint64_t getTickerCountLocked(uint32_t ticker_type) const;
- void histogramDataLocked(uint32_t histogram_type,
- HistogramData* const data) const;
+ std::unique_ptr<HistogramImpl> getHistogramImplLocked(
+ uint32_t histogram_type) const;
void setTickerCountLocked(uint32_t ticker_type, uint64_t count);
-
- // Returns the info for this tickerType/thread. It sets a new info with zeroed
- // counter if none exists.
- ThreadTickerInfo* getThreadTickerInfo(uint32_t ticker_type);
- // Returns the info for this histogramType/thread. It sets a new histogram
- // with zeroed data if none exists.
- ThreadHistogramInfo* getThreadHistogramInfo(uint32_t histogram_type);
-
- TickerInfo tickers_[INTERNAL_TICKER_ENUM_MAX];
- HistogramInfo histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
};
// Utility functions