Core-local statistics
authorAndrew Kryczka <andrewkr@fb.com>
Tue, 23 May 2017 17:29:14 +0000 (10:29 -0700)
committerGaudenz Steinlin <gaudenz@debian.org>
Tue, 19 Feb 2019 07:50:12 +0000 (07:50 +0000)
Summary:
This diff changes `StatisticsImpl` from a thread-local approach to a core-local one. The goal is to perform faster aggregations, particularly for applications that have many threads. There should be no behavior change.
Closes https://github.com/facebook/rocksdb/pull/2258

Differential Revision: D5016258

Pulled By: ajkr

fbshipit-source-id: 7d4d165b4a91d8110f0409d113d1be91f22d31a9
(cherry picked from commit ac39d6bec5b2c23a2c3fd0f0e61d468be4f3e803)

Gbp-Pq: Name 0003-Core-local-statistics.patch

src/rocksdb/HISTORY.md
src/rocksdb/monitoring/statistics.cc
src/rocksdb/monitoring/statistics.h
src/rocksdb/util/core_local.h

index 7b51d37a07916e68c7ec8ff142dd992e2c8d82e7..4cde9e2791494e114204737c68a80c22cafdd4d0 100644 (file)
@@ -1,6 +1,10 @@
 # Rocksdb Change Log
 ## Unreleased
 ### New Features
+* Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads.
+
+## 5.5.0 (05/17/2017)
+### New Features
 * DB::ResetStats() to reset internal stats.
 * Statistics::Reset() to reset user stats.
 * ldb add option --try_load_options, which will open DB with its own option file.
index fb5634f762c18b44ab4761c9a0aed41c7ebf6706..3a69a13063553dc3a27270e6ea5041152bcee1e7 100644 (file)
@@ -21,13 +21,9 @@ std::shared_ptr<Statistics> CreateDBStatistics() {
   return std::make_shared<StatisticsImpl>(nullptr, false);
 }
 
-StatisticsImpl::StatisticsImpl(
-    std::shared_ptr<Statistics> stats,
-    bool enable_internal_stats)
-  : stats_shared_(stats),
-    stats_(stats.get()),
-    enable_internal_stats_(enable_internal_stats) {
-}
+StatisticsImpl::StatisticsImpl(std::shared_ptr<Statistics> stats,
+                               bool enable_internal_stats)
+    : stats_(std::move(stats)), enable_internal_stats_(enable_internal_stats) {}
 
 StatisticsImpl::~StatisticsImpl() {}
 
@@ -41,79 +37,36 @@ uint64_t StatisticsImpl::getTickerCountLocked(uint32_t tickerType) const {
     enable_internal_stats_ ?
       tickerType < INTERNAL_TICKER_ENUM_MAX :
       tickerType < TICKER_ENUM_MAX);
-  uint64_t thread_local_sum = 0;
-  tickers_[tickerType].thread_value->Fold(
-      [](void* curr_ptr, void* res) {
-        auto* sum_ptr = static_cast<uint64_t*>(res);
-        *sum_ptr += static_cast<std::atomic_uint_fast64_t*>(curr_ptr)->load(
-            std::memory_order_relaxed);
-      },
-      &thread_local_sum);
-  return thread_local_sum +
-         tickers_[tickerType].merged_sum.load(std::memory_order_relaxed);
-}
-
-std::unique_ptr<HistogramImpl>
-StatisticsImpl::HistogramInfo::getMergedHistogram() const {
-  std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
-  {
-    MutexLock lock(&merge_lock);
-    res_hist->Merge(merged_hist);
+  uint64_t res = 0;
+  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+    res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
   }
-  thread_value->Fold(
-      [](void* curr_ptr, void* res) {
-        auto tmp_res_hist = static_cast<HistogramImpl*>(res);
-        auto curr_hist = static_cast<HistogramImpl*>(curr_ptr);
-        tmp_res_hist->Merge(*curr_hist);
-      },
-      res_hist.get());
-  return res_hist;
+  return res;
 }
 
 void StatisticsImpl::histogramData(uint32_t histogramType,
                                    HistogramData* const data) const {
   MutexLock lock(&aggregate_lock_);
-  histogramDataLocked(histogramType, data);
+  getHistogramImplLocked(histogramType)->Data(data);
 }
 
