Skip to content

Commit

Permalink
Make ingestion offset delay metric configurable (apache#14074)
Browse files Browse the repository at this point in the history
* Make ingestion offset delay metric configurable

* introduce additional checks for valid offset metric value

---------

Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
Co-authored-by: Kartik Khare <kharekartik@kartiks-macbook-pro.wyvern-sun.ts.net>
  • Loading branch information
3 people authored Sep 27, 2024
1 parent 7668b21 commit bba61ee
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.data.manager.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.time.Clock;
Expand All @@ -37,15 +38,14 @@
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A Class to track realtime ingestion delay for table partitions on a given server.
* Highlights:
Expand Down Expand Up @@ -83,22 +83,36 @@
*
* TODO: handle bug situations like the one where a partition is not allocated to a given server due to a bug.
*/

public class IngestionDelayTracker {

/**
 * Per-partition ingestion state kept by the tracker: the two ingestion timestamps, the offset of the last consumed
 * message, the latest offset of the partition, and an optional supplier used to fetch that latest offset.
 */
private static class IngestionInfo {
final long _ingestionTimeMs;
final long _firstStreamIngestionTimeMs;
final StreamPartitionMsgOffset _currentOffset;
final StreamPartitionMsgOffset _latestOffset;

IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs,
@Nullable StreamPartitionMsgOffset currentOffset, @Nullable StreamPartitionMsgOffset latestOffset) {
// Declared volatile; these fields are reassigned through the update* methods below.
volatile Long _ingestionTimeMs;
volatile Long _firstStreamIngestionTimeMs;
volatile StreamPartitionMsgOffset _currentOffset;
volatile StreamPartitionMsgOffset _latestOffset;
// Supplier of the latest offset; may be null, and is never reassigned after construction.
final Supplier<StreamPartitionMsgOffset> _latestOffsetFetcher;

// Stores the given values as-is; _latestOffset is not assigned here and is set via updateLatestOffset().
IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long firstStreamIngestionTimeMs,
@Nullable StreamPartitionMsgOffset currentOffset,
@Nullable Supplier<StreamPartitionMsgOffset> latestOffsetFetcher) {
_ingestionTimeMs = ingestionTimeMs;
_firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
_currentOffset = currentOffset;
_latestOffsetFetcher = latestOffsetFetcher;
}

// Replaces the stored current offset with the given value.
void updateCurrentOffset(StreamPartitionMsgOffset currentOffset) {
_currentOffset = currentOffset;
}

// Replaces the stored latest offset with the given value.
void updateLatestOffset(StreamPartitionMsgOffset latestOffset) {
_latestOffset = latestOffset;
}

// Replaces both stored ingestion timestamps.
void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs) {
_ingestionTimeMs = ingestionTimeMs;
_firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(IngestionDelayTracker.class);
Expand All @@ -112,21 +126,25 @@ private static class IngestionInfo {

// Cache expire time for ignored segment if there is no update from the segment.
private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
public static final String OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY = "offset.lag.tracking.enable";
public static final String OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY = "offset.lag.tracking.update.interval";

// Since offset lag metric does a call to Kafka, we want to make sure we don't do it too frequently.
public static final boolean DEFAULT_ENABLE_OFFSET_LAG_METRIC = true;
public static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = 60000; // 1 minute
public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL = 1000L;

// Per partition info for all partitions active for the current table.
private final Map<Integer, IngestionInfo> _ingestionInfoMap = new ConcurrentHashMap<>();

// We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not
// go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading
// ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
// TODO: Consider removing this mechanism after releasing 1.2.0, and use {@link #stopTrackingPartitionIngestionDelay}
// instead.
private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();

private final Cache<String, Boolean> _segmentsToIgnore =
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, TimeUnit.MINUTES).build();

// TODO: Make thread pool a server/cluster level config
// ScheduledExecutorService to check partitions that are inactive against ideal state.
private final ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(2);

Expand All @@ -139,6 +157,10 @@ private static class IngestionInfo {

private Clock _clock;

// Configuration parameters
private final boolean _enableOffsetLagMetric;
private final long _offsetLagUpdateIntervalMs;

@VisibleForTesting
public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
RealtimeTableDataManager realtimeTableDataManager, int scheduledExecutorThreadTickIntervalMs,
Expand All @@ -150,6 +172,23 @@ public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithTy
_realTimeTableDataManager = realtimeTableDataManager;
_clock = Clock.systemUTC();
_isServerReadyToServeQueries = isServerReadyToServeQueries;

if (realtimeTableDataManager.getInstanceDataManagerConfig() != null
&& realtimeTableDataManager.getInstanceDataManagerConfig().getConfig() != null) {
PinotConfiguration pinotConfiguration = realtimeTableDataManager.getInstanceDataManagerConfig().getConfig();
_enableOffsetLagMetric =
pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY, DEFAULT_ENABLE_OFFSET_LAG_METRIC);
_offsetLagUpdateIntervalMs = pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY,
DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS);

Preconditions.checkArgument(_offsetLagUpdateIntervalMs > MIN_OFFSET_LAG_UPDATE_INTERVAL,
String.format("Value of Offset lag update interval config: %s must be greater than %d",
OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, MIN_OFFSET_LAG_UPDATE_INTERVAL));
} else {
_enableOffsetLagMetric = DEFAULT_ENABLE_OFFSET_LAG_METRIC;
_offsetLagUpdateIntervalMs = DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS;
}

// Reject non-positive timer values
if (scheduledExecutorThreadTickIntervalMs <= 0) {
throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s",
Expand All @@ -171,6 +210,11 @@ public Thread newThread(Runnable r) {

_scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);

if (_enableOffsetLagMetric) {
_scheduledExecutor.scheduleWithFixedDelay(this::updateLatestOffsets,
0, _offsetLagUpdateIntervalMs, TimeUnit.MILLISECONDS);
}
}

public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
Expand Down Expand Up @@ -253,18 +297,18 @@ void setClock(Clock clock) {
* @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from
* {@link RowMetadata})
* @param currentOffset offset of the last consumed message (from {@link RowMetadata})
* @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider})
* @param latestOffsetFetcher supplier used to fetch the latest offset of the partition; may be null
*/
public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs,
long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset,
@Nullable StreamPartitionMsgOffset latestOffset) {
Supplier<StreamPartitionMsgOffset> latestOffsetFetcher) {
if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) {
// Do not update the ingestion delay metrics during server startup period
// or once the table data manager has been shutdown.
return;
}

