From 8d8c891eaca8f60a9d11c1942a59a8542612f13e Mon Sep 17 00:00:00 2001 From: Kartik Khare Date: Tue, 1 Oct 2024 14:11:17 +0530 Subject: [PATCH] Revert "Make ingestion offset delay metric configurable (#14074)" This reverts commit bba61eef14a49e7ed7a5c4e73c640c12b916a5d6. --- .../realtime/IngestionDelayTracker.java | 153 ++++-------------- .../realtime/RealtimeSegmentDataManager.java | 5 +- .../realtime/RealtimeTableDataManager.java | 4 +- .../realtime/IngestionDelayTrackerTest.java | 13 +- 4 files changed, 39 insertions(+), 136 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index 658b54c1b3b..fd31d8f72b4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -19,7 +19,6 @@ package org.apache.pinot.core.data.manager.realtime; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.time.Clock; @@ -38,14 +37,15 @@ import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * A Class to track realtime ingestion delay for table partitions on a given server. * Highlights: @@ -83,36 +83,22 @@ * * TODO: handle bug situations like the one where a partition is not allocated to a given server due to a bug. */ + public class IngestionDelayTracker { private static class IngestionInfo { - volatile Long _ingestionTimeMs; - volatile Long _firstStreamIngestionTimeMs; - volatile StreamPartitionMsgOffset _currentOffset; - volatile StreamPartitionMsgOffset _latestOffset; - final Supplier _latestOffsetFetcher; - - IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long firstStreamIngestionTimeMs, - @Nullable StreamPartitionMsgOffset currentOffset, - @Nullable Supplier latestOffsetFetcher) { + final long _ingestionTimeMs; + final long _firstStreamIngestionTimeMs; + final StreamPartitionMsgOffset _currentOffset; + final StreamPartitionMsgOffset _latestOffset; + + IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs, + @Nullable StreamPartitionMsgOffset currentOffset, @Nullable StreamPartitionMsgOffset latestOffset) { _ingestionTimeMs = ingestionTimeMs; _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs; _currentOffset = currentOffset; - _latestOffsetFetcher = latestOffsetFetcher; - } - - void updateCurrentOffset(StreamPartitionMsgOffset currentOffset) { - _currentOffset = currentOffset; - } - - void updateLatestOffset(StreamPartitionMsgOffset latestOffset) { _latestOffset = latestOffset; } - - void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs) { - _ingestionTimeMs = ingestionTimeMs; - _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs; - } } private static final Logger LOGGER = LoggerFactory.getLogger(IngestionDelayTracker.class); @@ -126,13 +112,6 @@ void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs) // Cache expire time for ignored segment if there is no update from the segment. private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10; - public static final String OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY = "offset.lag.tracking.enable"; - public static final String OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY = "offset.lag.tracking.update.interval"; - - // Since offset lag metric does a call to Kafka, we want to make sure we don't do it too frequently. - public static final boolean DEFAULT_ENABLE_OFFSET_LAG_METRIC = true; - public static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = 60000; // 1 minute - public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL = 1000L; // Per partition info for all partitions active for the current table. private final Map _ingestionInfoMap = new ConcurrentHashMap<>(); @@ -140,11 +119,14 @@ void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs) // We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not // go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons. + // TODO: Consider removing this mechanism after releasing 1.2.0, and use {@link #stopTrackingPartitionIngestionDelay} + // instead. private final Map _partitionsMarkedForVerification = new ConcurrentHashMap<>(); private final Cache _segmentsToIgnore = CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, TimeUnit.MINUTES).build(); + // TODO: Make thread pool a server/cluster level config // ScheduledExecutorService to check partitions that are inactive against ideal state. private final ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(2); @@ -157,10 +139,6 @@ void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs) private Clock _clock; - // Configuration parameters - private final boolean _enableOffsetLagMetric; - private final long _offsetLagUpdateIntervalMs; - @VisibleForTesting public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType, RealtimeTableDataManager realtimeTableDataManager, int scheduledExecutorThreadTickIntervalMs, @@ -172,23 +150,6 @@ public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithTy _realTimeTableDataManager = realtimeTableDataManager; _clock = Clock.systemUTC(); _isServerReadyToServeQueries = isServerReadyToServeQueries; - - if (realtimeTableDataManager.getInstanceDataManagerConfig() != null - && realtimeTableDataManager.getInstanceDataManagerConfig().getConfig() != null) { - PinotConfiguration pinotConfiguration = realtimeTableDataManager.getInstanceDataManagerConfig().getConfig(); - _enableOffsetLagMetric = - pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY, DEFAULT_ENABLE_OFFSET_LAG_METRIC); - _offsetLagUpdateIntervalMs = pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, - DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS); - - Preconditions.checkArgument(_offsetLagUpdateIntervalMs > MIN_OFFSET_LAG_UPDATE_INTERVAL, - String.format("Value of Offset lag update interval config: %s must be greater than %d", - OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, MIN_OFFSET_LAG_UPDATE_INTERVAL)); - } else { - _enableOffsetLagMetric = DEFAULT_ENABLE_OFFSET_LAG_METRIC; - _offsetLagUpdateIntervalMs = DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS; - } - // Handle negative timer values if (scheduledExecutorThreadTickIntervalMs <= 0) { throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s", @@ -210,11 +171,6 @@ public Thread newThread(Runnable r) { _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions, INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS); - - if (_enableOffsetLagMetric) { - _scheduledExecutor.scheduleWithFixedDelay(this::updateLatestOffsets, - 0, _offsetLagUpdateIntervalMs, TimeUnit.MILLISECONDS); - } } public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType, @@ -297,18 +253,18 @@ void setClock(Clock clock) { * @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from * {@link RowMetadata}) * @param currentOffset offset of the last consumed message (from {@link RowMetadata}) - * @param latestOffsetFetcher a lambda function to fetch the latest offset + * @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider}) */ public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs, long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset, - Supplier latestOffsetFetcher) { + @Nullable StreamPartitionMsgOffset latestOffset) { if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) { // Do not update the ingestion delay metrics during server startup period // or once the table data manager has been shutdown. return; } - if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && currentOffset == null) { + if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && (currentOffset == null || latestOffset == null)) { // Do not publish metrics if stream does not return valid ingestion time or offset. return; } @@ -329,24 +285,12 @@ public void updateIngestionMetrics(String segmentName, int partitionId, long ing ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, () -> getPartitionEndToEndIngestionDelayMs(partitionId)); } - if (_enableOffsetLagMetric) { + if (currentOffset != null && latestOffset != null) { _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG, () -> getPartitionIngestionOffsetLag(partitionId)); } - IngestionInfo ingestionInfo = - new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffsetFetcher); - - if (latestOffsetFetcher != null) { - StreamPartitionMsgOffset latestOffset = latestOffsetFetcher.get(); - ingestionInfo.updateLatestOffset(latestOffset); - } - - return ingestionInfo; - } else { - v.updateIngestionTimes(ingestionTimeMs, firstStreamIngestionTimeMs); - v.updateCurrentOffset(currentOffset); - return v; } + return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset); }); // If we are consuming we do not need to track this partition for removal. @@ -407,28 +351,6 @@ public void timeoutInactivePartitions() { } } - /** - * Updates the latest offsets for each partition at a configurable frequency to reduce load. - */ - private void updateLatestOffsets() { - if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) { - return; - } - for (Map.Entry entry : _ingestionInfoMap.entrySet()) { - int partitionId = entry.getKey(); - IngestionInfo ingestionInfo = entry.getValue(); - Supplier latestOffsetFetcher = ingestionInfo._latestOffsetFetcher; - if (latestOffsetFetcher != null) { - try { - StreamPartitionMsgOffset latestOffset = latestOffsetFetcher.get(); - ingestionInfo.updateLatestOffset(latestOffset); - } catch (Exception e) { - LOGGER.debug("Failed to fetch latest offset for partition {}", partitionId, e); - } - } - } - } - /** * This function is invoked when a segment goes from CONSUMING to ONLINE, so we can assert whether the partition of * the segment is still hosted by this server after some interval of time. @@ -478,35 +400,20 @@ public long getPartitionEndToEndIngestionDelayMs(int partitionId) { } public long getPartitionIngestionOffsetLag(int partitionId) { - try { - IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId); - if (ingestionInfo == null) { - return 0; - } - StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset; - StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset; - if (currentOffset == null || latestOffset == null) { - return 0; - } - // TODO: Support other types of offsets - if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) { - return 0; - } - long offsetLag = ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset(); - - if (offsetLag < 0) { - LOGGER.debug( - "Offset lag for partition {} is negative: currentOffset={}, latestOffset={}. This is most likely due to " - + "latestOffset not being updated", - partitionId, currentOffset, latestOffset); - return 0; - } - - return offsetLag; - } catch (Exception e) { - LOGGER.warn("Failed to compute ingestion offset lag for partition {}", partitionId, e); + IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId); + if (ingestionInfo == null) { + return 0; + } + StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset; + StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset; + if (currentOffset == null || latestOffset == null) { + return 0; + } + // TODO: Support other types of offsets + if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) { return 0; } + return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset(); } /* diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index f95cf2918e9..ecf5cb12cd8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.function.BooleanSupplier; -import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; @@ -1843,10 +1842,10 @@ private void createPartitionMetadataProvider(String reason) { private void updateIngestionMetrics(RowMetadata metadata) { if (metadata != null) { try { - Supplier latestOffsetFetcher = () -> fetchLatestStreamOffset(5000, true); + StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000, true); _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId, metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(), - latestOffsetFetcher); + latestOffset); } catch (Exception e) { _segmentLogger.warn("Failed to fetch latest offset for updating ingestion delay", e); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 2051010acdf..afc1e445291 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -141,7 +141,7 @@ public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, Supplier latestOffset) { + @Nullable StreamPartitionMsgOffset latestOffset) { _ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java index 85109fe2582..9cb527b121d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java @@ -22,7 +22,6 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; -import java.util.function.Supplier; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.spi.stream.LongMsgOffset; @@ -309,25 +308,23 @@ public void testRecordIngestionDelayOffset() { // Test tracking offset lag for a single partition StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100); - Supplier latestOffsetFetcher = () -> new LongMsgOffset(200); + StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200); ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0, - latestOffsetFetcher); + latestOffset0); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 100); // Test tracking offset lag for another partition StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50); StreamPartitionMsgOffset latestOffset1 = new LongMsgOffset(150); - latestOffsetFetcher = () -> new LongMsgOffset(150); ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1, - latestOffsetFetcher); + latestOffset1); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1), 100); // Update offset lag for partition0 msgOffset0 = new LongMsgOffset(150); - latestOffsetFetcher = () -> new LongMsgOffset(200); - + latestOffset0 = new LongMsgOffset(200); ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0, - latestOffsetFetcher); + latestOffset0); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 50); ingestionDelayTracker.shutdown();