-void StatisticsImpl::histogramDataLocked(uint32_t histogramType,
-                                         HistogramData* const data) const {
+std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked(
+    uint32_t histogramType) const {
   assert(
     enable_internal_stats_ ?
       histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
       histogramType < HISTOGRAM_ENUM_MAX);
-  histograms_[histogramType].getMergedHistogram()->Data(data);
+  std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
+  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+    res_hist->Merge(
+        per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);
+  }
+  return res_hist;
 }
 
 std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const {
   MutexLock lock(&aggregate_lock_);
-  assert(enable_internal_stats_ ? histogramType < INTERNAL_HISTOGRAM_ENUM_MAX
-                                : histogramType < HISTOGRAM_ENUM_MAX);
-  return histograms_[histogramType].getMergedHistogram()->ToString();
-}
-
-StatisticsImpl::ThreadTickerInfo* StatisticsImpl::getThreadTickerInfo(
-    uint32_t tickerType) {
-  auto info_ptr =
-      static_cast<ThreadTickerInfo*>(tickers_[tickerType].thread_value->Get());
-  if (info_ptr == nullptr) {
-    info_ptr =
-        new ThreadTickerInfo(0 /* value */, &tickers_[tickerType].merged_sum);
-    tickers_[tickerType].thread_value->Reset(info_ptr);
-  }
-  return info_ptr;
-}
-
-StatisticsImpl::ThreadHistogramInfo* StatisticsImpl::getThreadHistogramInfo(
-    uint32_t histogram_type) {
-  auto info_ptr = static_cast<ThreadHistogramInfo*>(
-      histograms_[histogram_type].thread_value->Get());
-  if (info_ptr == nullptr) {
-    info_ptr = new ThreadHistogramInfo(&histograms_[histogram_type].merged_hist,
-                                       &histograms_[histogram_type].merge_lock);
-    histograms_[histogram_type].thread_value->Reset(info_ptr);
-  }
-  return info_ptr;
+  return getHistogramImplLocked(histogramType)->ToString();
 }
 
 void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
@@ -129,14 +82,12 @@ void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
 void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) {
   assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
                                 : tickerType < TICKER_ENUM_MAX);
-  if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
-    tickers_[tickerType].thread_value->Fold(
-        [](void* curr_ptr, void* res) {
-          static_cast<std::atomic<uint64_t>*>(curr_ptr)->store(
-              0, std::memory_order_relaxed);
-        },
-        nullptr /* res */);
-    tickers_[tickerType].merged_sum.store(count, std::memory_order_relaxed);
+  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+    if (core_idx == 0) {
+      per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
+    } else {
+      per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0;
+    }
   }
 }
 
@@ -146,16 +97,10 @@ uint64_t StatisticsImpl::getAndResetTickerCount(uint32_t tickerType) {
     MutexLock lock(&aggregate_lock_);
     assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
                                   : tickerType < TICKER_ENUM_MAX);
-    if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
-      tickers_[tickerType].thread_value->Fold(
-          [](void* curr_ptr, void* res) {
-            auto* sum_ptr = static_cast<uint64_t*>(res);
-            *sum_ptr += static_cast<std::atomic<uint64_t>*>(curr_ptr)->exchange(
-                0, std::memory_order_relaxed);
-          },
-          &sum);
-      sum += tickers_[tickerType].merged_sum.exchange(
-          0, std::memory_order_relaxed);
+    for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+      sum +=
+          per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
+              0, std::memory_order_relaxed);
     }
   }
   if (stats_ && tickerType < TICKER_ENUM_MAX) {
@@ -169,10 +114,8 @@ void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
     enable_internal_stats_ ?
       tickerType < INTERNAL_TICKER_ENUM_MAX :
       tickerType < TICKER_ENUM_MAX);
-  if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
-    auto info_ptr = getThreadTickerInfo(tickerType);
-    info_ptr->value.fetch_add(count, std::memory_order_relaxed);
-  }
+  per_core_stats_.Access()->tickers_[tickerType].fetch_add(
+      count, std::memory_order_relaxed);
   if (stats_ && tickerType < TICKER_ENUM_MAX) {
     stats_->recordTick(tickerType, count);
   }
@@ -183,9 +126,7 @@ void StatisticsImpl::measureTime(uint32_t histogramType, uint64_t value) {
     enable_internal_stats_ ?
       histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
       histogramType < HISTOGRAM_ENUM_MAX);
-  if (histogramType < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
-    getThreadHistogramInfo(histogramType)->value.Add(value);
-  }
+  per_core_stats_.Access()->histograms_[histogramType].Add(value);
   if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) {
     stats_->measureTime(histogramType, value);
   }
