// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc.  All rights reserved.
// https://developers.google.com/protocol-buffers/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include "google/protobuf/arenaz_sampler.h"

// NOTE(review): the original include targets were lost in extraction (eight
// bare `#include` lines). Restored to the headers this file demonstrably
// uses (std::atomic memory orders, std::numeric_limits, std::make_unique,
// std::mt19937, std::move/pair, std::vector) plus the gtest/gmock macros and
// matchers used throughout — confirm against upstream.
#include <atomic>
#include <limits>
#include <memory>
#include <random>
#include <utility>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

// Must be included last.
#include "google/protobuf/port_def.inc"

namespace google {
namespace protobuf {
namespace internal {

#if defined(PROTOBUF_ARENAZ_SAMPLE)

// Test-only peer class: inspects the private `info_` pointer of a
// ThreadSafeArenaStatsHandle so tests can tell whether a handle is sampled
// and which stats record it refers to.
class ThreadSafeArenaStatsHandlePeer {
 public:
  static bool IsSampled(const ThreadSafeArenaStatsHandle& h) {
    return h.info_ != nullptr;
  }

  static ThreadSafeArenaStats* GetInfo(ThreadSafeArenaStatsHandle* h) {
    return h->info_;
  }
};

// Collects the non-zero `bytes_allocated` counter of every histogram bin of
// every arena currently registered with sampler `s`.
std::vector<size_t> GetBytesAllocated(ThreadSafeArenazSampler* s) {
  std::vector<size_t> res;
  s->Iterate([&](const ThreadSafeArenaStats& info) {
    for (const auto& block_stats : info.block_histogram) {
      size_t bytes_allocated =
          block_stats.bytes_allocated.load(std::memory_order_acquire);
      if (bytes_allocated != 0) {
        res.push_back(bytes_allocated);
      }
    }
  });
  return res;
}

// Registers a new stats record with `s` (using sampling stride `stride`) and
// seeds its first histogram bin with `size` allocated bytes so tests can
// recognize the record later via GetBytesAllocated().
ThreadSafeArenaStats* Register(ThreadSafeArenazSampler* s, size_t size,
                               int64_t stride) {
  auto* info = s->Register(stride);
  assert(info != nullptr);
  info->block_histogram[0].bytes_allocated.store(size,
                                                 std::memory_order_relaxed);
  return info;
}

#endif  // defined(PROTOBUF_ARENAZ_SAMPLE)

namespace {

#if defined(PROTOBUF_ARENAZ_SAMPLE)

// Matchers used unqualified below; these using-declarations appear to have
// been lost in transit and are required for the calls to compile.
using ::testing::IsEmpty;
using ::testing::UnorderedElementsAre;

// PrepareForSampling() must zero every histogram counter and max_block_size,
// and install the given weight — both on first use and on reuse.
TEST(ThreadSafeArenaStatsTest, PrepareForSampling) {
  ThreadSafeArenaStats info;
  constexpr int64_t kTestStride = 107;
  absl::MutexLock l(&info.init_mu);
  info.PrepareForSampling(kTestStride);
  for (const auto& block_stats : info.block_histogram) {
    EXPECT_EQ(block_stats.num_allocations.load(std::memory_order_relaxed), 0);
    EXPECT_EQ(block_stats.bytes_used.load(std::memory_order_relaxed), 0);
    EXPECT_EQ(block_stats.bytes_allocated.load(std::memory_order_relaxed), 0);
    EXPECT_EQ(block_stats.bytes_wasted.load(std::memory_order_relaxed), 0);
  }
  EXPECT_EQ(info.max_block_size.load(std::memory_order_relaxed), 0);
  EXPECT_EQ(info.weight, kTestStride);

  // Dirty every counter, then verify a second PrepareForSampling resets them.
  for (auto& block_stats : info.block_histogram) {
    block_stats.num_allocations.store(1, std::memory_order_relaxed);
    block_stats.bytes_used.store(1, std::memory_order_relaxed);
    block_stats.bytes_allocated.store(1, std::memory_order_relaxed);
    block_stats.bytes_wasted.store(1, std::memory_order_relaxed);
  }
  info.max_block_size.store(1, std::memory_order_relaxed);
  info.PrepareForSampling(2 * kTestStride);
  for (auto& block_stats : info.block_histogram) {
    EXPECT_EQ(block_stats.num_allocations.load(std::memory_order_relaxed), 0);
    EXPECT_EQ(block_stats.bytes_used.load(std::memory_order_relaxed), 0);
    EXPECT_EQ(block_stats.bytes_allocated.load(std::memory_order_relaxed), 0);
    EXPECT_EQ(block_stats.bytes_wasted.load(std::memory_order_relaxed), 0);
  }
  EXPECT_EQ(info.max_block_size.load(std::memory_order_relaxed), 0);
  EXPECT_EQ(info.weight, 2 * kTestStride);
}

// FindBin() must map block sizes onto consecutive bins whose boundaries fall
// exactly at sizes of the form 2^k + 1.
TEST(ThreadSafeArenaStatsTest, FindBin) {
  size_t current_bin = 0;
  size_t bytes = 1;
  while (current_bin < ThreadSafeArenaStats::kBlockHistogramBins - 1) {
    size_t next_bin = ThreadSafeArenaStats::FindBin(bytes);
    if (next_bin != current_bin) {
      // Test the bins increase linearly.
      EXPECT_EQ(next_bin, current_bin + 1);
      // Test the bins change only at values of the form 2^k + 1.
      EXPECT_EQ(absl::popcount(bytes - 1), 1);
      current_bin = next_bin;
    }
    ++bytes;
  }
}

// MinMaxBlockSizeForBin() must produce contiguous, doubling [min, max] ranges
// that together cover all possible block sizes.
TEST(ThreadSafeArenaStatsTest, MinMaxBlockSizeForBin) {
  std::pair<size_t, size_t> current_limits =
      ThreadSafeArenaStats::MinMaxBlockSizeForBin(0);
  EXPECT_EQ(current_limits.first, 1);
  EXPECT_LT(current_limits.first, current_limits.second);
  for (size_t i = 1; i < ThreadSafeArenaStats::kBlockHistogramBins; ++i) {
    std::pair<size_t, size_t> next_limits =
        ThreadSafeArenaStats::MinMaxBlockSizeForBin(i);
    EXPECT_LT(next_limits.first, next_limits.second);
    // Test the limits do not have gaps.
    EXPECT_EQ(next_limits.first, current_limits.second + 1);
    if (i != ThreadSafeArenaStats::kBlockHistogramBins - 1) {
      EXPECT_EQ(next_limits.second, 2 * current_limits.second);
    }
    current_limits = next_limits;
  }
  // Test the limits cover the entire range possible.
  EXPECT_EQ(current_limits.second, std::numeric_limits<size_t>::max());
}

// RecordAllocateSlow() must accumulate per-bin counters (bin 0 for 128 bytes,
// bin 1 for 256 bytes) and track the largest block seen.
TEST(ThreadSafeArenaStatsTest, RecordAllocateSlow) {
  ThreadSafeArenaStats info;
  constexpr int64_t kTestStride = 458;
  absl::MutexLock l(&info.init_mu);
  info.PrepareForSampling(kTestStride);
  RecordAllocateSlow(&info, /*requested=*/0, /*allocated=*/128, /*wasted=*/0);
  EXPECT_EQ(
      info.block_histogram[0].num_allocations.load(std::memory_order_relaxed),
      1);
  EXPECT_EQ(info.block_histogram[0].bytes_used.load(std::memory_order_relaxed),
            0);
  EXPECT_EQ(
      info.block_histogram[0].bytes_allocated.load(std::memory_order_relaxed),
      128);
  EXPECT_EQ(
      info.block_histogram[0].bytes_wasted.load(std::memory_order_relaxed), 0);
  EXPECT_EQ(info.max_block_size.load(std::memory_order_relaxed), 128);
  RecordAllocateSlow(&info, /*requested=*/100, /*allocated=*/256,
                     /*wasted=*/28);
  EXPECT_EQ(info.block_histogram[0].bytes_used.load(std::memory_order_relaxed),
            100);
  EXPECT_EQ(
      info.block_histogram[0].bytes_wasted.load(std::memory_order_relaxed),
      28);
  EXPECT_EQ(
      info.block_histogram[1].num_allocations.load(std::memory_order_relaxed),
      1);
  EXPECT_EQ(
      info.block_histogram[1].bytes_allocated.load(std::memory_order_relaxed),
      256);
  EXPECT_EQ(info.max_block_size.load(std::memory_order_relaxed), 256);
}

// max_block_size must be monotone: it grows to the largest allocation seen
// and never shrinks when a smaller block is recorded afterwards.
TEST(ThreadSafeArenaStatsTest, RecordAllocateSlowMaxBlockSizeTest) {
  ThreadSafeArenaStats info;
  constexpr int64_t kTestStride = 458;
  absl::MutexLock l(&info.init_mu);
  info.PrepareForSampling(kTestStride);
  RecordAllocateSlow(&info, /*requested=*/100, /*allocated=*/128,
                     /*wasted=*/0);
  EXPECT_EQ(info.max_block_size.load(std::memory_order_relaxed), 128);
  RecordAllocateSlow(&info, /*requested=*/100, /*allocated=*/256,
                     /*wasted=*/28);
  EXPECT_EQ(info.max_block_size.load(std::memory_order_relaxed), 256);
  RecordAllocateSlow(&info, /*requested=*/100, /*allocated=*/128,
                     /*wasted=*/28);
  EXPECT_EQ(info.max_block_size.load(std::memory_order_relaxed), 256);
}

// With sampling parameter 2^p, roughly one in 2^p Sample() calls should be
// sampled; scaled back up, the count must land within a 2x guard band.
TEST(ThreadSafeArenazSamplerTest, SamplingCorrectness) {
  SetThreadSafeArenazEnabled(true);
  for (int p = 0; p <= 15; ++p) {
    SetThreadSafeArenazSampleParameter(1 << p);
    SetThreadSafeArenazGlobalNextSample(1 << p);
    const int kTrials = 1000 << p;
    std::vector<ThreadSafeArenaStatsHandle> hv;
    for (int i = 0; i < kTrials; ++i) {
      ThreadSafeArenaStatsHandle h = Sample();
      if (h.MutableStats() != nullptr) hv.push_back(std::move(h));
    }
    // Ideally samples << p should be very close to kTrials. But we keep a
    // factor of two guard band.
    EXPECT_GE(hv.size() << p, kTrials / 2);
    EXPECT_LE(hv.size() << p, 2 * kTrials);
  }
}

// SampleSlow() must always produce a sample and a positive next-sample delay,
// even with a tiny sampling parameter.
TEST(ThreadSafeArenazSamplerTest, SmallSampleParameter) {
  SetThreadSafeArenazEnabled(true);
  SetThreadSafeArenazSampleParameter(100);
  constexpr int64_t kTestStride = 0;
  for (int i = 0; i < 1000; ++i) {
    SamplingState sampling_state = {kTestStride, kTestStride};
    ThreadSafeArenaStats* sample = SampleSlow(sampling_state);
    EXPECT_GT(sampling_state.next_sample, 0);
    EXPECT_NE(sample, nullptr);
    UnsampleSlow(sample);
  }
}

// Same contract as above at the opposite extreme: the maximum representable
// sampling parameter must not overflow the next-sample computation.
TEST(ThreadSafeArenazSamplerTest, LargeSampleParameter) {
  SetThreadSafeArenazEnabled(true);
  SetThreadSafeArenazSampleParameter(std::numeric_limits<int32_t>::max());
  constexpr int64_t kTestStride = 0;
  for (int i = 0; i < 1000; ++i) {
    SamplingState sampling_state = {kTestStride, kTestStride};
    ThreadSafeArenaStats* sample = SampleSlow(sampling_state);
    EXPECT_GT(sampling_state.next_sample, 0);
    EXPECT_NE(sample, nullptr);
    UnsampleSlow(sample);
  }
}

// With parameter 100, the observed sampling rate should converge near 1%.
TEST(ThreadSafeArenazSamplerTest, Sample) {
  SetThreadSafeArenazEnabled(true);
  SetThreadSafeArenazSampleParameter(100);
  SetThreadSafeArenazGlobalNextSample(0);
  int64_t num_sampled = 0;
  int64_t total = 0;
  double sample_rate = 0.0;
  for (int i = 0; i < 1000000; ++i) {
    ThreadSafeArenaStatsHandle h = Sample();
    ++total;
    if (ThreadSafeArenaStatsHandlePeer::IsSampled(h)) {
      ++num_sampled;
    }
    sample_rate = static_cast<double>(num_sampled) / total;
    if (0.005 < sample_rate && sample_rate < 0.015) break;
  }
  EXPECT_NEAR(sample_rate, 0.01, 0.005);
}

// A live handle keeps its stats visible via Iterate(); destroying the handle
// (by assigning an empty one) must remove the record from the sampler.
TEST(ThreadSafeArenazSamplerTest, Handle) {
  auto& sampler = GlobalThreadSafeArenazSampler();
  constexpr int64_t kTestStride = 17;
  ThreadSafeArenaStatsHandle h(sampler.Register(kTestStride));
  auto* info = ThreadSafeArenaStatsHandlePeer::GetInfo(&h);
  info->block_histogram[0].bytes_allocated.store(0x12345678,
                                                 std::memory_order_relaxed);
  bool found = false;
  sampler.Iterate([&](const ThreadSafeArenaStats& h) {
    if (&h == info) {
      EXPECT_EQ(
          h.block_histogram[0].bytes_allocated.load(std::memory_order_relaxed),
          0x12345678);
      EXPECT_EQ(h.weight, kTestStride);
      found = true;
    }
  });
  EXPECT_TRUE(found);

  h = ThreadSafeArenaStatsHandle();
  found = false;
  sampler.Iterate([&](const ThreadSafeArenaStats& h) {
    if (&h == info) {
      // this will only happen if some other thread has resurrected the info
      // the old handle was using.
      if (h.block_histogram[0].bytes_allocated.load(
              std::memory_order_relaxed) == 0x12345678) {
        found = true;
      }
    }
  });
  EXPECT_FALSE(found);
}

// Registered records are visible through Iterate() and reflect later writes.
TEST(ThreadSafeArenazSamplerTest, Registration) {
  ThreadSafeArenazSampler sampler;
  constexpr int64_t kTestStride = 100;
  auto* info1 = Register(&sampler, 1, kTestStride);
  EXPECT_THAT(GetBytesAllocated(&sampler), UnorderedElementsAre(1));

  auto* info2 = Register(&sampler, 2, kTestStride);
  EXPECT_THAT(GetBytesAllocated(&sampler), UnorderedElementsAre(1, 2));
  info1->block_histogram[0].bytes_allocated.store(3, std::memory_order_relaxed);
  EXPECT_THAT(GetBytesAllocated(&sampler), UnorderedElementsAre(3, 2));

  sampler.Unregister(info1);
  sampler.Unregister(info2);
}

// Unregister() removes exactly the targeted record; freed slots can be
// reused by later registrations.
TEST(ThreadSafeArenazSamplerTest, Unregistration) {
  ThreadSafeArenazSampler sampler;
  std::vector<ThreadSafeArenaStats*> infos;
  constexpr int64_t kTestStride = 200;
  for (size_t i = 0; i < 3; ++i) {
    infos.push_back(Register(&sampler, i + 1, kTestStride));
  }
  EXPECT_THAT(GetBytesAllocated(&sampler), UnorderedElementsAre(1, 2, 3));

  sampler.Unregister(infos[1]);
  EXPECT_THAT(GetBytesAllocated(&sampler), UnorderedElementsAre(1, 3));

  infos.push_back(Register(&sampler, 3, kTestStride));
  infos.push_back(Register(&sampler, 4, kTestStride));
  EXPECT_THAT(GetBytesAllocated(&sampler), UnorderedElementsAre(1, 3, 3, 4));
  sampler.Unregister(infos[3]);
  EXPECT_THAT(GetBytesAllocated(&sampler), UnorderedElementsAre(1, 3, 4));

  sampler.Unregister(infos[0]);
  sampler.Unregister(infos[2]);
  sampler.Unregister(infos[4]);
  EXPECT_THAT(GetBytesAllocated(&sampler), IsEmpty());
}

// Stress test: many threads register/unregister concurrently so tsan can
// catch data races inside the sampler.
TEST(ThreadSafeArenazSamplerTest, MultiThreaded) {
  ThreadSafeArenazSampler sampler;
  absl::Notification stop;
  ThreadPool pool(10);

  for (int i = 0; i < 10; ++i) {
    const int64_t sampling_stride = 11 + i % 3;
    pool.Schedule([&sampler, &stop, sampling_stride]() {
      std::random_device rd;
      std::mt19937 gen(rd());

      std::vector<ThreadSafeArenaStats*> infoz;
      while (!stop.HasBeenNotified()) {
        if (infoz.empty()) {
          infoz.push_back(sampler.Register(sampling_stride));
        }
        switch (std::uniform_int_distribution<>(0, 1)(gen)) {
          case 0: {
            infoz.push_back(sampler.Register(sampling_stride));
            break;
          }
          case 1: {
            size_t p =
                std::uniform_int_distribution<>(0, infoz.size() - 1)(gen);
            ThreadSafeArenaStats* info = infoz[p];
            infoz[p] = infoz.back();
            infoz.pop_back();
            EXPECT_EQ(info->weight, sampling_stride);
            sampler.Unregister(info);
            break;
          }
        }
      }
    });
  }
  // The threads will hammer away.  Give it a little bit of time for tsan to
  // spot errors.
  absl::SleepFor(absl::Seconds(3));
  stop.Notify();
}

// The dispose callback must fire for every Unregister while set, and stop
// firing once unset.
TEST(ThreadSafeArenazSamplerTest, Callback) {
  ThreadSafeArenazSampler sampler;
  constexpr int64_t kTestStride = 203;
  auto* info1 = Register(&sampler, 1, kTestStride);
  auto* info2 = Register(&sampler, 2, kTestStride);

  static const ThreadSafeArenaStats* expected;

  auto callback = [](const ThreadSafeArenaStats& info) {
    // We can't use `info` outside of this callback because the object will be
    // disposed as soon as we return from here.
    EXPECT_EQ(&info, expected);
  };

  // Set the callback.
  EXPECT_EQ(sampler.SetDisposeCallback(callback), nullptr);
  expected = info1;
  sampler.Unregister(info1);

  // Unset the callback.
  EXPECT_EQ(callback, sampler.SetDisposeCallback(nullptr));
  expected = nullptr;  // no more calls.
  sampler.Unregister(info2);
}

// An arena constructed over a user-provided initial block records the block's
// size as allocated but zero bytes used/wasted.
TEST(ThreadSafeArenazSamplerTest, InitialBlockReportsZeroUsedAndWasted) {
  SetThreadSafeArenazEnabled(true);
  // Setting 1 as the parameter value means one in every two arenas would be
  // sampled, on average.
  int32_t oldparam = ThreadSafeArenazSampleParameter();
  SetThreadSafeArenazSampleParameter(1);
  SetThreadSafeArenazGlobalNextSample(0);
  constexpr int kSize = 571;
  int count_found_allocation = 0;
  auto& sampler = GlobalThreadSafeArenazSampler();
  for (int i = 0; i < 10; ++i) {
    char block[kSize];
    google::protobuf::Arena arena(/*initial_block=*/block,
                                  /*initial_block_size=*/kSize);
    sampler.Iterate([&](const ThreadSafeArenaStats& h) {
      const auto& histbin =
          h.block_histogram[ThreadSafeArenaStats::FindBin(kSize)];
      if (histbin.bytes_allocated.load(std::memory_order_relaxed) == kSize) {
        count_found_allocation++;
        EXPECT_EQ(histbin.bytes_used, 0);
        EXPECT_EQ(histbin.bytes_wasted, 0);
      }
    });
  }
  EXPECT_GT(count_found_allocation, 0);
  SetThreadSafeArenazSampleParameter(oldparam);
}

// Helper thread: allocates one message on a shared arena (creating a
// SerialArena for this thread), then rendezvous on the barrier.  The last
// thread through the barrier deletes it.
class ThreadSafeArenazSamplerTestThread : public Thread {
 protected:
  void Run() override {
    google::protobuf::ArenaSafeUniquePtr<
        protobuf_test_messages::proto2::TestAllTypesProto2>
        message = google::protobuf::MakeArenaSafeUnique<
            protobuf_test_messages::proto2::TestAllTypesProto2>(arena_);
    ABSL_CHECK(message != nullptr);
    // Signal that a message on the arena has been created.  This should create
    // a SerialArena for this thread.
    if (barrier_->Block()) {
      delete barrier_;
    }
  }

