Skip to content

Commit

Permalink
Implement cache refresh logic after file changed
Browse files Browse the repository at this point in the history
  • Loading branch information
beinan committed May 19, 2024
1 parent c6ffb83 commit 665f62b
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class CacheConfig
private boolean validationEnabled;
private CacheQuotaScope cacheQuotaScope = GLOBAL;
private Optional<DataSize> defaultCacheQuota = Optional.empty();
private boolean lastModifiedTimeCheckEnabled;

@Nullable
public URI getBaseDirectory()
Expand Down Expand Up @@ -112,4 +113,17 @@ public CacheConfig setDefaultCacheQuota(DataSize defaultCacheQuota)
}
return this;
}

/**
 * Reports whether the last-modified-time check for cached file entries is switched on.
 *
 * @return the current value of the {@code lastModifiedTimeCheckEnabled} field
 */
public boolean isLastModifiedTimeCheckEnabled()
{
    return this.lastModifiedTimeCheckEnabled;
}

/**
 * Sets the flag that controls the last-modified-time check for cached file entries.
 * Bound to the {@code cache.last-modified-time-check-enabled} configuration property.
 *
 * @param lastModifiedTimeCheckEnabled new value for the flag
 * @return this {@code CacheConfig} instance, so setter calls can be chained
 */
