Skip to content

Commit

Permalink
Refactor Stream & Fix FileStream
Browse files Browse the repository at this point in the history
- Added member function flush()
- Update FileStream to flush to file on calling seek() and close() functions
- Fix FileStream close() function to catch exception on flush errors
  • Loading branch information
smlu committed Jun 14, 2020
1 parent 94b2c5d commit 3cc0ac3
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 127 deletions.
1 change: 1 addition & 0 deletions libraries/libim/io/binarystream.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace libim {
virtual bool canWrite() const override;

protected:
virtual void flush() override {}
virtual std::size_t readsome(byte_t* data, std::size_t length) const override;
virtual std::size_t writesome(const byte_t* data, std::size_t length) override;

Expand Down
31 changes: 30 additions & 1 deletion libraries/libim/io/filestream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,43 @@ namespace libim {
explicit FileStream(std::string filePath, bool truncate, Mode mode = ReadWrite);
explicit FileStream(const std::filesystem::path& filePath, Mode mode = ReadWrite);
explicit FileStream(const std::filesystem::path& filePath, bool truncate, Mode mode = ReadWrite);

/**
* Destructor
* @note File stream is closed automatically.
* @throw can throw if call to close() throws.
* See close()
*/
virtual ~FileStream() override;

virtual void seek(std::size_t position) const override;
/**
* Move current reading or writing cursor position in file to new offset.
*
* @note When stream is writable it calls flush() before moving cursor.
* @param offset - new cursor offset relative to beginning of the stream.
* @throw FileStreamError - if call to flush() fails or
* unable to move cursor e.g. offset out of file size bounds.
*/
virtual void seek(std::size_t offset) const override;

virtual std::size_t size() const override;
virtual std::size_t tell() const override;
virtual bool canRead() const override;
virtual bool canWrite() const override;

/**
* Closes file stream.
* @note Before file is closed the flush() member function is called.
* @throw Can throw under a debugger on windows if CloseHandle fails.
*/
virtual void close();

/**
* Writes data from output buffer to file
* @throw FileStreamError if unable to write or flush data to file.
*/
virtual void flush() override;

protected:
virtual std::size_t readsome(byte_t* data, std::size_t length) const override;
virtual std::size_t writesome(const byte_t* data, std::size_t length) override;
Expand Down Expand Up @@ -59,6 +87,7 @@ namespace libim {
{}

private:
using FileStream::flush;
using FileStream::write;
};

Expand Down
212 changes: 94 additions & 118 deletions libraries/libim/io/impl/filestream.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#include "../filestream.h"
#include "../binarystream.h"
#include "iobuffer.h"

#include <libim/common.h>
#include <libim/types/safe_cast.h>

#include <algorithm>
#include <array>

#include <filesystem>
#include <iterator>

Expand Down Expand Up @@ -35,52 +37,6 @@ constexpr std::size_t kBufferSize = 4096;

#define MAX_WRITE_FILE_SIZE 1'000'000'000 // 1GB

template<std::size_t BufferSize>
struct IOBuffer final : public std::array<byte_t, BufferSize>
{
using Base_ = std::array<byte_t, BufferSize>;

IOBuffer()
{
reset();
}

std::size_t write(const byte_t* data, std::size_t size)
{
std::size_t nWrite = safe_cast<std::size_t>(std::distance(pos_, Base_::end()));
if(size < nWrite) {
nWrite = size;
}
pos_ = std::copy(data, data + nWrite, pos_);
return nWrite;
}

std::size_t size() const noexcept
{
return safe_cast<std::size_t>(
std::distance(Base_::cbegin(), typename Base_::const_iterator(pos_))
);
}

std::size_t capacity() const noexcept
{
return Base_::max_size();
}

bool hasData() const
{
return pos_ != Base_::begin();
}

void reset()
{
pos_ = Base_::begin();
}

private:
typename Base_::iterator pos_;
};


std::string getLastErrorAsString()
{
Expand Down Expand Up @@ -143,45 +99,49 @@ struct FileStream::FileStreamImpl
}

#ifdef OS_WINDOWS

/* Open file */
/* Open file
Since obuffer is used the write operations could be also done without buffering.
To do this add flags FILE_FLAG_NO_BUFFERING and FILE_FLAG_WRITE_THROUGH
to flag param of function CreateFileX.
See: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-flushfilebuffers#remarks
*/
#if _WIN32_WINNT >= _WIN32_WINNT_WIN8

std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> converter;

std::wstring wPath = converter.from_bytes(filePath.c_str());

fileHandle = CreateFile2(
wPath.c_str(),
flags,
FILE_SHARE_READ,
(flags == GENERIC_READ ? OPEN_EXISTING : OPEN_ALWAYS),
nullptr);
std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> converter;
std::wstring wPath = converter.from_bytes(filePath.c_str());

hFile = CreateFile2(
wPath.c_str(),
flags,
FILE_SHARE_READ,
(flags == GENERIC_READ ? OPEN_EXISTING : OPEN_ALWAYS),
nullptr
);
#else
fileHandle = CreateFileA(
filePath.c_str(),
flags,
FILE_SHARE_READ,
NULL,
(flags == GENERIC_READ ? OPEN_EXISTING : OPEN_ALWAYS),
FILE_ATTRIBUTE_NORMAL,
NULL);
hFile = CreateFileA(
filePath.c_str(),
flags,
FILE_SHARE_READ,
NULL,
(flags == GENERIC_READ ? OPEN_EXISTING : OPEN_ALWAYS),
FILE_ATTRIBUTE_NORMAL,
NULL
);
#endif

if (fileHandle == INVALID_HANDLE_VALUE) {
if (hFile == INVALID_HANDLE_VALUE) {
throw FileStreamError(getLastErrorAsString());
}

/* Get file size */
LARGE_INTEGER lSize {{0, 0}};
if(!GetFileSizeEx(fileHandle, &lSize)) {
if(!GetFileSizeEx(hFile, &lSize)) {
throw FileStreamError("Error getting the file size: " + getLastErrorAsString());
}

#ifdef _WIN64
fileSize = lSize.QuadPart;
fileSize = lSize.QuadPart;
#else
fileSize = lSize.LowPart;
fileSize = lSize.LowPart;
#endif

#else // Not Win
Expand All @@ -205,7 +165,7 @@ struct FileStream::FileStreamImpl
{
ssize_t nRead = 0;
#ifdef OS_WINDOWS
if(!ReadFile(fileHandle, reinterpret_cast<LPVOID>(data), safe_cast<DWORD>(length), reinterpret_cast<LPDWORD>(&nRead), nullptr)) {
if(!ReadFile(hFile, reinterpret_cast<LPVOID>(data), safe_cast<DWORD>(length), reinterpret_cast<LPDWORD>(&nRead), nullptr)) {
#else
nRead = ::read(fd, data, length);
if(nRead == -1) {
Expand All @@ -218,26 +178,41 @@ struct FileStream::FileStreamImpl
}


std::size_t flush()
std::size_t flush(bool sync)
{
ssize_t nWritten = 0;
if((mode == Write || mode == ReadWrite) && buffer_.hasData())
if(mode == Write || mode == ReadWrite)
{
#ifdef OS_WINDOWS
if(!WriteFile(
fileHandle,
reinterpret_cast<LPCVOID>(buffer_.data()),
static_cast<DWORD>(buffer_.size()),
reinterpret_cast<LPDWORD>(&nWritten),
nullptr)){
#else
nWritten = ::write(fd, buffer_.data(), buffer_.size());
if(nWritten == -1) {
#endif
throw FileStreamError("Failed to write data to file: " + getLastErrorAsString());
if(obuffer_.hasData())
{
#ifdef OS_WINDOWS
if (!WriteFile(
hFile,
reinterpret_cast<LPCVOID>(obuffer_.data()),
static_cast<DWORD>(obuffer_.size()),
reinterpret_cast<LPDWORD>(&nWritten),
nullptr)){
#else // Unix
nWritten = ::write(fd, obuffer_.data(), obuffer_.size());
if (nWritten == -1) {
#endif
throw FileStreamError("Failed to write data to file: " + getLastErrorAsString());
}

// Clear out buffer
obuffer_.reset();
}

buffer_.reset();
if(sync)
{
#ifdef OS_WINDOW
if(!FlushFileBuffers(hFile)) {
#else // Unix
if (fsync(fd) != 0) {
#endif
throw FileStreamError("Failed to flush data to file: " + getLastErrorAsString());
}
}
}

return static_cast<std::size_t>(nWritten);
Expand All @@ -248,10 +223,10 @@ struct FileStream::FileStreamImpl
std::size_t nTotalWritten = 0;
do
{
std::size_t nWritten = buffer_.write(data, length - nTotalWritten);
std::size_t nWritten = obuffer_.write(data, length - nTotalWritten);
if(nWritten < length)
{
auto nFlushed = flush();
auto nFlushed = flush(/*sync=*/false);
if(nFlushed < nWritten) {
return nFlushed;
}
Expand All @@ -276,52 +251,49 @@ struct FileStream::FileStreamImpl
return nTotalWritten;
}

void seek(std::size_t position) const
void seek(std::size_t offset) const
{
const_cast<FileStreamImpl*>(this)->flush();
const_cast<FileStreamImpl*>(this)->flush(/*sync=*/true);
#ifdef OS_WINDOWS
LARGE_INTEGER li;
li.QuadPart = position;
li.LowPart = SetFilePointer(fileHandle, li.LowPart, &li.HighPart, FILE_BEGIN);
li.QuadPart = offset;
li.LowPart = SetFilePointer(hFile, li.LowPart, &li.HighPart, FILE_BEGIN);
if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) {
#else
auto off = lseek(fd, position, SEEK_SET);
auto off = lseek(fd, offset, SEEK_SET);
if(off == -1) {
#endif
throw FileStreamError(std::string("Failed to seek to position: ") + getLastErrorAsString());
throw FileStreamError(std::string("Failed to seek to offset: ") + getLastErrorAsString());
}

currentOffset = position;
currentOffset = offset;
if(currentOffset > fileSize) {
fileSize = currentOffset;
}
}

void close()
{
flush();
try {
flush(/*sync=*/true);
}
// catch any exception that could occur
catch(...){}

#ifdef OS_WINDOWS
if(fileHandle != INVALID_HANDLE_VALUE)
// Close file handle
#ifdef OS_WINDOWS
if(hFile != INVALID_HANDLE_VALUE)
{
if(mode == Write || mode == ReadWrite){
FlushFileBuffers(fileHandle);
}

CloseHandle(fileHandle);
fileHandle = INVALID_HANDLE_VALUE;
CloseHandle(hFile); // can throw under a debugger
hFile = INVALID_HANDLE_VALUE;
}
#else
#else
if(fd > 0)
{
if(mode == Write || mode == ReadWrite){
fsync(fd);
}

::close(fd);
fd = -1;
}
#endif
#endif
}

~FileStreamImpl()
Expand All @@ -336,17 +308,16 @@ struct FileStream::FileStreamImpl
mutable std::size_t currentOffset = 0;

private:
IOBuffer<kBufferSize> buffer_;
IOBuffer<kBufferSize> obuffer_;

#ifdef OS_WINDOWS
HANDLE fileHandle = INVALID_HANDLE_VALUE;

//#error "FileStreamImpl not defined!"
HANDLE hFile = INVALID_HANDLE_VALUE;
#else
int fd = 0;
#endif
};


FileStream::FileStream(std::string filePath, Mode mode) :
FileStream(std::move(filePath), false, mode)
{}
Expand Down Expand Up @@ -374,9 +345,9 @@ std::size_t FileStream::writesome(const byte_t* data, std::size_t length)
return m_fs->write(data, length);
}

void FileStream::seek(std::size_t position) const
void FileStream::seek(std::size_t offset) const
{
m_fs->seek(position);
m_fs->seek(offset);
}

std::size_t FileStream::size() const
Expand Down Expand Up @@ -404,6 +375,11 @@ void FileStream::close()
m_fs->close();
}

void FileStream::flush()
{
m_fs->flush(/*sync=*/true);
}

std::size_t FileStream::readsome(byte_t* data, std::size_t length) const
{
if(m_fs->currentOffset + length >= m_fs->fileSize){
Expand Down
Loading

0 comments on commit 3cc0ac3

Please sign in to comment.