Skip to content

Commit

Permalink
[native]Update Prestissimo to report the actual used task memory
Browse files Browse the repository at this point in the history
Changes to report the cumulative bytes using usedBytes from non-leaf pool instead of
reserved bytes so as to track the actual memory usage better.
  • Loading branch information
xiaoxmeng committed May 29, 2024
1 parent 853e56f commit 0a40a0b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 19 deletions.
4 changes: 2 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ void PrestoServer::populateMemAndCPUInfo() {
size_t numContexts{0};
queryCtxMgr->visitAllContexts([&](const protocol::QueryId& queryId,
const velox::core::QueryCtx* queryCtx) {
const protocol::Long bytes = queryCtx->pool()->currentBytes();
const protocol::Long bytes = queryCtx->pool()->usedBytes();
poolInfo.queryMemoryReservations.insert({queryId, bytes});
// TODO(spershin): Might want to see what Java exports and export similar
// info (like child memory pools).
Expand Down Expand Up @@ -1199,7 +1199,7 @@ protocol::NodeStatus PrestoServer::fetchNodeStatus() {
(int)std::thread::hardware_concurrency(),
cpuLoadPct,
cpuLoadPct,
pool_ ? pool_->currentBytes() : 0,
pool_ ? pool_->usedBytes() : 0,
nodeMemoryGb * 1024 * 1024 * 1024,
nonHeapUsed};

Expand Down
4 changes: 2 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,15 +547,15 @@ void PrestoTask::updateMemoryInfoLocked(
protocol::TaskStats& prestoTaskStats = info.stats;

const auto veloxTaskMemStats = task->pool()->stats();
prestoTaskStats.userMemoryReservationInBytes = veloxTaskMemStats.currentBytes;
const auto currentBytes = task->pool()->usedBytes();
prestoTaskStats.userMemoryReservationInBytes = currentBytes;
prestoTaskStats.systemMemoryReservationInBytes = 0;
prestoTaskStats.peakUserMemoryInBytes = veloxTaskMemStats.peakBytes;
prestoTaskStats.peakTotalMemoryInBytes = veloxTaskMemStats.peakBytes;

// TODO(venkatra): Populate these memory stats as well.
prestoTaskStats.revocableMemoryReservationInBytes = {};

const int64_t currentBytes = veloxTaskMemStats.currentBytes;
const int64_t averageMemoryForLastPeriod =
(currentBytes + lastMemoryReservation) / 2;
const double sinceLastPeriodMs = currentTimeMs - lastTaskStatsUpdateMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ std::string bodyAsString(http::HttpResponse& response, MemoryPool* pool) {
oss << std::string((const char*)body->data(), body->length());
pool->free(body->writableData(), body->capacity());
}
EXPECT_EQ(pool->currentBytes(), 0);
EXPECT_EQ(pool->usedBytes(), 0);
return oss.str();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ TEST_P(PrestoExchangeSourceTest, basic) {

auto exchangeSource = makeExchangeSource(producerAddress, useHttps, 3, queue);

size_t beforePoolSize = pool_->currentBytes();
size_t beforePoolSize = pool_->usedBytes();
size_t beforeQueueSize = queue->totalBytes();
requestNextPage(queue, exchangeSource);
for (int i = 0; i < pages.size(); i++) {
Expand All @@ -498,14 +498,14 @@ TEST_P(PrestoExchangeSourceTest, basic) {
}
waitForEndMarker(queue);

size_t deltaPool = pool_->currentBytes() - beforePoolSize;
size_t deltaQueue = queue->totalBytes() - beforeQueueSize;
EXPECT_EQ(deltaPool, deltaQueue);
size_t deltaPoolBytes = pool_->usedBytes() - beforePoolSize;
size_t deltaQueueBytes = queue->totalBytes() - beforeQueueSize;
EXPECT_EQ(deltaPoolBytes, deltaQueueBytes);

producer->waitForDeleteResults();
exchangeCpuExecutor_->stop();
serverWrapper.stop();
EXPECT_EQ(pool_->currentBytes(), 0);
EXPECT_EQ(pool_->usedBytes(), 0);

const auto stats = exchangeSource->stats();
ASSERT_EQ(stats.size(), 2);
Expand Down Expand Up @@ -631,7 +631,7 @@ TEST_P(PrestoExchangeSourceTest, earlyTerminatingConsumer) {

producer->waitForDeleteResults();
serverWrapper.stop();
EXPECT_EQ(pool_->currentBytes(), 0);
EXPECT_EQ(pool_->usedBytes(), 0);

const auto stats = exchangeSource->stats();
ASSERT_EQ(stats.size(), 2);
Expand All @@ -654,8 +654,8 @@ TEST_P(PrestoExchangeSourceTest, slowProducer) {
auto queue = makeSingleSourceQueue();
auto exchangeSource = makeExchangeSource(producerAddress, useHttps, 3, queue);

size_t beforePoolSize = pool_->currentBytes();
size_t beforeQueueSize = queue->totalBytes();
const size_t beforePoolSize = pool_->usedBytes();
const size_t beforeQueueSize = queue->totalBytes();
requestNextPage(queue, exchangeSource);

for (int i = 0; i < pages.size(); i++) {
Expand All @@ -667,13 +667,13 @@ TEST_P(PrestoExchangeSourceTest, slowProducer) {
producer->noMoreData();
waitForEndMarker(queue);

size_t deltaPool = pool_->currentBytes() - beforePoolSize;
size_t deltaQueue = queue->totalBytes() - beforeQueueSize;
EXPECT_EQ(deltaPool, deltaQueue);
const size_t deltaPoolBytes = pool_->usedBytes() - beforePoolSize;
const size_t deltaQueueBytes = queue->totalBytes() - beforeQueueSize;
EXPECT_EQ(deltaPoolBytes, deltaQueueBytes);

producer->waitForDeleteResults();
serverWrapper.stop();
EXPECT_EQ(pool_->currentBytes(), 0);
EXPECT_EQ(pool_->usedBytes(), 0);

const auto stats = exchangeSource->stats();
ASSERT_EQ(stats.size(), 2);
Expand Down Expand Up @@ -828,7 +828,7 @@ DEBUG_ONLY_TEST_P(
// response data other than just failing the query.
ASSERT_GE(exchangeSource->testingFailedAttempts(), 1);
}
ASSERT_EQ(leafPool->currentBytes(), 0);
ASSERT_EQ(leafPool->usedBytes(), 0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ TEST_F(TaskManagerTest, testCumulativeMemory) {
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
const auto memoryUsage = veloxTask->queryCtx()->pool()->currentBytes();
const auto memoryUsage = veloxTask->queryCtx()->pool()->usedBytes();
ASSERT_GT(memoryUsage, 0);

const uint64_t lastTimeMs = velox::getCurrentTimeMs();
Expand Down

0 comments on commit 0a40a0b

Please sign in to comment.