diff --git a/presto-cache/src/main/java/com/facebook/presto/cache/CacheConfig.java b/presto-cache/src/main/java/com/facebook/presto/cache/CacheConfig.java index 831f6d1b626b..7c223b0ab749 100644 --- a/presto-cache/src/main/java/com/facebook/presto/cache/CacheConfig.java +++ b/presto-cache/src/main/java/com/facebook/presto/cache/CacheConfig.java @@ -33,6 +33,7 @@ public class CacheConfig private boolean validationEnabled; private CacheQuotaScope cacheQuotaScope = GLOBAL; private Optional defaultCacheQuota = Optional.empty(); + private boolean lastModifiedTimeCheckEnabled; @Nullable public URI getBaseDirectory() @@ -112,4 +113,17 @@ public CacheConfig setDefaultCacheQuota(DataSize defaultCacheQuota) } return this; } + + public boolean isLastModifiedTimeCheckEnabled() + { + return lastModifiedTimeCheckEnabled; + } + + @Config("cache.last-modified-time-check-enabled") + @ConfigDescription("Enable the check of the last modified time for each cached file entry") + public CacheConfig setLastModifiedTimeCheckEnabled(boolean lastModifiedTimeCheckEnabled) + { + this.lastModifiedTimeCheckEnabled = lastModifiedTimeCheckEnabled; + return this; + } } diff --git a/presto-cache/src/main/java/com/facebook/presto/cache/CacheFactory.java b/presto-cache/src/main/java/com/facebook/presto/cache/CacheFactory.java index d6aa11a83ecc..5e5a2018bdca 100644 --- a/presto-cache/src/main/java/com/facebook/presto/cache/CacheFactory.java +++ b/presto-cache/src/main/java/com/facebook/presto/cache/CacheFactory.java @@ -31,30 +31,30 @@ public ExtendedFileSystem createCachingFileSystem( ExtendedFileSystem fileSystem, CacheManager cacheManager, boolean cachingEnabled, - CacheType cacheType, - boolean validationEnabled) + CacheConfig cacheConfig) throws IOException { if (!cachingEnabled) { return fileSystem; } - checkState(cacheType != null); + checkState(cacheConfig != null && cacheConfig.getCacheType() != null); - switch (cacheType) { + switch (cacheConfig.getCacheType()) { case FILE_MERGE: return new FileMergeCachingFileSystem( factoryUri, factoryConfig, cacheManager, fileSystem, - validationEnabled); + cacheConfig.isValidationEnabled()); case ALLUXIO: - ExtendedFileSystem cachingFileSystem = new AlluxioCachingFileSystem(fileSystem, factoryUri, validationEnabled); + ExtendedFileSystem cachingFileSystem = new AlluxioCachingFileSystem(fileSystem, + factoryUri, cacheConfig.isValidationEnabled(), cacheConfig.isLastModifiedTimeCheckEnabled()); cachingFileSystem.initialize(factoryUri, factoryConfig); return cachingFileSystem; default: - throw new IllegalArgumentException("Invalid CacheType: " + cacheType.name()); + throw new IllegalArgumentException("Invalid CacheType: " + cacheConfig.getCacheType()); } } } diff --git a/presto-cache/src/main/java/com/facebook/presto/cache/alluxio/AlluxioCachingFileSystem.java b/presto-cache/src/main/java/com/facebook/presto/cache/alluxio/AlluxioCachingFileSystem.java index 7ffdbce49c94..54c4627187f7 100644 --- a/presto-cache/src/main/java/com/facebook/presto/cache/alluxio/AlluxioCachingFileSystem.java +++ b/presto-cache/src/main/java/com/facebook/presto/cache/alluxio/AlluxioCachingFileSystem.java @@ -37,18 +37,20 @@ public class AlluxioCachingFileSystem { private static final int BUFFER_SIZE = 65536; private final boolean cacheValidationEnabled; + private final boolean lastModifiedTimeCheckEnabled; private boolean cacheQuotaEnabled; private LocalCacheFileSystem localCacheFileSystem; public AlluxioCachingFileSystem(ExtendedFileSystem dataTier, URI uri) { - this(dataTier, uri, false); + this(dataTier, uri, false, false); } - public AlluxioCachingFileSystem(ExtendedFileSystem dataTier, URI uri, boolean cacheValidationEnabled) + public AlluxioCachingFileSystem(ExtendedFileSystem dataTier, URI uri, boolean cacheValidationEnabled, boolean lastModifiedTimeCheckEnabled) { super(dataTier, uri); this.cacheValidationEnabled = cacheValidationEnabled; + this.lastModifiedTimeCheckEnabled = lastModifiedTimeCheckEnabled; } @Override @@ -87,6 +89,9 @@ public FSDataInputStream openFile(Path path, HiveFileContext hiveFileContext) .setFolder(false) .setLength(hiveFileContext.getFileSize().getAsLong()); String cacheIdentifier = md5().hashString(path.toString(), UTF_8).toString(); + if (lastModifiedTimeCheckEnabled) { + cacheIdentifier = md5().hashString(cacheIdentifier + hiveFileContext.getModificationTime(), UTF_8).toString(); + } // CacheContext is the mechanism to pass the cache related context to the source filesystem CacheContext cacheContext = PrestoCacheContext.build(cacheIdentifier, hiveFileContext, cacheQuotaEnabled); URIStatus uriStatus = new URIStatus(info, cacheContext); diff --git a/presto-cache/src/test/java/com/facebook/presto/cache/TestCacheConfig.java b/presto-cache/src/test/java/com/facebook/presto/cache/TestCacheConfig.java index d7af4c4c3fd1..11312cf785a6 100644 --- a/presto-cache/src/test/java/com/facebook/presto/cache/TestCacheConfig.java +++ b/presto-cache/src/test/java/com/facebook/presto/cache/TestCacheConfig.java @@ -38,7 +38,8 @@ public void testDefaults() .setBaseDirectory(null) .setValidationEnabled(false) .setCacheQuotaScope(GLOBAL) - .setDefaultCacheQuota(null)); + .setDefaultCacheQuota(null) + .setLastModifiedTimeCheckEnabled(false)); } @Test @@ -52,6 +53,7 @@ public void testExplicitPropertyMappings() .put("cache.validation-enabled", "true") .put("cache.cache-quota-scope", "TABLE") .put("cache.default-cache-quota", "1GB") + .put("cache.last-modified-time-check-enabled", "true") .build(); CacheConfig expected = new CacheConfig() @@ -60,7 +62,8 @@ public void testExplicitPropertyMappings() .setBaseDirectory(new URI("tcp://abc")) .setValidationEnabled(true) .setCacheQuotaScope(TABLE) - .setDefaultCacheQuota(DataSize.succinctDataSize(1, DataSize.Unit.GIGABYTE)); + .setDefaultCacheQuota(DataSize.succinctDataSize(1, DataSize.Unit.GIGABYTE)) + .setLastModifiedTimeCheckEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-cache/src/test/java/com/facebook/presto/cache/alluxio/TestAlluxioCachingFileSystem.java b/presto-cache/src/test/java/com/facebook/presto/cache/alluxio/TestAlluxioCachingFileSystem.java index 1f1689af12dd..8285e436178f 100644 --- a/presto-cache/src/test/java/com/facebook/presto/cache/alluxio/TestAlluxioCachingFileSystem.java +++ b/presto-cache/src/test/java/com/facebook/presto/cache/alluxio/TestAlluxioCachingFileSystem.java @@ -69,6 +69,7 @@ public class TestAlluxioCachingFileSystem private final byte[] data = new byte[DATA_LENGTH]; private URI cacheDirectory; private String testFilePath; + private long lastModifiedTime; private Map baseline = new HashMap<>(); @BeforeClass @@ -92,6 +93,7 @@ public void setupMethod() { // This path is only used for in memory stream without file materialized. testFilePath = String.format("/test/file_%d", new Random().nextLong()); + lastModifiedTime = 0; resetBaseline(); } @@ -120,7 +122,7 @@ private void testBasic(boolean validationEnabled) .setValidationEnabled(validationEnabled); AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig(); Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig); - AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration); + AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig); Path p = new Path("/tmp"); assertEquals(fileSystem.getDefaultBlockSize(p), 1024L); assertEquals(fileSystem.getDefaultReplication(p), 10); @@ -183,6 +185,64 @@ private void testBasic(boolean validationEnabled) validateBuffer(data, pageOffset + PAGE_SIZE * 2 - 10, buffer, 400, PAGE_SIZE + 20); } + @Test(timeOut = 30_000) + public void testCacheRefreshAfterFileChanged() + throws Exception + { + CacheConfig cacheConfig = new CacheConfig() + .setCacheType(ALLUXIO) + .setCachingEnabled(true) + .setBaseDirectory(cacheDirectory) + .setValidationEnabled(false) + .setLastModifiedTimeCheckEnabled(true); + AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig(); + Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig); + AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig); + Path p = new Path("/tmp"); + + byte[] buffer = new byte[PAGE_SIZE * 2]; + int pageOffset = PAGE_SIZE; + + // new read + resetBaseline(); + assertEquals(readFully(fileSystem, pageOffset + 10, buffer, 0, 100), 100); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0); + validateBuffer(data, pageOffset + 10, buffer, 0, 100); + + // read from cache + resetBaseline(); + assertEquals(readFully(fileSystem, pageOffset + 20, buffer, 0, 90), 90); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 90); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 0); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 0); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0); + validateBuffer(data, pageOffset + 20, buffer, 0, 90); + + //file updated + lastModifiedTime = 100; + + //read from external as new read + resetBaseline(); + assertEquals(readFully(fileSystem, pageOffset + 10, buffer, 0, 100), 100); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0); + validateBuffer(data, pageOffset + 10, buffer, 0, 100); + + // read from cache + resetBaseline(); + assertEquals(readFully(fileSystem, pageOffset + 20, buffer, 0, 90), 90); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 90); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 0); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 0); + checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0); + validateBuffer(data, pageOffset + 20, buffer, 0, 90); + } + @Test(invocationCount = 10) public void testStress() throws ExecutionException, InterruptedException, URISyntaxException, IOException @@ -194,7 +254,7 @@ public void testStress() AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig() .setMaxCacheSize(new DataSize(10, KILOBYTE)); Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig); - AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration); + AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration, cacheConfig); stressTest(data, (position, buffer, offset, length) -> { try { readFully(cachingFileSystem, position, buffer, offset, length); @@ -219,7 +279,7 @@ public void testSyncRestoreFailure() AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig(); Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig); try { - cachingFileSystem(configuration); + cachingFileSystem(configuration, cacheConfig); } finally { cacheDirectory.setWritable(true); @@ -240,7 +300,7 @@ public void testBasicReadWithAsyncRestoreFailure() Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig); configuration.set("alluxio.user.client.cache.async.restore.enabled", String.valueOf(true)); try { - AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration); + AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig); long state = MetricsSystem.counter(MetricKey.CLIENT_CACHE_STATE.getName()).getCount(); assertTrue(state == CacheManager.State.READ_ONLY.getValue() || state == CacheManager.State.NOT_IN_USE.getValue()); // different cases of read can still proceed even cache is read-only or not-in-use @@ -322,7 +382,7 @@ public void testQuotaBasics() .setCacheQuotaScope(TABLE); AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig().setCacheQuotaEnabled(true); Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig); - AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration); + AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig); byte[] buffer = new byte[10240]; @@ -358,7 +418,7 @@ public void testQuotaUpdated() .setCacheQuotaScope(TABLE); AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig().setCacheQuotaEnabled(true); Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig); - AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration); + AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig); byte[] buffer = new byte[10240]; @@ -397,7 +457,7 @@ public void testStressWithQuota() .setMaxCacheSize(new DataSize(10, KILOBYTE)) .setCacheQuotaEnabled(true); Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig); - AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration); + AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration, cacheConfig); stressTest(data, (position, buffer, offset, length) -> { try { readFully(cachingFileSystem, cacheQuota, position, buffer, offset, length); @@ -425,7 +485,7 @@ public void testInitialization() configuration.set("sink.jmx.class", jmxClass); configuration.set("sink.jmx.domain", metricsDomain); - AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration); + AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, new CacheConfig()); Configuration conf = fileSystem.getConf(); assertTrue(conf.getBoolean("alluxio.user.local.cache.enabled", false)); assertEquals(cacheDirectory.getPath(), conf.get("alluxio.user.client.cache.dirs", "bad result")); @@ -468,14 +528,15 @@ private void checkMetrics(MetricKey metricsKey, long expected) assertEquals(MetricsSystem.meter(metricsKey.getName()).getCount() - baseline.getOrDefault(metricsKey.getName(), 0L), expected); } - private AlluxioCachingFileSystem cachingFileSystem(Configuration configuration) + private AlluxioCachingFileSystem cachingFileSystem(Configuration configuration, CacheConfig cacheConfig) throws URISyntaxException, IOException { Map files = new HashMap<>(); files.put(new Path(testFilePath), data); ExtendedFileSystem testingFileSystem = new TestingFileSystem(files, configuration); URI uri = new URI("alluxio://test:8020/"); - AlluxioCachingFileSystem cachingFileSystem = new AlluxioCachingFileSystem(testingFileSystem, uri); + AlluxioCachingFileSystem cachingFileSystem = new AlluxioCachingFileSystem(testingFileSystem, uri, + cacheConfig.isValidationEnabled(), cacheConfig.isLastModifiedTimeCheckEnabled()); cachingFileSystem.initialize(uri, configuration); return cachingFileSystem; } @@ -511,7 +572,7 @@ private int readFully(AlluxioCachingFileSystem fileSystem, CacheQuota quota, lon OptionalLong.of(DATA_LENGTH), OptionalLong.of(offset), OptionalLong.of(length), - 0, + lastModifiedTime, false))) { return stream.read(position, buffer, offset, length); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/cache/HiveCachingHdfsConfiguration.java b/presto-hive/src/main/java/com/facebook/presto/hive/cache/HiveCachingHdfsConfiguration.java index 52d7327ef5b7..937d2fa6887c 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/cache/HiveCachingHdfsConfiguration.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/cache/HiveCachingHdfsConfiguration.java @@ -78,8 +78,7 @@ public Configuration getConfiguration(HdfsContext context, URI uri) (ExtendedFileSystem) fileSystem, cacheManager, context.getSession().map(HiveSessionProperties::isCacheEnabled).orElse(cacheConfig.isCachingEnabled()), - cacheConfig.getCacheType(), - cacheConfig.isValidationEnabled()); + cacheConfig); } catch (IOException e) { throw new PrestoException(GENERIC_INTERNAL_ERROR, "cannot create caching file system", e);