Skip to content

Commit

Permalink
Rework SubprocessImpl for extern-worker
Browse files Browse the repository at this point in the history
Summary:
SubprocessImpl works, but is pretty slow. Most of this slowness comes
down to how it stores blobs and it's input/output format. For blobs,
it uses a single file per blob. This can result in a huge number of
files, which stresses the file system. Moreover, the input/output
format it uses has a lot of sub-directories filled with symlinks,
which is again stressful.

Rework SubprocessImpl to be more efficient. Instead of an individual
file per blob, it keeps a pool of blob files. Blobs are just appended
to the end of the next free blob file. Blob files are allocated
per-thread, so there's no need for synchronization. As an additional
optimization, blobs below a certain threshold aren't stored on disk at
all, and are instead stored "inline" in the RefId. To allow for all of
this, RefId now has a m_size and a m_extra field. m_size is *always*
the blob size (which is useful), and can be used to distinguish inline
from not. m_extra contains the offset of the blob within the blob
file.

To address input/output overhead, the notion of creating directories
is discarded entirely. Instead the metadata is encoded in a string
(using BlobEncoder and BlobDecoder), and sent over to the worker via
stdin. Once the worker has done it's work and written any blobs to
disk, it likewise returns it's output via a pipe, which the parent can
read. This avoids writing to disk at all.

Reviewed By: edwinsmith

Differential Revision: D39153960

fbshipit-source-id: 933d10d39463e57897ba75e58e93b790d88be654
  • Loading branch information