if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && (currentOffset == null || latestOffset == null)) {
if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && currentOffset == null) {
// Do not publish metrics if stream does not return valid ingestion time or offset.
return;
}
Expand All @@ -285,12 +329,24 @@ public void updateIngestionMetrics(String segmentName, int partitionId, long ing
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
() -> getPartitionEndToEndIngestionDelayMs(partitionId));
}
if (currentOffset != null && latestOffset != null) {
if (_enableOffsetLagMetric) {
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
() -> getPartitionIngestionOffsetLag(partitionId));
}
IngestionInfo ingestionInfo =
new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffsetFetcher);

if (latestOffsetFetcher != null) {
StreamPartitionMsgOffset latestOffset = latestOffsetFetcher.get();
ingestionInfo.updateLatestOffset(latestOffset);
}

return ingestionInfo;
} else {
v.updateIngestionTimes(ingestionTimeMs, firstStreamIngestionTimeMs);
v.updateCurrentOffset(currentOffset);
return v;
}
return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset);
});

// If we are consuming we do not need to track this partition for removal.
Expand Down Expand Up @@ -351,6 +407,28 @@ public void timeoutInactivePartitions() {
}
}

/**
 * Refreshes the stored latest offset of every tracked partition by invoking the partition's latest-offset supplier.
 * Partitions without a supplier are left untouched, and a failure for one partition does not prevent the remaining
 * partitions from being refreshed.
 */
private void updateLatestOffsets() {
  // Nothing to refresh until the server is ready to serve queries ...
  if (!_isServerReadyToServeQueries.get()) {
    return;
  }
  // ... or once the table data manager has been shut down.
  if (_realTimeTableDataManager.isShutDown()) {
    return;
  }
  for (Map.Entry<Integer, IngestionInfo> partitionEntry : _ingestionInfoMap.entrySet()) {
    int partitionId = partitionEntry.getKey();
    IngestionInfo info = partitionEntry.getValue();
    Supplier<StreamPartitionMsgOffset> fetcher = info._latestOffsetFetcher;
    if (fetcher == null) {
      continue;
    }
    try {
      info.updateLatestOffset(fetcher.get());
    } catch (Exception e) {
      // Best effort: keep going with the other partitions.
      LOGGER.debug("Failed to fetch latest offset for partition {}", partitionId, e);
    }
  }
}