@Config("cache.last-modified-time-check-enabled")
@ConfigDescription("Enable the check of the last modified time for each cached file entry")
public CacheConfig setLastModifiedTimeCheckEnabled(boolean lastModifiedTimeCheckEnabled)
{
this.lastModifiedTimeCheckEnabled = lastModifiedTimeCheckEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,30 @@ public ExtendedFileSystem createCachingFileSystem(
ExtendedFileSystem fileSystem,
CacheManager cacheManager,
boolean cachingEnabled,
CacheType cacheType,
boolean validationEnabled)
CacheConfig cacheConfig)
throws IOException
{
if (!cachingEnabled) {
return fileSystem;
}

checkState(cacheType != null);
checkState(cacheConfig != null && cacheConfig.getCacheType() != null);

switch (cacheType) {
switch (cacheConfig.getCacheType()) {
case FILE_MERGE:
return new FileMergeCachingFileSystem(
factoryUri,
factoryConfig,
cacheManager,
fileSystem,
validationEnabled);
cacheConfig.isValidationEnabled());
case ALLUXIO:
ExtendedFileSystem cachingFileSystem = new AlluxioCachingFileSystem(fileSystem, factoryUri, validationEnabled);
ExtendedFileSystem cachingFileSystem = new AlluxioCachingFileSystem(fileSystem,
factoryUri, cacheConfig.isValidationEnabled(), cacheConfig.isLastModifiedTimeCheckEnabled());
cachingFileSystem.initialize(factoryUri, factoryConfig);
return cachingFileSystem;
default:
throw new IllegalArgumentException("Invalid CacheType: " + cacheType.name());
throw new IllegalArgumentException("Invalid CacheType: " + cacheConfig.getCacheType());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,20 @@ public class AlluxioCachingFileSystem
{
private static final int BUFFER_SIZE = 65536;
private final boolean cacheValidationEnabled;
private final boolean lastModifiedTimeCheckEnabled;
private boolean cacheQuotaEnabled;
private LocalCacheFileSystem localCacheFileSystem;

/**
 * Convenience constructor: delegates to the flag-taking constructor, passing {@code false}
 * for every boolean flag.
 *
 * @param dataTier forwarded unchanged to the delegated constructor
 * @param uri forwarded unchanged to the delegated constructor
 */
// NOTE(review): this text is a rendered diff without +/- markers. The two delegation lines
// below appear to be the pre-change (three-argument) and post-change (four-argument)
// versions of the same call; only one of them exists in the real file -- confirm.
public AlluxioCachingFileSystem(ExtendedFileSystem dataTier, URI uri)
{
this(dataTier, uri, false);
this(dataTier, uri, false, false);
}

/**
 * Passes {@code dataTier} and {@code uri} to the superclass constructor and records the
 * given flags in the fields of the same names.
 *
 * @param dataTier handed to the superclass constructor
 * @param uri handed to the superclass constructor
 * @param cacheValidationEnabled stored in {@code cacheValidationEnabled}
 * @param lastModifiedTimeCheckEnabled stored in {@code lastModifiedTimeCheckEnabled}
 */
// NOTE(review): this text is a rendered diff without +/- markers. The two signature lines
// below appear to be the pre-change (three-parameter) and post-change (four-parameter)
// versions of the same constructor header -- confirm against the real file.
public AlluxioCachingFileSystem(ExtendedFileSystem dataTier, URI uri, boolean cacheValidationEnabled)
public AlluxioCachingFileSystem(ExtendedFileSystem dataTier, URI uri, boolean cacheValidationEnabled, boolean lastModifiedTimeCheckEnabled)
{
super(dataTier, uri);
this.cacheValidationEnabled = cacheValidationEnabled;
this.lastModifiedTimeCheckEnabled = lastModifiedTimeCheckEnabled;
}

@Override
Expand Down Expand Up @@ -87,6 +89,9 @@ public FSDataInputStream openFile(Path path, HiveFileContext hiveFileContext)
.setFolder(false)
.setLength(hiveFileContext.getFileSize().getAsLong());
String cacheIdentifier = md5().hashString(path.toString(), UTF_8).toString();
if (lastModifiedTimeCheckEnabled) {
cacheIdentifier = md5().hashString(cacheIdentifier + hiveFileContext.getModificationTime(), UTF_8).toString();
}
// CacheContext is the mechanism to pass the cache related context to the source filesystem
CacheContext cacheContext = PrestoCacheContext.build(cacheIdentifier, hiveFileContext, cacheQuotaEnabled);
URIStatus uriStatus = new URIStatus(info, cacheContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public void testDefaults()
.setBaseDirectory(null)
.setValidationEnabled(false)
.setCacheQuotaScope(GLOBAL)
.setDefaultCacheQuota(null));
.setDefaultCacheQuota(null)
.setLastModifiedTimeCheckEnabled(false));
}

@Test
Expand All @@ -52,6 +53,7 @@ public void testExplicitPropertyMappings()
.put("cache.validation-enabled", "true")
.put("cache.cache-quota-scope", "TABLE")
.put("cache.default-cache-quota", "1GB")
.put("cache.last-modified-time-check-enabled", "true")
.build();

CacheConfig expected = new CacheConfig()
Expand All @@ -60,7 +62,8 @@ public void testExplicitPropertyMappings()
.setBaseDirectory(new URI("tcp://abc"))
.setValidationEnabled(true)
.setCacheQuotaScope(TABLE)
.setDefaultCacheQuota(DataSize.succinctDataSize(1, DataSize.Unit.GIGABYTE));
.setDefaultCacheQuota(DataSize.succinctDataSize(1, DataSize.Unit.GIGABYTE))
.setLastModifiedTimeCheckEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class TestAlluxioCachingFileSystem
private final byte[] data = new byte[DATA_LENGTH];
private URI cacheDirectory;
private String testFilePath;
private long lastModifiedTime;
private Map<String, Long> baseline = new HashMap<>();

@BeforeClass
Expand All @@ -92,6 +93,7 @@ public void setupMethod()
{
// This path is only used for in memory stream without file materialized.
testFilePath = String.format("/test/file_%d", new Random().nextLong());
lastModifiedTime = 0;
resetBaseline();
}

Expand Down Expand Up @@ -120,7 +122,7 @@ private void testBasic(boolean validationEnabled)
.setValidationEnabled(validationEnabled);
AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig();
Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration);
AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);
Path p = new Path("/tmp");
assertEquals(fileSystem.getDefaultBlockSize(p), 1024L);
assertEquals(fileSystem.getDefaultReplication(p), 10);
Expand Down Expand Up @@ -183,6 +185,64 @@ private void testBasic(boolean validationEnabled)
validateBuffer(data, pageOffset + PAGE_SIZE * 2 - 10, buffer, 400, PAGE_SIZE + 20);
}