 public:
  ThreadSafeArenazSamplerTestThread(const thread::Options& options,
                                    absl::string_view name,
                                    google::protobuf::Arena* arena,
                                    absl::Barrier* barrier)
      : Thread(options, name), arena_(arena), barrier_(barrier) {}

 private:
  google::protobuf::Arena* arena_;   // not owned
  absl::Barrier* barrier_;           // deleted by the last thread to Block()
};

// Arenas shared by many threads must still show up through Iterate().
TEST(ThreadSafeArenazSamplerTest, MultiThread) {
  SetThreadSafeArenazEnabled(true);
  // Setting 1 as the parameter value means one in every two arenas would be
  // sampled, on average.
  int32_t oldparam = ThreadSafeArenazSampleParameter();
  SetThreadSafeArenazSampleParameter(1);
  SetThreadSafeArenazGlobalNextSample(0);
  auto& sampler = GlobalThreadSafeArenazSampler();
  int count = 0;
  for (int i = 0; i < 10; ++i) {
    const int kNumThreads = 10;
    absl::Barrier* barrier = new absl::Barrier(kNumThreads + 1);
    google::protobuf::Arena arena;
    thread::Options options;
    options.set_joinable(true);
    std::vector<std::unique_ptr<ThreadSafeArenazSamplerTestThread>> threads;
    for (int i = 0; i < kNumThreads; i++) {
      auto t = std::make_unique<ThreadSafeArenazSamplerTestThread>(
          options, absl::StrCat("thread", i), &arena, barrier);
      t->Start();
      threads.push_back(std::move(t));
    }
    // Wait till each thread has created a message on the arena.
    if (barrier->Block()) {
      delete barrier;
    }
    sampler.Iterate([&](const ThreadSafeArenaStats& h) { ++count; });
    for (int i = 0; i < kNumThreads; i++) {
      threads[i]->Join();
    }
  }
  EXPECT_GT(count, 0);
  SetThreadSafeArenazSampleParameter(oldparam);
}

// Helper thread: creates its own (first) arena, signals the main thread, and
// keeps the arena alive until the main thread has counted samples.
class SampleFirstArenaThread : public Thread {
 protected:
  void Run() override {
    google::protobuf::Arena arena;
    google::protobuf::ArenaSafeUniquePtr<
        protobuf_test_messages::proto2::TestAllTypesProto2>
        message = google::protobuf::MakeArenaSafeUnique<
            protobuf_test_messages::proto2::TestAllTypesProto2>(&arena);
    ABSL_CHECK(message != nullptr);
    arena_created_.Notify();
    samples_counted_.WaitForNotification();
  }

