namespace rocksdb {
#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
-__thread uint32_t ConcurrentArena::tls_cpuid = 0;
+__thread size_t ConcurrentArena::tls_cpuid = 0;
#endif
ConcurrentArena::ConcurrentArena(size_t block_size, size_t huge_page_size)
- : shard_block_size_(block_size / 8), arena_(block_size, huge_page_size) {
- // find a power of two >= num_cpus and >= 8
- auto num_cpus = std::thread::hardware_concurrency();
- index_mask_ = 7;
- while (index_mask_ + 1 < num_cpus) {
- index_mask_ = index_mask_ * 2 + 1;
- }
-
- shards_.reset(new Shard[index_mask_ + 1]);
+ : shard_block_size_(block_size / 8),
+ shards_(),
+ arena_(block_size, huge_page_size) {
Fixup();
}
ConcurrentArena::Shard* ConcurrentArena::Repick() {
- int cpuid = port::PhysicalCoreID();
- if (UNLIKELY(cpuid < 0)) {
- // cpu id unavailable, just pick randomly
- cpuid =
- Random::GetTLSInstance()->Uniform(static_cast<int>(index_mask_) + 1);
- }
+ auto shard_and_index = shards_.AccessElementAndIndex();
#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
// even if we are cpu 0, use a non-zero tls_cpuid so we can tell we
// have repicked
- tls_cpuid = cpuid | (static_cast<int>(index_mask_) + 1);
+ tls_cpuid = shard_and_index.second | shards_.Size();
#endif
- return &shards_[cpuid & index_mask_];
+ return shard_and_index.first;
}
} // namespace rocksdb
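The tls_cpuid encoding above is worth spelling out: Repick() ORs the chosen shard index with shards_.Size(), so the cached value is non-zero even when the index is 0, and because Size() is a power of two and every index is smaller than it, AllocateImpl() recovers the index by masking with Size() - 1. A minimal standalone sketch of that round-trip, with the size fixed at 8 purely for illustration:

// Illustration only: round-trips the tls_cpuid encoding used by Repick().
#include <cassert>
#include <cstddef>

int main() {
  const size_t size = 8;  // stands in for shards_.Size(), always a power of two
  for (size_t index = 0; index < size; ++index) {
    size_t encoded = index | size;            // what Repick() stores in tls_cpuid
    assert(encoded != 0);                     // non-zero even for index 0
    assert((encoded & (size - 1)) == index);  // what AllocateImpl() recovers
  }
  return 0;
}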
#include "port/likely.h"
#include "util/allocator.h"
#include "util/arena.h"
+#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"
size_t ApproximateMemoryUsage() const {
std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
- if (index_mask_ != 0) {
- lock.lock();
- }
+ lock.lock();
return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
}
};
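For intuition on the accounting in ApproximateMemoryUsage(): bytes that the arena has handed to the per-core shards are counted by arena_.ApproximateMemoryUsage() even though they have not yet been given to callers, so the shards' allocated-but-unused totals are subtracted back out. With purely made-up numbers: if the arena reports 4 MiB and the shards together hold 256 KiB of unused space, the method returns roughly 4 MiB - 256 KiB = 3.75 MiB.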
#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
- static __thread uint32_t tls_cpuid;
+ static __thread size_t tls_cpuid;
#else
- enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 };
+ enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
#endif
char padding0[56] ROCKSDB_FIELD_UNUSED;
size_t shard_block_size_;
- // shards_[i & index_mask_] is valid
- size_t index_mask_;
- std::unique_ptr<Shard[]> shards_;
+ CoreLocalArray<Shard> shards_;
Arena arena_;
mutable SpinMutex arena_mutex_;
size_t ShardAllocatedAndUnused() const {
size_t total = 0;
- for (size_t i = 0; i <= index_mask_; ++i) {
- total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed);
+ for (size_t i = 0; i < shards_.Size(); ++i) {
+ total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
+ std::memory_order_relaxed);
}
return total;
}
template <typename Func>
char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
- uint32_t cpu;
+ size_t cpu;
// Go directly to the arena if the allocation is too large, or if
// we've never needed to Repick() and the arena mutex is available
std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
if (bytes > shard_block_size_ / 4 || force_arena ||
((cpu = tls_cpuid) == 0 &&
- !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
+ !shards_.AccessAtCore(0)->allocated_and_unused_.load(
+ std::memory_order_relaxed) &&
arena_lock.try_lock())) {
if (!arena_lock.owns_lock()) {
arena_lock.lock();
}
// pick a shard from which to allocate
- Shard* s = &shards_[cpu & index_mask_];
+ Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
if (!s->mutex.try_lock()) {
s = Repick();
s->mutex.lock();
--- /dev/null
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is also licensed under the GPLv2 license found in the
+// COPYING file in the root directory of this source tree.
+
+#pragma once
+
+#include "port/likely.h"
+#include "port/port.h"
+#include "util/random.h"
+
+#include <cassert>
+#include <cstddef>
+#include <memory>
+#include <thread>
+#include <utility>
+#include <vector>
+
+namespace rocksdb {
+
+// An array of core-local values. Ideally the value type, T, is cache aligned to
+// prevent false sharing.
+template<typename T>
+class CoreLocalArray {
+ public:
+ CoreLocalArray();
+
+ size_t Size() const;
+  // Returns a pointer to the element corresponding to the core on which the
+  // calling thread is currently running.
+  T* Access() const;
+  // Same as above, but also returns the core index, which the client can cache
+  // to reduce how often the core ID needs to be retrieved. Only do this if
+  // some inaccuracy is tolerable, as the thread may migrate to a different
+  // core.
+  std::pair<T*, size_t> AccessElementAndIndex() const;
+  // Returns a pointer to the element for the specified core index. This can be
+  // used, e.g., for aggregation, or if the client caches the core index.
+ T* AccessAtCore(size_t core_idx) const;
+
+ private:
+ std::unique_ptr<T[]> data_;
+ size_t size_shift_;
+};
+
+template<typename T>
+CoreLocalArray<T>::CoreLocalArray() {
+ unsigned int num_cpus = std::thread::hardware_concurrency();
+ // find a power of two >= num_cpus and >= 8
+ size_shift_ = 3;
+ while (1u << size_shift_ < num_cpus) {
+ ++size_shift_;
+ }
+ data_.reset(new T[1 << size_shift_]);
+}
+
+template<typename T>
+size_t CoreLocalArray<T>::Size() const {
+ return 1u << size_shift_;
+}
+
+template<typename T>
+T* CoreLocalArray<T>::Access() const {
+ return AccessElementAndIndex().first;
+}
+
+template<typename T>
+std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
+ int cpuid = port::PhysicalCoreID();
+ size_t core_idx;
+ if (UNLIKELY(cpuid < 0)) {
+ // cpu id unavailable, just pick randomly
+ core_idx = Random::GetTLSInstance()->Uniform(1 << size_shift_);
+ } else {
+ core_idx = static_cast<size_t>(cpuid & ((1 << size_shift_) - 1));
+ }
+ return {AccessAtCore(core_idx), core_idx};
+}
+
+template<typename T>
+T* CoreLocalArray<T>::AccessAtCore(size_t core_idx) const {
+ assert(core_idx < 1u << size_shift_);
+ return &data_[core_idx];
+}
+
+} // namespace rocksdb
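A minimal usage sketch for CoreLocalArray, assuming a hypothetical client type (CounterCell and CoreLocalCounter are illustrative names, not part of RocksDB); the alignas(64) follows the class comment's advice to cache-align T, with 64 bytes assumed as the cache line size:

// Illustration only: a per-core hit counter built on CoreLocalArray.
#include <atomic>
#include <cstddef>

#include "util/core_local.h"

namespace rocksdb {

// 64-byte alignment assumed as the cache line size; note that plain new[]
// (as used by CoreLocalArray) only honors over-alignment on a best-effort
// basis before C++17.
struct alignas(64) CounterCell {
  std::atomic<size_t> value{0};
};

class CoreLocalCounter {
 public:
  void Add(size_t n) {
    // Relaxed ordering is enough: each core mostly touches its own cell.
    counters_.Access()->value.fetch_add(n, std::memory_order_relaxed);
  }

  size_t Total() const {
    size_t total = 0;
    for (size_t i = 0; i < counters_.Size(); ++i) {
      total += counters_.AccessAtCore(i)->value.load(std::memory_order_relaxed);
    }
    return total;
  }

 private:
  CoreLocalArray<CounterCell> counters_;
};

}  // namespace rocksdb

As in ShardAllocatedAndUnused() above, Total() sums relaxed loads across cores, so the result is only approximate while writers are active.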