/**
 * Verifies cache refresh when the last-modified-time check is enabled.
 *
 * <p>The test reads a range (served externally), re-reads inside the same page (served from
 * cache), then changes {@code lastModifiedTime} -- the value the {@code readFully} helper
 * passes as the file's modification time -- and checks that the same range is fetched
 * externally again before subsequent reads are served from cache once more.
 *
 * @throws Exception if building the file system or reading from it fails
 */
@Test(timeOut = 30_000)
public void testCacheRefreshAfterFileChanged()
        throws Exception
{
    CacheConfig cacheConfig = new CacheConfig()
            .setCacheType(ALLUXIO)
            .setCachingEnabled(true)
            .setBaseDirectory(cacheDirectory)
            .setValidationEnabled(false)
            .setLastModifiedTimeCheckEnabled(true);
    AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig();
    Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
    AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);

    byte[] buffer = new byte[PAGE_SIZE * 2];
    int pageOffset = PAGE_SIZE;

    // new read: nothing is cached yet, so one whole page is pulled from the external source
    resetBaseline();
    assertEquals(readFully(fileSystem, pageOffset + 10, buffer, 0, 100), 100);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
    validateBuffer(data, pageOffset + 10, buffer, 0, 100);

    // read from cache: the range lies inside the page loaded above
    resetBaseline();
    assertEquals(readFully(fileSystem, pageOffset + 20, buffer, 0, 90), 90);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 90);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 0);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 0);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
    validateBuffer(data, pageOffset + 20, buffer, 0, 90);

    // file updated: only the modification time reported to the file system changes
    lastModifiedTime = 100;

    // read from external as a new read: the previously cached page must not be reused
    resetBaseline();
    assertEquals(readFully(fileSystem, pageOffset + 10, buffer, 0, 100), 100);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 0);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 100);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, PAGE_SIZE);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
    validateBuffer(data, pageOffset + 10, buffer, 0, 100);

    // read from cache: the page fetched after the update is now cached
    resetBaseline();
    assertEquals(readFully(fileSystem, pageOffset + 20, buffer, 0, 90), 90);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE, 90);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL, 0);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_READ_EXTERNAL, 0);
    checkMetrics(MetricKey.CLIENT_CACHE_BYTES_EVICTED, 0);
    validateBuffer(data, pageOffset + 20, buffer, 0, 90);
}

@Test(invocationCount = 10)
public void testStress()
throws ExecutionException, InterruptedException, URISyntaxException, IOException
Expand All @@ -194,7 +254,7 @@ public void testStress()
AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig()
.setMaxCacheSize(new DataSize(10, KILOBYTE));
Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration);
AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration, cacheConfig);
stressTest(data, (position, buffer, offset, length) -> {
try {
readFully(cachingFileSystem, position, buffer, offset, length);
Expand All @@ -219,7 +279,7 @@ public void testSyncRestoreFailure()
AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig();
Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
try {
cachingFileSystem(configuration);
cachingFileSystem(configuration, cacheConfig);
}
finally {
cacheDirectory.setWritable(true);
Expand All @@ -240,7 +300,7 @@ public void testBasicReadWithAsyncRestoreFailure()
Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
configuration.set("alluxio.user.client.cache.async.restore.enabled", String.valueOf(true));
try {
AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration);
AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);
long state = MetricsSystem.counter(MetricKey.CLIENT_CACHE_STATE.getName()).getCount();
assertTrue(state == CacheManager.State.READ_ONLY.getValue() || state == CacheManager.State.NOT_IN_USE.getValue());
// different cases of read can still proceed even cache is read-only or not-in-use
Expand Down Expand Up @@ -322,7 +382,7 @@ public void testQuotaBasics()
.setCacheQuotaScope(TABLE);
AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig().setCacheQuotaEnabled(true);
Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration);
AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);

byte[] buffer = new byte[10240];

Expand Down Expand Up @@ -358,7 +418,7 @@ public void testQuotaUpdated()
.setCacheQuotaScope(TABLE);
AlluxioCacheConfig alluxioCacheConfig = new AlluxioCacheConfig().setCacheQuotaEnabled(true);
Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration);
AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, cacheConfig);

byte[] buffer = new byte[10240];

