Skip to content

Commit

Permalink
introduce additional checks for valid offset metric value
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Khare committed Sep 26, 2024
1 parent e674f37 commit dba2057
Showing 1 changed file with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.data.manager.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.time.Clock;
Expand Down Expand Up @@ -85,14 +86,15 @@
public class IngestionDelayTracker {

private static class IngestionInfo {
volatile long _ingestionTimeMs;
volatile long _firstStreamIngestionTimeMs;
volatile Long _ingestionTimeMs;
volatile Long _firstStreamIngestionTimeMs;
volatile StreamPartitionMsgOffset _currentOffset;
volatile StreamPartitionMsgOffset _latestOffset;
final Supplier<StreamPartitionMsgOffset> _latestOffsetFetcher;

IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs,
@Nullable StreamPartitionMsgOffset currentOffset, Supplier<StreamPartitionMsgOffset> latestOffsetFetcher) {
IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long firstStreamIngestionTimeMs,
@Nullable StreamPartitionMsgOffset currentOffset,
@Nullable Supplier<StreamPartitionMsgOffset> latestOffsetFetcher) {
_ingestionTimeMs = ingestionTimeMs;
_firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
_currentOffset = currentOffset;
Expand Down Expand Up @@ -124,10 +126,13 @@ void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs)

// Cache expire time for ignored segment if there is no update from the segment.
private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
public static final String OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY = "offset.lag.tracking.enable";
public static final String OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY = "offset.lag.tracking.update.interval";

// Since offset lag metric does a call to Kafka, we want to make sure we don't do it too frequently.
public static final boolean DEFAULT_ENABLE_OFFSET_LAG_METRIC = true;
public static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = 60000; // 1 minute
public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL = 1000L;

// Per partition info for all partitions active for the current table.
private final Map<Integer, IngestionInfo> _ingestionInfoMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -172,9 +177,13 @@ public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithTy
&& realtimeTableDataManager.getInstanceDataManagerConfig().getConfig() != null) {
PinotConfiguration pinotConfiguration = realtimeTableDataManager.getInstanceDataManagerConfig().getConfig();
_enableOffsetLagMetric =
pinotConfiguration.getProperty("offset.lag.tracking.enable", DEFAULT_ENABLE_OFFSET_LAG_METRIC);
_offsetLagUpdateIntervalMs =
pinotConfiguration.getProperty("offset.lag.tracking.update.interval", DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS);
pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY, DEFAULT_ENABLE_OFFSET_LAG_METRIC);
_offsetLagUpdateIntervalMs = pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY,
DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS);

Preconditions.checkArgument(_offsetLagUpdateIntervalMs > MIN_OFFSET_LAG_UPDATE_INTERVAL,
String.format("Value of Offset lag update interval config: %s must be greater than %d",
OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, MIN_OFFSET_LAG_UPDATE_INTERVAL));
} else {
_enableOffsetLagMetric = DEFAULT_ENABLE_OFFSET_LAG_METRIC;
_offsetLagUpdateIntervalMs = DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS;
Expand Down Expand Up @@ -483,7 +492,17 @@ public long getPartitionIngestionOffsetLag(int partitionId) {
if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
return 0;
}
return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
long offsetLag = ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();

if (offsetLag < 0) {
LOGGER.debug(
"Offset lag for partition {} is negative: currentOffset={}, latestOffset={}. This is most likely due to "
+ "latestOffset not being updated",
partitionId, currentOffset, latestOffset);
return 0;
}

return offsetLag;
} catch (Exception e) {
LOGGER.warn("Failed to compute ingestion offset lag for partition {}", partitionId, e);
return 0;
Expand Down

0 comments on commit dba2057

Please sign in to comment.