Skip to content

Commit

Permalink
FFDB-006: Introduce RandomAccessLog with a FileChannel backed impleme…
Browse files Browse the repository at this point in the history
…ntation + Segment model (#12)
  • Loading branch information
godcrampy authored Dec 29, 2023
1 parent bcc5aed commit c7311b6
Show file tree
Hide file tree
Showing 7 changed files with 505 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.sahilbondre.firefly.log;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FileChannelRandomAccessLog implements RandomAccessLog {

private final String filePath;
private final RandomAccessFile randomAccessFile;
private final FileChannel fileChannel;

public FileChannelRandomAccessLog(String filePath) throws IOException {
this.filePath = filePath;
this.randomAccessFile = new RandomAccessFile(filePath, "rw");
this.fileChannel = randomAccessFile.getChannel();
}

@Override
public long size() throws IOException {
return fileChannel.size();
}

@Override
public String getFilePath() {
return filePath;
}

@Override
public void append(byte[] message) throws IOException {
fileChannel.position(fileChannel.size());
ByteBuffer buffer = ByteBuffer.wrap(message);
fileChannel.write(buffer);
}

@Override
public byte[] read(long offset, long length) throws IOException, InvalidRangeException {
long fileSize = fileChannel.size();

if (offset < 0 || offset >= fileSize || length <= 0 || offset + length > fileSize) {
throw new InvalidRangeException("Invalid offset or length");
}

fileChannel.position(offset);
ByteBuffer buffer = ByteBuffer.allocate((int) length);
fileChannel.read(buffer);
return buffer.array();
}

public void close() throws IOException {
fileChannel.close();
randomAccessFile.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.sahilbondre.firefly.log;

public class InvalidRangeException extends IllegalArgumentException {
public InvalidRangeException(String message) {
super(message);
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/sahilbondre/firefly/log/RandomAccessLog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.sahilbondre.firefly.log;

import java.io.IOException;

public interface RandomAccessLog {
long size() throws IOException;

String getFilePath();

void append(byte[] message) throws IOException;

byte[] read(long offset, long length) throws IOException, InvalidRangeException;

void close() throws IOException;
}
131 changes: 131 additions & 0 deletions src/main/java/com/sahilbondre/firefly/model/Segment.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.sahilbondre.firefly.model;

public class Segment {

private static final int CRC_LENGTH = 2;
private static final int KEY_SIZE_LENGTH = 2;
private static final int VALUE_SIZE_LENGTH = 4;
/**
* Class representing a segment of the log file.
* <p>
* Two big decisions here to save on performance:
* 1. We're using byte[] instead of ByteBuffer.
* 2. We're trusting that the byte[] is immutable and hence avoiding copying it.
* <p>
* <p>
* 2 bytes: CRC
* 2 bytes: Key Size
* 4 bytes: Value Size
* n bytes: Key
* m bytes: Value
* <p>
* Note: Value size is four bytes because we're using a 32-bit integer to store the size.
* Int is 32-bit signed, so we can only store 2^31 - 1 bytes in the value.
* Hence, the maximum size of the value is 2,147,483,647 bytes or 2.14 GB.
*/
private final byte[] bytes;

private Segment(byte[] bytes) {
this.bytes = bytes;
}

public static Segment fromByteArray(byte[] data) {
return new Segment(data);
}

public static Segment fromKeyValuePair(byte[] key, byte[] value) {
int keySize = key.length;
int valueSize = value.length;
int totalSize = CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH + keySize + valueSize;

byte[] segment = new byte[totalSize];

// Set key size
segment[2] = (byte) ((keySize >> 8) & 0xFF);
segment[3] = (byte) (keySize & 0xFF);

// Set value size
segment[4] = (byte) ((valueSize >> 24) & 0xFF);
segment[5] = (byte) ((valueSize >> 16) & 0xFF);
segment[6] = (byte) ((valueSize >> 8) & 0xFF);
segment[7] = (byte) (valueSize & 0xFF);

System.arraycopy(key, 0, segment, CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH, keySize);

System.arraycopy(value, 0, segment, CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH + keySize, valueSize);

byte[] crc = new Segment(segment).crc16();
segment[0] = crc[0];
segment[1] = crc[1];

return new Segment(segment);
}

public byte[] getBytes() {
return bytes;
}

public byte[] getKey() {
int keySize = getKeySize();
return extractBytes(CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH, keySize);
}

public byte[] getValue() {
int keySize = getKeySize();
int valueSize = getValueSize();
return extractBytes(CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH + keySize, valueSize);
}

public int getKeySize() {
return ((bytes[2] & 0xff) << 8) | (bytes[3] & 0xff);
}

public int getValueSize() {
return ((bytes[4] & 0xff) << 24) | ((bytes[5] & 0xff) << 16) |
((bytes[6] & 0xff) << 8) | (bytes[7] & 0xff);
}

public byte[] getCrc() {
return extractBytes(0, CRC_LENGTH);
}

public boolean isChecksumValid() {
byte[] crc = crc16();
return crc[0] == bytes[0] && crc[1] == bytes[1];
}

public boolean isSegmentValid() {
return isChecksumValid() && getKeySize() > 0 && getValueSize() >= 0
&& bytes.length == CRC_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH + getKeySize() + getValueSize();
}

private byte[] extractBytes(int offset, int length) {
byte[] result = new byte[length];
System.arraycopy(bytes, offset, result, 0, length);
return result;
}

private byte[] crc16(byte[] segment) {
int crc = 0xFFFF; // Initial CRC value
int polynomial = 0x1021; // CRC-16 polynomial

for (int index = CRC_LENGTH; index < segment.length; index++) {
byte b = segment[index];
crc ^= (b & 0xFF) << 8;

for (int i = 0; i < 8; i++) {
if ((crc & 0x8000) != 0) {
crc = (crc << 1) ^ polynomial;
} else {
crc <<= 1;
}
}
}

return new byte[]{(byte) ((crc >> 8) & 0xFF), (byte) (crc & 0xFF)};
}

private byte[] crc16() {
return crc16(bytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.sahilbondre.firefly.log;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import static org.junit.jupiter.api.Assertions.*;

class FileChannelRandomAccessLogTest {

private static final String TEST_FILE_NAME = "src/test/resources/test.log";
private static final Path TEST_FILE_PATH = Paths.get(TEST_FILE_NAME);
private FileChannelRandomAccessLog randomAccessLog;

@BeforeEach
void setUp() throws IOException {
Files.deleteIfExists(TEST_FILE_PATH);
Files.createFile(TEST_FILE_PATH);
randomAccessLog = new FileChannelRandomAccessLog(TEST_FILE_NAME);
}

@AfterEach
void tearDown() throws IOException {
randomAccessLog.close();
Files.deleteIfExists(TEST_FILE_PATH);
}

@Test
void givenEmptyLog_whenGetSize_thenReturnsZero() throws IOException {
// Given
// An empty log

// When
long size = randomAccessLog.size();

// Then
assertEquals(0, size);
}


@Test
void givenLogWithContent_whenGetSize_thenReturnsCorrectSize() throws IOException {
// Given
// A log with content

// When
randomAccessLog.append("Hello".getBytes());
randomAccessLog.append("World".getBytes());

// Then
assertEquals(10, randomAccessLog.size());
}

@Test
void givenLog_whenGetFilePath_thenReturnsCorrectPath() {
// Given
// A log instance

// When
String filePath = randomAccessLog.getFilePath();

// Then
assertEquals(TEST_FILE_NAME, filePath);
}

@Test
void givenLogWithContent_whenAppend_thenAppendsCorrectly() throws IOException {
// Given
// A log with existing content

// When
randomAccessLog.append("Hello".getBytes());
randomAccessLog.append("World".getBytes());
byte[] result = randomAccessLog.read(0, randomAccessLog.size());

// Then
assertArrayEquals("HelloWorld".getBytes(), result);
}

@Test
void givenLogWithContent_whenReadSubset_thenReturnsSubset() throws IOException, InvalidRangeException {
// Given
// A log with existing content

// When
randomAccessLog.append("The quick brown fox".getBytes());
byte[] result = randomAccessLog.read(4, 5);

// Then
assertArrayEquals("quick".getBytes(), result);
}

@Test
void givenInvalidRange_whenRead_thenThrowsInvalidRangeException() throws IOException {
// Given
randomAccessLog.append("Hello".getBytes());
// An invalid range for reading

// When/Then
assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(0, -1));
assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(-1, 5));
assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(15, 10));
assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(2, 10));
assertThrows(InvalidRangeException.class, () -> randomAccessLog.read(0, 6));
}

@Test
void givenLog_whenClose_thenFileIsNotAccessible() throws IOException {
// Given
// An open log

// When
randomAccessLog.close();

// Then
assertTrue(Files.exists(TEST_FILE_PATH));
assertThrows(IOException.class, () -> randomAccessLog.append("NewContent".getBytes()));
}
}
Loading

0 comments on commit c7311b6

Please sign in to comment.