@@ -197,11 +138,9 @@ Status StatisticsImpl::Reset() {
     setTickerCountLocked(i, 0);
   }
   for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) {
-    histograms_[i].thread_value->Fold(
-        [](void* curr_ptr, void* res) {
-          static_cast<HistogramImpl*>(curr_ptr)->Clear();
-        },
-        nullptr /* res */);
+    for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+      per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
+    }
   }
   return Status::OK();
 }
@@ -229,7 +168,7 @@ std::string StatisticsImpl::ToString() const {
     if (h.first < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
       char buffer[kTmpStrBufferSize];
       HistogramData hData;
-      histogramDataLocked(h.first, &hData);
+      getHistogramImplLocked(h.first)->Data(&hData);
       snprintf(
           buffer, kTmpStrBufferSize,
           "%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f 100 : %f\n",
index 32b7036caaa3a21f317db85759db066c80a01205..96b31a3b8130eaa32a169e14f8560e7c793d70a8 100644 (file)
 #include "monitoring/histogram.h"
 #include "port/likely.h"
 #include "port/port.h"
+#include "util/core_local.h"
 #include "util/mutexlock.h"
-#include "util/thread_local.h"
+
+#ifdef __clang__
+#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
+#else
+#define ROCKSDB_FIELD_UNUSED
+#endif  // __clang__
 
 namespace rocksdb {
 
@@ -50,97 +56,38 @@ class StatisticsImpl : public Statistics {
   virtual bool HistEnabledForType(uint32_t type) const override;
 
  private:
-  std::shared_ptr<Statistics> stats_shared_;
-  Statistics* stats_;
+  // If non-nullptr, forwards updates to the object pointed to by `stats_`.
+  std::shared_ptr<Statistics> stats_;
+  // TODO(ajkr): clean this up since there are no internal stats anymore
   bool enable_internal_stats_;
-  // Synchronizes anything that operates on other threads' thread-specific data
+  // Synchronizes anything that operates across other cores' local data,
   // such that operations like Reset() can be performed atomically.
   mutable port::Mutex aggregate_lock_;
 
-  // Holds data maintained by each thread for implementing tickers.
-  struct ThreadTickerInfo {
-    std::atomic_uint_fast64_t value;
-    // During teardown, value will be summed into *merged_sum.
-    std::atomic_uint_fast64_t* merged_sum;
-
-    ThreadTickerInfo(uint_fast64_t _value,
-                     std::atomic_uint_fast64_t* _merged_sum)
-        : value(_value), merged_sum(_merged_sum) {}
+  // The ticker/histogram data are stored in this structure, which we will store
+  // per-core. It is cache-aligned, so tickers/histograms belonging to different
+  // cores can never share the same cache line.
+  //
+  // Alignment attributes expand to nothing depending on the platform
+  struct StatisticsData {
+    std::atomic_uint_fast64_t tickers_[INTERNAL_TICKER_ENUM_MAX] = {{0}};
+    HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
+    char
+        padding[(CACHE_LINE_SIZE -
+                 (INTERNAL_TICKER_ENUM_MAX * sizeof(std::atomic_uint_fast64_t) +
+                  INTERNAL_HISTOGRAM_ENUM_MAX * sizeof(HistogramImpl)) %
+                     CACHE_LINE_SIZE) %
+                CACHE_LINE_SIZE] ROCKSDB_FIELD_UNUSED;
   };
 
-  // Holds data maintained by each thread for implementing histograms.
-  struct ThreadHistogramInfo {
-    HistogramImpl value;
-    // During teardown, value will be merged into *merged_hist while holding
-    // *merge_lock, which also syncs with the merges necessary for reads.
-    HistogramImpl* merged_hist;
-    port::Mutex* merge_lock;
+  static_assert(sizeof(StatisticsData) % 64 == 0, "Expected 64-byte aligned");
 
-    ThreadHistogramInfo(HistogramImpl* _merged_hist, port::Mutex* _merge_lock)
-        : value(), merged_hist(_merged_hist), merge_lock(_merge_lock) {}
-  };
-
-  // Holds global data for implementing tickers.
-  struct TickerInfo {
-    TickerInfo()
-        : thread_value(new ThreadLocalPtr(&mergeThreadValue)), merged_sum(0) {}
-    // Holds thread-specific pointer to ThreadTickerInfo
-    std::unique_ptr<ThreadLocalPtr> thread_value;
-    // Sum of thread-specific values for tickers that have been reset due to
-    // thread termination or ThreadLocalPtr destruction. Also, this is used by
-    // setTickerCount() to conveniently change the global value by setting this
-    // while simultaneously zeroing all thread-local values.
-    std::atomic_uint_fast64_t merged_sum;
-
-    static void mergeThreadValue(void* ptr) {
-      auto info_ptr = static_cast<ThreadTickerInfo*>(ptr);
-      *info_ptr->merged_sum += info_ptr->value;
-      delete info_ptr;
-    }
-  };
-
-  // Holds global data for implementing histograms.
-  struct HistogramInfo {
-    HistogramInfo()
-        : merged_hist(),
-          merge_lock(),
-          thread_value(new ThreadLocalPtr(&mergeThreadValue)) {}
-    // Merged thread-specific values for histograms that have been reset due to
-    // thread termination or ThreadLocalPtr destruction. Note these must be
-    // destroyed after thread_value since its destructor accesses them.
-    HistogramImpl merged_hist;
-    mutable port::Mutex merge_lock;
-    // Holds thread-specific pointer to ThreadHistogramInfo
-    std::unique_ptr<ThreadLocalPtr> thread_value;
-
-    static void mergeThreadValue(void* ptr) {
-      auto info_ptr = static_cast<ThreadHistogramInfo*>(ptr);
-      {
-        MutexLock lock(info_ptr->merge_lock);
-        info_ptr->merged_hist->Merge(info_ptr->value);
-      }
-      delete info_ptr;
-    }
-
-    // Returns a histogram that merges all histograms (thread-specific and
-    // previously merged ones).
-    std::unique_ptr<HistogramImpl> getMergedHistogram() const;
-  };
+  CoreLocalArray<StatisticsData> per_core_stats_;
 
   uint64_t getTickerCountLocked(uint32_t ticker_type) const;
-  void histogramDataLocked(uint32_t histogram_type,
-                           HistogramData* const data) const;
+  std::unique_ptr<HistogramImpl> getHistogramImplLocked(
+      uint32_t histogram_type) const;
   void setTickerCountLocked(uint32_t ticker_type, uint64_t count);
-
-  // Returns the info for this tickerType/thread. It sets a new info with zeroed
-  // counter if none exists.
-  ThreadTickerInfo* getThreadTickerInfo(uint32_t ticker_type);
-  // Returns the info for this histogramType/thread. It sets a new histogram
-  // with zeroed data if none exists.
-  ThreadHistogramInfo* getThreadHistogramInfo(uint32_t histogram_type);
-
-  TickerInfo tickers_[INTERNAL_TICKER_ENUM_MAX];
-  HistogramInfo histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
 };
 
 // Utility functions
index 7515c542362e95078cb9efdab0ecb809be05a673..4239df62efce946c12c76cb051fe1ff7942a41de 100644 (file)
@@ -7,19 +7,20 @@
 
 #pragma once
 
-#include "port/likely.h"
-#include "port/port.h"
-#include "util/random.h"
-
 #include <cstddef>
 #include <thread>
+#include <utility>
 #include <vector>
 
+#include "port/likely.h"
+#include "port/port.h"
+#include "util/random.h"
+
 namespace rocksdb {
 
 // An array of core-local values. Ideally the value type, T, is cache aligned to
 // prevent false sharing.
-template<typename T>
+template <typename T>
 class CoreLocalArray {
  public:
   CoreLocalArray();
@@ -41,7 +42,7 @@ class CoreLocalArray {
   int size_shift_;
 };
 
-template<typename T>
+template <typename T>
 CoreLocalArray<T>::CoreLocalArray() {
   int num_cpus = static_cast<int>(std::thread::hardware_concurrency());
   // find a power of two >= num_cpus and >= 8
@@ -52,17 +53,17 @@ CoreLocalArray<T>::CoreLocalArray() {
   data_.reset(new T[static_cast<size_t>(1) << size_shift_]);
 }
 
-template<typename T>
+template <typename T>
 size_t CoreLocalArray<T>::Size() const {
   return static_cast<size_t>(1) << size_shift_;
 }
 
-template<typename T>
+template <typename T>
 T* CoreLocalArray<T>::Access() const {
   return AccessElementAndIndex().first;
 }
 
-template<typename T>
+template <typename T>
 std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
   int cpuid = port::PhysicalCoreID();
   size_t core_idx;
@@ -75,7 +76,7 @@ std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
   return {AccessAtCore(core_idx), core_idx};
 }
 
-template<typename T>
+template <typename T>
 T* CoreLocalArray<T>::AccessAtCore(size_t core_idx) const {
   assert(core_idx < static_cast<size_t>(1) << size_shift_);
   return &data_[core_idx];