Add support for compressions and make LZ4 default for backup 2.0 #833

Open. Wants to merge 5 commits into base: 3.x
Changes from 1 commit
58 changes: 36 additions & 22 deletions priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java
@@ -85,17 +85,17 @@ private Path getV2Location() {
         Path prefix =
                 Paths.get(
                         getV2Prefix().toString(),
-                        type.toString(),
+                        directives.getType().toString(),
                         getLastModified().toEpochMilli() + "");

-        if (type == BackupFileType.SST_V2) {
+        if (directives.getType() == UploadDownloadDirectives.BackupFileType.SST_V2) {
             prefix = Paths.get(prefix.toString(), keyspace, columnFamily);
         }

         return Paths.get(
                 prefix.toString(),
-                getCompression().toString(),
-                getEncryption().toString(),
+                directives.getCompression().toString(),
+                directives.getEncryption().toString(),
                 fileName);
     }

@@ -108,23 +108,25 @@ private void parseV2Location(String remoteFile) {
                         "Too few elements (expected: [%d]) in path: %s", 8, remoteFilePath));
         int name_count_idx = 3;

-        type = BackupFileType.valueOf(remoteFilePath.getName(name_count_idx++).toString());
+        directives.withType(
+                UploadDownloadDirectives.BackupFileType.valueOf(
+                        remoteFilePath.getName(name_count_idx++).toString()));
         setLastModified(
                 Instant.ofEpochMilli(
                         Long.parseLong(remoteFilePath.getName(name_count_idx++).toString())));

-        if (type == BackupFileType.SST_V2) {
+        if (directives.getType() == UploadDownloadDirectives.BackupFileType.SST_V2) {
             keyspace = remoteFilePath.getName(name_count_idx++).toString();
             columnFamily = remoteFilePath.getName(name_count_idx++).toString();
         }

-        setCompression(
-                ICompression.CompressionAlgorithm.valueOf(
-                        remoteFilePath.getName(name_count_idx++).toString()));
-
-        setEncryption(
-                IFileCryptography.CryptographyAlgorithm.valueOf(
-                        remoteFilePath.getName(name_count_idx++).toString()));
+        directives
+                .withCompression(
+                        ICompression.CompressionAlgorithm.valueOf(
+                                remoteFilePath.getName(name_count_idx++).toString()))
+                .withEncryption(
+                        IFileCryptography.CryptographyAlgorithm.valueOf(
+                                remoteFilePath.getName(name_count_idx++).toString()));
         fileName = remoteFilePath.getName(name_count_idx).toString();
     }
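For reference, getV2Location and parseV2Location are exact inverses over a layout that now embeds both algorithm names between the table segments and the file name. A hypothetical V2 object key (every segment value below is illustrative; of the algorithm names, this diff itself only shows SNAPPY, with LZ4 as the PR's advertised new default):

    casstestbackup/1049_fake-app/1808575600/SST_V2/1613306862599/keyspace1/standard1/LZ4/PLAINTEXT/mc-1234-big-Data.db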

@@ -133,8 +135,8 @@ private Path getV1Location() {
                 Paths.get(
                         getV1Prefix().toString(),
                         DateUtil.formatyyyyMMddHHmm(time),
-                        type.toString());
-        if (BackupFileType.isDataFile(type))
+                        directives.getType().toString());
+        if (UploadDownloadDirectives.BackupFileType.isDataFile(directives.getType()))
             path = Paths.get(path.toString(), keyspace, columnFamily);
         return Paths.get(path.toString(), fileName);
     }
@@ -147,8 +149,15 @@ private void parseV1Location(Path remoteFilePath) {
                         "Too few elements (expected: [%d]) in path: %s", 7, remoteFilePath));

         time = DateUtil.getDate(remoteFilePath.getName(4).toString());
-        type = BackupFileType.valueOf(remoteFilePath.getName(5).toString());
-        if (BackupFileType.isDataFile(type)) {
+        /*
+        We hard-code this value because Backup V1 will always remain Snappy-compressed, to keep backwards compatibility.
+        */
+        directives
+                .withCompression(ICompression.CompressionAlgorithm.SNAPPY)
+                .withType(
+                        UploadDownloadDirectives.BackupFileType.valueOf(
+                                remoteFilePath.getName(5).toString()));
+        if (UploadDownloadDirectives.BackupFileType.isDataFile(directives.getType())) {
             keyspace = remoteFilePath.getName(6).toString();
             columnFamily = remoteFilePath.getName(7).toString();
         }
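By contrast, a V1 key, such as the hypothetical

    casstestbackup/us-east-1/fake-app/1808575600/201812310000/SST/keyspace1/standard1/mc-1234-big-Data.db

carries no compression segment at all, so parseV1Location has nothing to read back; pinning SNAPPY above is what keeps existing V1 backups readable.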
@@ -179,7 +188,8 @@ private void parseV1Prefix(Path remoteFilePath) {
     */
    @Override
    public String getRemotePath() {
-        if (type == BackupFileType.SST_V2 || type == BackupFileType.META_V2) {
+        if (directives.getType() == UploadDownloadDirectives.BackupFileType.SST_V2
+                || directives.getType() == UploadDownloadDirectives.BackupFileType.META_V2) {
            return getV2Location().toString();
        } else {
            return getV1Location().toString();
@@ -191,14 +201,18 @@ public void parseRemote(String remoteFilePath) {
         // Check all backup file types to determine how we parse
         // TODO: We should clean this hack to get backupFileType for parsing when we delete V1 of
         // backups.
-        for (BackupFileType fileType : BackupFileType.values()) {
+        directives.withRemotePath(Paths.get(remoteFilePath));
+
+        for (UploadDownloadDirectives.BackupFileType fileType :
+                UploadDownloadDirectives.BackupFileType.values()) {
             if (remoteFilePath.contains(PATH_SEP + fileType.toString() + PATH_SEP)) {
-                type = fileType;
+                directives.withType(fileType);
                 break;
             }
         }

-        if (type == BackupFileType.SST_V2 || type == BackupFileType.META_V2) {
+        if (directives.getType() == UploadDownloadDirectives.BackupFileType.SST_V2
+                || directives.getType() == UploadDownloadDirectives.BackupFileType.META_V2) {
             parseV2Location(remoteFilePath);
         } else {
             parseV1Location(Paths.get(remoteFilePath));
@@ -221,7 +235,7 @@ public String remotePrefix(Date start, Date end, String location) {
     }

     @Override
-    public Path remoteV2Prefix(Path location, BackupFileType fileType) {
+    public Path remoteV2Prefix(Path location, UploadDownloadDirectives.BackupFileType fileType) {
         if (location.getNameCount() <= 1) {
             baseDir = config.getBackupLocation();
             clusterName = config.getAppName();
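Note for reviewers: the new UploadDownloadDirectives type itself is not part of this file's diff (elsewhere it is referenced as AbstractBackupPath.UploadDownloadDirectives, so it appears to be nested in AbstractBackupPath). Below is a minimal sketch of the surface the call sites above rely on, inferred purely from usage; withLocalPath and the non-V2 enum constants (SNAP, SST, CL, META) are assumptions carried over from the existing BackupFileType, not confirmed by this diff.

    import com.netflix.priam.compress.ICompression;
    import com.netflix.priam.cryptography.IFileCryptography;
    import java.nio.file.Path;

    // Sketch only: shape inferred from the call sites in RemoteBackupPath above.
    public class UploadDownloadDirectives {
        public enum BackupFileType {
            SNAP, SST, CL, META, SST_V2, META_V2; // non-V2 constants assumed from the old BackupFileType

            public static boolean isDataFile(BackupFileType type) {
                // Assumption: "data" types are those that get keyspace/columnFamily path segments.
                return type == SNAP || type == SST || type == SST_V2;
            }
        }

        private BackupFileType type;
        private ICompression.CompressionAlgorithm compression;
        private IFileCryptography.CryptographyAlgorithm encryption;
        private Path localPath;
        private Path remotePath;

        // Fluent setters return this, which is what lets parseV2Location chain
        // withCompression(...).withEncryption(...).
        public UploadDownloadDirectives withType(BackupFileType type) {
            this.type = type;
            return this;
        }

        public UploadDownloadDirectives withCompression(ICompression.CompressionAlgorithm compression) {
            this.compression = compression;
            return this;
        }

        public UploadDownloadDirectives withEncryption(
                IFileCryptography.CryptographyAlgorithm encryption) {
            this.encryption = encryption;
            return this;
        }

        public UploadDownloadDirectives withLocalPath(Path localPath) { // assumed, by symmetry
            this.localPath = localPath;
            return this;
        }

        public UploadDownloadDirectives withRemotePath(Path remotePath) {
            this.remotePath = remotePath;
            return this;
        }

        public BackupFileType getType() { return type; }
        public ICompression.CompressionAlgorithm getCompression() { return compression; }
        public IFileCryptography.CryptographyAlgorithm getEncryption() { return encryption; }
        public Path getLocalPath() { return localPath; }
        public Path getRemotePath() { return remotePath; }
    }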
priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java
@@ -27,15 +27,13 @@
 import com.netflix.priam.backup.BackupRestoreException;
 import com.netflix.priam.backup.RangeReadInputStream;
 import com.netflix.priam.compress.ChunkedStream;
-import com.netflix.priam.compress.ICompression;
 import com.netflix.priam.config.IConfiguration;
 import com.netflix.priam.cred.ICredential;
 import com.netflix.priam.cryptography.IFileCryptography;
 import com.netflix.priam.identity.config.InstanceInfo;
 import com.netflix.priam.merics.BackupMetrics;
 import com.netflix.priam.notification.BackupNotificationMgr;
 import java.io.*;
-import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.commons.io.IOUtils;
@@ -52,15 +50,14 @@ public class S3EncryptedFileSystem extends S3FileSystemBase {
     @Inject
     public S3EncryptedFileSystem(
             Provider<AbstractBackupPath> pathProvider,
-            ICompression compress,
             final IConfiguration config,
             ICredential cred,
             @Named("filecryptoalgorithm") IFileCryptography fileCryptography,
             BackupMetrics backupMetrics,
             BackupNotificationMgr backupNotificationMgr,
             InstanceInfo instanceInfo) {

-        super(pathProvider, compress, config, backupMetrics, backupNotificationMgr);
+        super(pathProvider, config, backupMetrics, backupNotificationMgr);
         this.encryptor = fileCryptography;
         super.s3Client =
                 AmazonS3Client.builder()
@@ -70,14 +67,15 @@ public S3EncryptedFileSystem(
     }

     @Override
-    protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRestoreException {
-        try (OutputStream os = new FileOutputStream(localPath.toFile());
+    protected void downloadFileImpl(final AbstractBackupPath.UploadDownloadDirectives directives)
+            throws BackupRestoreException {
+        try (OutputStream os = new FileOutputStream(directives.getLocalPath().toFile());
                 RangeReadInputStream rris =
                         new RangeReadInputStream(
                                 s3Client,
                                 getShard(),
-                                super.getFileSize(remotePath),
-                                remotePath.toString())) {
+                                super.getFileSize(directives.getRemotePath()),
+                                directives.getRemotePath().toString())) {
            /*
             * To handle use cases where decompression should be done outside of the download. For example, the file may have been compressed and then encrypted.
             * Hence, decompressing it here would compromise the decryption.
@@ -86,7 +84,7 @@ protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRe
         } catch (Exception e) {
             throw new BackupRestoreException(
                     "Exception encountered downloading "
-                            + remotePath
+                            + directives.getRemotePath()
                             + " from S3 bucket "
                             + getShard()
                             + ", Msg: "
@@ -96,33 +94,36 @@ protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRe
     }

     @Override
-    protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRestoreException {
-        long chunkSize = getChunkSize(localPath);
+    protected long uploadFileImpl(AbstractBackupPath.UploadDownloadDirectives directives)
+            throws BackupRestoreException {
+        long chunkSize = getChunkSize(directives.getLocalPath());
         // initialize chunking request to aws
         InitiateMultipartUploadRequest initRequest =
-                new InitiateMultipartUploadRequest(config.getBackupPrefix(), remotePath.toString());
+                new InitiateMultipartUploadRequest(
+                        config.getBackupPrefix(), directives.getRemotePath().toString());
         // Fetch the aws generated upload id for this chunking request
         InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
         DataPart part =
                 new DataPart(
                         config.getBackupPrefix(),
-                        remotePath.toString(),
+                        directives.getRemotePath().toString(),
                         initResponse.getUploadId());
         // Metadata on number of parts to be uploaded
         List<PartETag> partETags = Lists.newArrayList();

         // Read chunks from src, compress them, and write to temp file
-        File compressedDstFile = new File(localPath.toString() + ".compressed");
+        File compressedDstFile = new File(directives.getLocalPath().toString() + ".compressed");
         if (logger.isDebugEnabled())
             logger.debug(
                     "Compressing {} with chunk size {}",
                     compressedDstFile.getAbsolutePath(),
                     chunkSize);

-        try (InputStream in = new FileInputStream(localPath.toFile());
+        try (InputStream in = new FileInputStream(directives.getLocalPath().toFile());
                 BufferedOutputStream compressedBos =
                         new BufferedOutputStream(new FileOutputStream(compressedDstFile))) {
-            Iterator<byte[]> compressedChunks = new ChunkedStream(ICompression.DEFAULT_COMPRESSION, in, chunkSize);
+            Iterator<byte[]> compressedChunks =
+                    new ChunkedStream(directives.getCompression(), in, chunkSize);
             while (compressedChunks.hasNext()) {
                 byte[] compressedChunk = compressedChunks.next();
                 compressedBos.write(compressedChunk);
@@ -139,7 +140,8 @@ protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRest
         try (BufferedInputStream compressedBis =
                 new BufferedInputStream(new FileInputStream(compressedDstFile))) {
             Iterator<byte[]> chunks =
-                    this.encryptor.encryptStream(compressedBis, remotePath.toString());
+                    this.encryptor.encryptStream(
+                            compressedBis, directives.getRemotePath().toString());

             // identifies this part position in the object we are uploading
             int partNum = 0;
@@ -155,7 +157,7 @@ protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRest
                                 ++partNum,
                                 chunk,
                                 config.getBackupPrefix(),
-                                remotePath.toString(),
+                                directives.getRemotePath().toString(),
                                 initResponse.getUploadId());
                 S3PartUploader partUploader = new S3PartUploader(s3Client, dp, partETags);
                 encryptedFileSize += chunk.length;
@@ -176,11 +178,12 @@ protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRest
             // identifies the combined object data
             CompleteMultipartUploadResult resultS3MultiPartUploadComplete =
                     new S3PartUploader(s3Client, part, partETags).completeUpload();
-            checkSuccessfulUpload(resultS3MultiPartUploadComplete, localPath);
+            checkSuccessfulUpload(resultS3MultiPartUploadComplete, directives.getLocalPath());
             return encryptedFileSize;
         } catch (Exception e) {
             new S3PartUploader(s3Client, part, partETags).abortUpload();
-            throw new BackupRestoreException("Error uploading file: " + localPath, e);
+            throw new BackupRestoreException(
+                    "Error uploading file: " + directives.getLocalPath(), e);
         } finally {
             if (compressedDstFile.exists()) compressedDstFile.delete();
         }
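The net effect in this class: the compression algorithm no longer comes from the injected ICompression (the old code fell back to ICompression.DEFAULT_COMPRESSION) but travels with each request via the directives. A minimal sketch of building such a request is below; the file paths, the LZ4 constant, and the withLocalPath setter are illustrative assumptions, not taken from this PR.

    import com.netflix.priam.backup.AbstractBackupPath;
    import com.netflix.priam.compress.ICompression;
    import java.nio.file.Paths;

    class DirectivesSketch {
        // Builds the per-request directives that downloadFileImpl/uploadFileImpl above now consume
        // in place of the old (localPath, remotePath) pair plus injected compressor.
        static AbstractBackupPath.UploadDownloadDirectives lz4UploadDirectives() {
            return new AbstractBackupPath.UploadDownloadDirectives()
                    .withLocalPath(Paths.get("/var/lib/cassandra/backup/mc-1234-big-Data.db")) // assumed setter
                    .withRemotePath(
                            Paths.get(
                                    "casstestbackup/1049_fake-app/1808575600/SST_V2/1613306862599/"
                                            + "keyspace1/standard1/LZ4/PLAINTEXT/mc-1234-big-Data.db"))
                    // LZ4 constant assumed from the PR title; this diff itself only shows SNAPPY.
                    .withCompression(ICompression.CompressionAlgorithm.LZ4);
        }
    }

Given such directives, uploadFileImpl compresses the local file chunk-by-chunk with the chosen algorithm, encrypts the compressed stream, multipart-uploads it to S3, and returns the encrypted size.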