/**
* This function is invoked when a segment goes from CONSUMING to ONLINE, so we can assert whether the partition of
* the segment is still hosted by this server after some interval of time.
Expand Down Expand Up @@ -400,20 +478,35 @@ public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
}

/**
 * Computes the ingestion offset lag of a partition as the stored latest offset minus the stored current offset.
 * Returns 0 when the partition is not tracked, when either offset is null, when either offset is not a
 * {@link LongMsgOffset}, when the computed lag is negative, or when an exception is thrown during the computation.
 *
 * @param partitionId id of the partition to look up
 * @return the offset lag, or 0 in any of the cases listed above
 */
public long getPartitionIngestionOffsetLag(int partitionId) {
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
if (ingestionInfo == null) {
return 0;
}
StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
if (currentOffset == null || latestOffset == null) {
return 0;
}
// TODO: Support other types of offsets
if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
try {
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
if (ingestionInfo == null) {
return 0;
}
StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
if (currentOffset == null || latestOffset == null) {
return 0;
}
// TODO: Support other types of offsets
if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
return 0;
}
long offsetLag = ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();

// A negative lag is reported as 0 rather than published as-is.
if (offsetLag < 0) {
LOGGER.debug(
"Offset lag for partition {} is negative: currentOffset={}, latestOffset={}. This is most likely due to "
+ "latestOffset not being updated",
partitionId, currentOffset, latestOffset);
return 0;
}

return offsetLag;
} catch (Exception e) {
// Any failure is logged and reported as zero lag instead of propagating to the caller.
LOGGER.warn("Failed to compute ingestion offset lag for partition {}", partitionId, e);
return 0;
}
return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -1842,10 +1843,10 @@ private void createPartitionMetadataProvider(String reason) {
private void updateIngestionMetrics(RowMetadata metadata) {
if (metadata != null) {
try {
StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000, true);
Supplier<StreamPartitionMsgOffset> latestOffsetFetcher = () -> fetchLatestStreamOffset(5000, true);
_realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId,
metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(),
latestOffset);
latestOffsetFetcher);
} catch (Exception e) {
_segmentLogger.warn("Failed to fetch latest offset for updating ingestion delay", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, Supplier<Boolea
@Override
protected void doInit() {
_leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType);
// Tracks ingestion delay of all partitions being served for this table

_ingestionDelayTracker =
new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, _isServerReadyToServeQueries);
File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
Expand Down Expand Up @@ -288,7 +288,7 @@ protected void doShutdown() {
*/
public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs,
long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset,
@Nullable StreamPartitionMsgOffset latestOffset) {
@Nullable Supplier<StreamPartitionMsgOffset> latestOffset) {
// Pure delegation: every argument is forwarded unchanged to the ingestion delay tracker.
_ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, firstStreamIngestionTimeMs,
currentOffset, latestOffset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.function.Supplier;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
Expand Down Expand Up @@ -308,23 +309,25 @@ public void testRecordIngestionDelayOffset() {

// Test tracking offset lag for a single partition
StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
Supplier<StreamPartitionMsgOffset> latestOffsetFetcher = () -> new LongMsgOffset(200);
ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
latestOffset0);
latestOffsetFetcher);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 100);

// Test tracking offset lag for another partition
StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50);
StreamPartitionMsgOffset latestOffset1 = new LongMsgOffset(150);
latestOffsetFetcher = () -> new LongMsgOffset(150);
ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1,
latestOffset1);
latestOffsetFetcher);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1), 100);

// Update offset lag for partition0
msgOffset0 = new LongMsgOffset(150);
latestOffset0 = new LongMsgOffset(200);
latestOffsetFetcher = () -> new LongMsgOffset(200);

ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
latestOffset0);
latestOffsetFetcher);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 50);

ingestionDelayTracker.shutdown();
Expand Down

0 comments on commit bba61ee

Please sign in to comment.