ricklavoie authored and facebook-github-bot committed Sep 1, 2022
1 parent 2cb71fd commit 280e689
Show file tree
Hide file tree
Showing 6 changed files with 1,170 additions and 379 deletions.
2 changes: 2 additions & 0 deletions hphp/hhbbc/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5640,6 +5640,8 @@ materialize_inputs(const IndexData& index,
auto program = std::make_unique<php::Program>();

auto const loadAndParse = [&] (Chunk chunk) -> coro::Task<void> {
HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR;

auto [classes, funcs, units] = HPHP_CORO_AWAIT(coro::collect(
client->load(std::move(chunk.classes)),
client->load(std::move(chunk.funcs)),
Expand Down
2 changes: 1 addition & 1 deletion hphp/util/blob-encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ struct BlobEncoder {
void writeRaw(const char* ptr, size_t size) {
auto const start = m_blob.size();
m_blob.resize(start + size);
std::copy(ptr, ptr + size, &m_blob[start]);
std::copy(ptr, ptr + size, m_blob.data() + start);
}

/*
Expand Down
41 changes: 34 additions & 7 deletions hphp/util/extern-worker-detail.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,32 @@ auto typesToValues(F&& f) {

//////////////////////////////////////////////////////////////////////

// Abstracts away how a worker should obtain it's inputs, and write
// it's outputs. These functions are tightly coupled with the logic in
// JobBase::serialize and JobBase::deserialize.
struct ISource {
virtual ~ISource() = default;
virtual std::string blob() = 0;
virtual Optional<std::string> optBlob() = 0;
virtual std::vector<std::string> variadic() = 0;
virtual void initDone() = 0;
virtual bool inputEnd() const = 0;
virtual void nextInput() = 0;
virtual void finish() = 0;
};

struct ISink {
virtual ~ISink() = default;
virtual void blob(const std::string&) = 0;
virtual void optBlob(const Optional<std::string>&) = 0;
virtual void variadic(const std::vector<std::string>&) = 0;
virtual void nextOutput() = 0;
virtual void startFini() = 0;
virtual void finish() = 0;
};

//////////////////////////////////////////////////////////////////////

// Base class for Jobs. This provide a consistent interface to invoke
// through.
struct JobBase {
Expand All @@ -323,15 +349,16 @@ struct JobBase {
explicit JobBase(const std::string& name);
virtual ~JobBase() = default;

template <typename T> static T deserialize(const std::filesystem::path&);
template <typename T> static void serialize(const T&,
size_t,
const std::filesystem::path&);
template <typename T> static T deserialize(ISource&);
template <typename T> static void serialize(const T&, ISink&);

private:
virtual void init(const std::filesystem::path&) const = 0;
virtual void fini(const std::filesystem::path&) const = 0;
virtual void run(const std::filesystem::path&, const std::filesystem::path&) const = 0;
template <typename T> static T deserializeBlob(std::string);
template <typename T> static std::string serializeBlob(const T&);

virtual void init(ISource&) const = 0;
virtual void fini(ISink&) const = 0;
virtual void run(ISource&, ISink&) const = 0;

std::string m_name;

Expand Down
174 changes: 99 additions & 75 deletions hphp/util/extern-worker-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,63 +43,63 @@ Job<C>::Job() : detail::JobBase{C::name()} {}
// expects, then invoke the function with those types.

template <typename C>
void Job<C>::init(const std::filesystem::path& root) const {
void Job<C>::init(detail::ISource& source) const {
using namespace detail;
using Args = typename Params<decltype(C::init)>::type;

// For each expected input, load the file, and deserialize it into
// For each expected input, load it, and deserialize it into
// the appropriate type. Return the types as a tuple, which can then
// std::apply to C::init, passing the inputs.
size_t DEBUG_ONLY nextIdx = 0;
std::apply(
C::init,
typesToValues<Args>(
[&] (size_t idx, auto tag) {
return deserialize<typename decltype(tag)::Type>(
root / folly::to<std::string>(idx)
);
assertx(idx == nextIdx++);
return deserialize<typename decltype(tag)::Type>(source);
}
)
);
source.initDone();

using Ret = typename Return<decltype(C::init)>::type;
static_assert(std::is_void_v<Ret>, "init() must return void");
}

template <typename C>
void Job<C>::fini(const std::filesystem::path& outputRoot) const {
void Job<C>::fini(detail::ISink& sink) const {
using namespace detail;

sink.startFini();
using Ret = typename Return<decltype(C::fini)>::type;
if constexpr (std::is_void_v<Ret>) {
C::fini();
} else {
auto const output = outputRoot / "fini";
std::filesystem::create_directory(output, outputRoot);
auto const v = C::fini();
time("writing fini outputs", [&] { return serialize(v, 0, output); });
time("writing fini outputs", [&] { return serialize(v, sink); });
}
}

template <typename C>
void Job<C>::run(const std::filesystem::path& inputRoot,
const std::filesystem::path& outputRoot) const {
void Job<C>::run(detail::ISource& source, detail::ISink& sink) const {
using namespace detail;

// For each expected input, load the file, and deserialize it into
// the appropriate type, turning all of the types into a tuple.
// For each expected input, load it, and deserialize it into the
// appropriate type, turning all of the types into a tuple.
using Args = typename Params<decltype(C::run)>::type;
size_t DEBUG_ONLY nextIdx = 0;
auto inputs = time(
"loading inputs",
[&] {
return typesToValues<Args>(
[&] (size_t idx, auto tag) {
return deserialize<typename decltype(tag)::Type>(
inputRoot / folly::to<std::string>(idx)
);
assertx(idx == nextIdx++);
return deserialize<typename decltype(tag)::Type>(source);
}
);
}
);
source.nextInput();

// Apply the tuple to C::run, passing the types as parameters.
auto outputs = time(
Expand All @@ -110,8 +110,9 @@ void Job<C>::run(const std::filesystem::path& inputRoot,
using Ret = typename Return<decltype(C::run)>::type;
static_assert(!std::is_void_v<Ret>, "run() must return something");

// Serialize the outputs into the output directory.
time("writing outputs", [&] { return serialize(outputs, 0, outputRoot); });
// Serialize the outputs
time("writing outputs", [&] { return serialize(outputs, sink); });
sink.nextOutput();
}

//////////////////////////////////////////////////////////////////////
Expand All @@ -120,81 +121,99 @@ namespace detail {

//////////////////////////////////////////////////////////////////////

// Given a file path, load the contents of the file, deserialize them
// into the type T, and return it.
// Turn a blob into a specific (non-marker) type
template <typename T>
T JobBase::deserialize(const std::filesystem::path& path) {
T JobBase::deserializeBlob(std::string blob) {
using namespace detail;
static_assert(!IsMarker<T>::value, "Special markers cannot be nested");
if constexpr (std::is_same<T, std::string>::value) {
// A std::string is always stored as itself (this lets us store
// files as their contents without having to encode them).
return readFile(path);
} else if constexpr (IsVariadic<T>::value) {
// files directly as their contents without having to encode
// them).
return blob;
} else {
// For most types, the data is encoded using BlobEncoder, so undo
// that.
BlobDecoder decoder{blob.data(), blob.size()};
return decoder.makeWhole<T>();
}
}

// Deserialize the given input source into the type T and return
// it. The type might include markers, which might trigger
// sub-deserializations.
template <typename T>
T JobBase::deserialize(ISource& source) {
using namespace detail;
static_assert(!IsMulti<T>::value, "Multi can only be used as return type");

if constexpr (IsVariadic<T>::value) {
static_assert(!IsMarker<typename T::Type>::value,
"Special markers cannot be nested");
// Variadic<T> is actually a directory, not a file. Recurse into
// it, and do the deserialization for every file within it.
auto const blobs = source.variadic();
T out;
for (size_t i = 0;; ++i) {
auto const valPath = path / folly::to<std::string>(i);
// A break in the numbering means the end of the vector.
if (!std::filesystem::exists(valPath)) break;
out.vals.emplace_back(deserialize<typename T::Type>(valPath));
out.vals.reserve(blobs.size());
for (auto const& blob : blobs) {
out.vals.emplace_back(deserializeBlob<typename T::Type>(blob));
}
return out;
} else if constexpr (IsOpt<T>::value) {
static_assert(!IsMarker<typename T::Type>::value,
"Special markers cannot be nested");
// Opt<T> is like T, except the file may not exist (so is nullopt
// Opt<T> is like T, except the data may not exist (so is nullopt
// otherwise).
T out;
if (std::filesystem::exists(path)) {
out.val.emplace(deserialize<typename T::Type>(path));
if (auto const blob = source.optBlob()) {
out.val.emplace(deserializeBlob<typename T::Type>(*blob));
}
return out;
} else {
// For most types, the data is encoded using BlobEncoder, so undo
// that.
static_assert(!IsMulti<T>::value, "Multi can only be used as return type");
auto const data = readFile(path);
BlobDecoder decoder{data.data(), data.size()};
return decoder.makeWhole<T>();
return deserializeBlob<T>(source.blob());
}
}

// Given a value, an index of that value (its positive in the output
// values), and an output root, serialize the value, and write its
// contents to the appropriate file.
// Serialize the given (non-marker) value into a blob
template <typename T>
void JobBase::serialize(const T& v,
size_t idx,
const std::filesystem::path& root) {
std::string JobBase::serializeBlob(const T& v) {
using namespace detail;
static_assert(!IsMarker<T>::value,
"Special markers cannot be nested");
if constexpr (std::is_same<T, std::string>::value) {
// std::string isn't serialized, but written as itself as
// root/idx.
return writeFile(root / folly::to<std::string>(idx), v.data(), v.size());
} else if constexpr (IsVariadic<T>::value) {
// For Variadic<T>, we create a directory root/idx, and under it,
// write a file for every element in the vector.
// std::string always encodes to itself
return v;
} else {
BlobEncoder encoder;
encoder(v);
return std::string{(const char*)encoder.data(), encoder.size()};
}
}

// Serialize the given value into a blob and write it out to the given
// output sink. The value's type might be a marker, which can trigger
// sub-serializations.
template <typename T>
void JobBase::serialize(const T& v, ISink& sink) {
using namespace detail;
if constexpr (IsVariadic<T>::value) {
static_assert(!IsMarker<typename T::Type>::value,
"Special markers cannot be nested");
auto const path = root / folly::to<std::string>(idx);
std::filesystem::create_directory(path, root);
for (size_t i = 0; i < v.vals.size(); ++i) {
serialize(v.vals[i], i, path);
}
using namespace folly::gen;
auto const blobs = from(v.vals)
| map([&] (const typename T::Type& t) { return serializeBlob(t); })
| as<std::vector>();
sink.variadic(blobs);
} else if constexpr (IsOpt<T>::value) {
// Opt<T> is like T, except nothing is written if the value isn't
// present.
static_assert(!IsMarker<typename T::Type>::value,
"Special markers cannot be nested");
if (!v.val.has_value()) return;
serialize(*v.val, idx, root);
sink.optBlob(
v.val.has_value() ? serializeBlob(*v.val) : Optional<std::string>{}
);
} else if constexpr (IsMulti<T>::value) {
// Treat Multi as equivalent to std::tuple (IE, write each element
// to a separate file).
assertx(idx == 0);
// separately).
size_t DEBUG_ONLY nextTupleIdx = 0;
for_each(
v.vals,
[&] (auto const& elem, size_t tupleIdx) {
Expand All @@ -204,18 +223,12 @@ void JobBase::serialize(const T& v,
>::value,
"Multi cannot be nested"
);
serialize(elem, tupleIdx, root);
assertx(tupleIdx == nextTupleIdx++);
serialize(elem, sink);
}
);
} else {
// Most types are just encoded with BlobEncoder and written as
// root/idx
BlobEncoder encoder;
encoder(v);
writeFile(
root / folly::to<std::string>(idx),
(const char*)encoder.data(), encoder.size()
);
sink.blob(serializeBlob(v));
}
}

Expand All @@ -225,24 +238,35 @@ void JobBase::serialize(const T& v,

//////////////////////////////////////////////////////////////////////

inline RefId::RefId(std::string id, size_t size)
: m_id{std::move(id)}, m_size{size}
inline RefId::RefId(std::string id, size_t size, size_t extra)
: m_id{std::move(id)}, m_size{size}, m_extra{extra}
{}

inline bool RefId::operator==(const RefId& o) const {
return std::tie(m_id, m_size) == std::tie(o.m_id, o.m_size);
return
std::tie(m_id, m_extra, m_size) ==
std::tie(o.m_id, o.m_extra, o.m_size);
}

inline bool RefId::operator!=(const RefId& o) const {
return !(*this == o);
}

inline bool RefId::operator<(const RefId& o) const {
return std::tie(m_size, m_id) < std::tie(o.m_size, o.m_id);
return
std::tie(m_id, m_extra, m_size) <
std::tie(o.m_id, o.m_extra, o.m_size);
}

inline std::string RefId::toString() const {
return folly::sformat("{}:{}", m_id, m_size);
// Don't print out the extra field if it's zero, to avoid clutter
// for implementations which don't use it. The id might contain
// binary data, so escape it before printing.
if (m_extra) {
return folly::sformat("{}:{}:{}", folly::humanify(m_id), m_extra, m_size);
} else {
return folly::sformat("{}:{}", folly::humanify(m_id), m_size);
}
}

//////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 280e689

Please sign in to comment.