CoreLocalArray class
authorAndrew Kryczka <andrewkr@fb.com>
Thu, 11 May 2017 01:16:31 +0000 (18:16 -0700)
committerGaudenz Steinlin <gaudenz@debian.org>
Fri, 30 Nov 2018 15:49:02 +0000 (15:49 +0000)
Summary:
Moved the logic for core-local array out of ConcurrentArena and into a separate class because I want to reuse it for core-local stats.
Closes https://github.com/facebook/rocksdb/pull/2256

Differential Revision: D5011518

Pulled By: ajkr

fbshipit-source-id: a75a7b8f7b7a42fd6273489ada405f14c6be196a
(cherry picked from commit cda5fde2d96624df38afc7f02b6b3e699648c62d)

Gbp-Pq: Name 0001-CoreLocalArray-class.patch

src/rocksdb/util/concurrent_arena.cc
src/rocksdb/util/concurrent_arena.h
src/rocksdb/util/core_local.h [new file with mode: 0644]

index df870114a7d2e827be284a7a2ca6dabd892730d0..a0feb93bf9beb4ea47f0c9d8202a77e9da7ad8d5 100644 (file)
 namespace rocksdb {
 
 #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
-__thread uint32_t ConcurrentArena::tls_cpuid = 0;
+__thread size_t ConcurrentArena::tls_cpuid = 0;
 #endif
 
 ConcurrentArena::ConcurrentArena(size_t block_size, size_t huge_page_size)
-    : shard_block_size_(block_size / 8), arena_(block_size, huge_page_size) {
-  // find a power of two >= num_cpus and >= 8
-  auto num_cpus = std::thread::hardware_concurrency();
-  index_mask_ = 7;
-  while (index_mask_ + 1 < num_cpus) {
-    index_mask_ = index_mask_ * 2 + 1;
-  }
-
-  shards_.reset(new Shard[index_mask_ + 1]);
+    : shard_block_size_(block_size / 8),
+      shards_(),
+      arena_(block_size, huge_page_size) {
   Fixup();
 }
 
 ConcurrentArena::Shard* ConcurrentArena::Repick() {
-  int cpuid = port::PhysicalCoreID();
-  if (UNLIKELY(cpuid < 0)) {
-    // cpu id unavailable, just pick randomly
-    cpuid =
-        Random::GetTLSInstance()->Uniform(static_cast<int>(index_mask_) + 1);
-  }
+  auto shard_and_index = shards_.AccessElementAndIndex();
 #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
   // even if we are cpu 0, use a non-zero tls_cpuid so we can tell we
   // have repicked
-  tls_cpuid = cpuid | (static_cast<int>(index_mask_) + 1);
+  tls_cpuid = shard_and_index.second | shards_.Size();
 #endif
-  return &shards_[cpuid & index_mask_];
+  return shard_and_index.first;
 }
 
 }  // namespace rocksdb
index 3a20bb6dbd05f1f66a7fecbc9c55ca88756e0cc7..a6db1e9eba59a21c048499c30ef2ac03fa6fd24f 100644 (file)
@@ -14,6 +14,7 @@
 #include "port/likely.h"
 #include "util/allocator.h"
 #include "util/arena.h"
+#include "util/core_local.h"
 #include "util/mutexlock.h"
 #include "util/thread_local.h"
 
@@ -63,9 +64,7 @@ class ConcurrentArena : public Allocator {
 
   size_t ApproximateMemoryUsage() const {
     std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
-    if (index_mask_ != 0) {
-      lock.lock();
-    }
+    lock.lock();
     return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
   }
 
@@ -95,18 +94,16 @@ class ConcurrentArena : public Allocator {
   };
 
 #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
-  static __thread uint32_t tls_cpuid;
+  static __thread size_t tls_cpuid;
 #else
-  enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 };
+  enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
 #endif
 
   char padding0[56] ROCKSDB_FIELD_UNUSED;
 
   size_t shard_block_size_;
 
-  // shards_[i & index_mask_] is valid
-  size_t index_mask_;
-  std::unique_ptr<Shard[]> shards_;
+  CoreLocalArray<Shard> shards_;
 
   Arena arena_;
   mutable SpinMutex arena_mutex_;
@@ -120,15 +117,16 @@ class ConcurrentArena : public Allocator {
 
   size_t ShardAllocatedAndUnused() const {
     size_t total = 0;
-    for (size_t i = 0; i <= index_mask_; ++i) {
-      total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed);
+    for (size_t i = 0; i < shards_.Size(); ++i) {
+      total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
+          std::memory_order_relaxed);
     }
     return total;
   }
 
   template <typename Func>
   char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
-    uint32_t cpu;
+    size_t cpu;
 
     // Go directly to the arena if the allocation is too large, or if
     // we've never needed to Repick() and the arena mutex is available
@@ -137,7 +135,8 @@ class ConcurrentArena : public Allocator {
     std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
     if (bytes > shard_block_size_ / 4 || force_arena ||
         ((cpu = tls_cpuid) == 0 &&
-         !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
+         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
+             std::memory_order_relaxed) &&
          arena_lock.try_lock())) {
       if (!arena_lock.owns_lock()) {
         arena_lock.lock();
@@ -148,7 +147,7 @@ class ConcurrentArena : public Allocator {
     }
 
     // pick a shard from which to allocate
-    Shard* s = &shards_[cpu & index_mask_];
+    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
     if (!s->mutex.try_lock()) {
       s = Repick();
       s->mutex.lock();
diff --git a/src/rocksdb/util/core_local.h b/src/rocksdb/util/core_local.h
new file mode 100644 (file)
index 0000000..806584d
--- /dev/null
@@ -0,0 +1,84 @@
+//  Copyright (c) 2017-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under the BSD-style license found in the
+//  LICENSE file in the root directory of this source tree. An additional grant
+//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is also licensed under the GPLv2 license found in the
+//  COPYING file in the root directory of this source tree.
+
+#pragma once
+
+#include "port/likely.h"
+#include "port/port.h"
+#include "util/random.h"
+
+#include <cstddef>
+#include <thread>
+#include <vector>
+
+namespace rocksdb {
+
+// An array of core-local values. Ideally the value type, T, is cache aligned to
+// prevent false sharing.
+template<typename T>
+class CoreLocalArray {
+ public:
+  CoreLocalArray();
+
+  size_t Size() const;
+  // returns pointer to the element corresponding to the core that the thread
+  // currently runs on.
+  T* Access() const;
+  // same as above, but also returns the core index, which the client can cache
+  // to reduce how often core ID needs to be retrieved. Only do this if some
+  // inaccuracy is tolerable, as the thread may migrate to a different core.
+  std::pair<T*, size_t> AccessElementAndIndex() const;
+  // returns pointer to element for the specified core index. This can be used,
+  // e.g., for aggregation, or if the client caches core index.
+  T* AccessAtCore(size_t core_idx) const;
+
+ private:
+  std::unique_ptr<T[]> data_;
+  size_t size_shift_;
+};
+
+template<typename T>
+CoreLocalArray<T>::CoreLocalArray() {
+  unsigned int num_cpus = std::thread::hardware_concurrency();
+  // find a power of two >= num_cpus and >= 8
+  size_shift_ = 3;
+  while (1u << size_shift_ < num_cpus) {
+    ++size_shift_;
+  }
+  data_.reset(new T[1 << size_shift_]);
+}
+
+template<typename T>
+size_t CoreLocalArray<T>::Size() const {
+  return 1u << size_shift_;
+}
+
+template<typename T>
+T* CoreLocalArray<T>::Access() const {
+  return AccessElementAndIndex().first;
+}
+
+template<typename T>
+std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
+  int cpuid = port::PhysicalCoreID();
+  size_t core_idx;
+  if (UNLIKELY(cpuid < 0)) {
+    // cpu id unavailable, just pick randomly
+    core_idx = Random::GetTLSInstance()->Uniform(1 << size_shift_);
+  } else {
+    core_idx = static_cast<size_t>(cpuid & ((1 << size_shift_) - 1));
+  }
+  return {AccessAtCore(core_idx), core_idx};
+}
+
+template<typename T>
+T* CoreLocalArray<T>::AccessAtCore(size_t core_idx) const {
+  assert(core_idx < 1u << size_shift_);
+  return &data_[core_idx];
+}
+
+}  // namespace rocksdb