 public:
  explicit SampleFirstArenaThread(const thread::Options& options)
      : Thread(options, "SampleFirstArenaThread") {}

  absl::Notification arena_created_;
  absl::Notification samples_counted_;
};

// Test that the first arena created on a thread may and may not be chosen for
// sampling.
TEST(ThreadSafeArenazSamplerTest, SampleFirstArena) { SetThreadSafeArenazEnabled(true); auto& sampler = GlobalThreadSafeArenazSampler(); enum class SampleResult { kSampled, kUnsampled, kSpoiled, }; auto count_samples = [&]() { int count = 0; sampler.Iterate([&](const ThreadSafeArenaStats& h) { ++count; }); return count; }; auto run_sample_experiment = [&]() { int before = count_samples(); thread::Options options; options.set_joinable(true); SampleFirstArenaThread t(options); t.Start(); t.arena_created_.WaitForNotification(); int during = count_samples(); t.samples_counted_.Notify(); t.Join(); int after = count_samples(); // If we didn't get back where we were, some other thread may have // created an arena and produced an invalid experiment run. if (before != after) return SampleResult::kSpoiled; switch (during - before) { case 1: return SampleResult::kSampled; case 0: return SampleResult::kUnsampled; default: return SampleResult::kSpoiled; } }; constexpr int kTrials = 10000; bool sampled = false; bool unsampled = false; for (int i = 0; i < kTrials; ++i) { switch (run_sample_experiment()) { case SampleResult::kSampled: sampled = true; break; case SampleResult::kUnsampled: unsampled = true; break; default: break; } // This is the success criteria for the entire test. At some point // we sampled the first arena and at some point we did not. if (sampled && unsampled) return; } EXPECT_TRUE(sampled); EXPECT_TRUE(unsampled); } #endif // defined(PROTOBUF_ARENAZ_SAMPLE) } // namespace } // namespace internal } // namespace protobuf } // namespace google