diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index 48ad65d63d6..658b54c1b3b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.data.manager.realtime; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.time.Clock; @@ -85,14 +86,15 @@ public class IngestionDelayTracker { private static class IngestionInfo { - volatile long _ingestionTimeMs; - volatile long _firstStreamIngestionTimeMs; + volatile Long _ingestionTimeMs; + volatile Long _firstStreamIngestionTimeMs; volatile StreamPartitionMsgOffset _currentOffset; volatile StreamPartitionMsgOffset _latestOffset; final Supplier _latestOffsetFetcher; - IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs, - @Nullable StreamPartitionMsgOffset currentOffset, Supplier latestOffsetFetcher) { + IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long firstStreamIngestionTimeMs, + @Nullable StreamPartitionMsgOffset currentOffset, + @Nullable Supplier latestOffsetFetcher) { _ingestionTimeMs = ingestionTimeMs; _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs; _currentOffset = currentOffset; @@ -124,10 +126,13 @@ void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs) // Cache expire time for ignored segment if there is no update from the segment. private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10; + public static final String OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY = "offset.lag.tracking.enable"; + public static final String OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY = "offset.lag.tracking.update.interval"; // Since offset lag metric does a call to Kafka, we want to make sure we don't do it too frequently. public static final boolean DEFAULT_ENABLE_OFFSET_LAG_METRIC = true; public static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = 60000; // 1 minute + public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL = 1000L; // Per partition info for all partitions active for the current table. private final Map _ingestionInfoMap = new ConcurrentHashMap<>(); @@ -172,9 +177,13 @@ public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithTy && realtimeTableDataManager.getInstanceDataManagerConfig().getConfig() != null) { PinotConfiguration pinotConfiguration = realtimeTableDataManager.getInstanceDataManagerConfig().getConfig(); _enableOffsetLagMetric = - pinotConfiguration.getProperty("offset.lag.tracking.enable", DEFAULT_ENABLE_OFFSET_LAG_METRIC); - _offsetLagUpdateIntervalMs = - pinotConfiguration.getProperty("offset.lag.tracking.update.interval", DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS); + pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY, DEFAULT_ENABLE_OFFSET_LAG_METRIC); + _offsetLagUpdateIntervalMs = pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, + DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS); + + Preconditions.checkArgument(_offsetLagUpdateIntervalMs > MIN_OFFSET_LAG_UPDATE_INTERVAL, + String.format("Value of Offset lag update interval config: %s must be greater than %d", + OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, MIN_OFFSET_LAG_UPDATE_INTERVAL)); } else { _enableOffsetLagMetric = DEFAULT_ENABLE_OFFSET_LAG_METRIC; _offsetLagUpdateIntervalMs = DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS; @@ -483,7 +492,17 @@ public long getPartitionIngestionOffsetLag(int partitionId) { if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) { return 0; } - return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset(); + long offsetLag = ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset(); + + if (offsetLag < 0) { + LOGGER.debug( + "Offset lag for partition {} is negative: currentOffset={}, latestOffset={}. This is most likely due to " + + "latestOffset not being updated", + partitionId, currentOffset, latestOffset); + return 0; + } + + return offsetLag; } catch (Exception e) { LOGGER.warn("Failed to compute ingestion offset lag for partition {}", partitionId, e); return 0;