Expand Down Expand Up @@ -397,7 +457,7 @@ public void testStressWithQuota()
.setMaxCacheSize(new DataSize(10, KILOBYTE))
.setCacheQuotaEnabled(true);
Configuration configuration = getHdfsConfiguration(cacheConfig, alluxioCacheConfig);
AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration);
AlluxioCachingFileSystem cachingFileSystem = cachingFileSystem(configuration, cacheConfig);
stressTest(data, (position, buffer, offset, length) -> {
try {
readFully(cachingFileSystem, cacheQuota, position, buffer, offset, length);
Expand Down Expand Up @@ -425,7 +485,7 @@ public void testInitialization()
configuration.set("sink.jmx.class", jmxClass);
configuration.set("sink.jmx.domain", metricsDomain);

AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration);
AlluxioCachingFileSystem fileSystem = cachingFileSystem(configuration, new CacheConfig());
Configuration conf = fileSystem.getConf();
assertTrue(conf.getBoolean("alluxio.user.local.cache.enabled", false));
assertEquals(cacheDirectory.getPath(), conf.get("alluxio.user.client.cache.dirs", "bad result"));
Expand Down Expand Up @@ -468,14 +528,15 @@ private void checkMetrics(MetricKey metricsKey, long expected)
assertEquals(MetricsSystem.meter(metricsKey.getName()).getCount() - baseline.getOrDefault(metricsKey.getName(), 0L), expected);
}

/**
 * Test helper that builds and initializes an {@code AlluxioCachingFileSystem}.
 *
 * <p>The underlying file system is a {@code TestingFileSystem} populated with a single entry
 * mapping {@code testFilePath} to the shared {@code data} array. The caching file system is
 * created for the fixed URI {@code alluxio://test:8020/} and initialized with that URI and
 * the supplied configuration before being returned.
 *
 * @param configuration passed to the {@code TestingFileSystem} and to {@code initialize}
 * @throws URISyntaxException declared because the fixed URI is parsed with {@code new URI(...)}
 * @throws IOException declared by this helper; presumably raised by {@code initialize} -- confirm
 */
// NOTE(review): this text is a rendered diff without +/- markers. The paired signature lines
// and the paired constructor-call lines below appear to be the pre-change and post-change
// versions; in the post-change form the validation and last-modified-time flags are read
// from the cacheConfig argument -- confirm against the real file.
private AlluxioCachingFileSystem cachingFileSystem(Configuration configuration)
private AlluxioCachingFileSystem cachingFileSystem(Configuration configuration, CacheConfig cacheConfig)
throws URISyntaxException, IOException
{
Map<Path, byte[]> files = new HashMap<>();
files.put(new Path(testFilePath), data);
ExtendedFileSystem testingFileSystem = new TestingFileSystem(files, configuration);
URI uri = new URI("alluxio://test:8020/");
AlluxioCachingFileSystem cachingFileSystem = new AlluxioCachingFileSystem(testingFileSystem, uri);
AlluxioCachingFileSystem cachingFileSystem = new AlluxioCachingFileSystem(testingFileSystem, uri,
cacheConfig.isValidationEnabled(), cacheConfig.isLastModifiedTimeCheckEnabled());
cachingFileSystem.initialize(uri, configuration);
return cachingFileSystem;
}
Expand Down Expand Up @@ -511,7 +572,7 @@ private int readFully(AlluxioCachingFileSystem fileSystem, CacheQuota quota, lon
OptionalLong.of(DATA_LENGTH),
OptionalLong.of(offset),
OptionalLong.of(length),
0,
lastModifiedTime,
false))) {
return stream.read(position, buffer, offset, length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public Configuration getConfiguration(HdfsContext context, URI uri)
(ExtendedFileSystem) fileSystem,
cacheManager,
context.getSession().map(HiveSessionProperties::isCacheEnabled).orElse(cacheConfig.isCachingEnabled()),
cacheConfig.getCacheType(),
cacheConfig.isValidationEnabled());
cacheConfig);
}
catch (IOException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "cannot create caching file system", e);
Expand Down

0 comments on commit 665f62b

Please sign in to comment.