From 280e6899b442796a197b5b236d3ddd50d0b62b5f Mon Sep 17 00:00:00 2001
From: Rick Lavoie
Date: Thu, 1 Sep 2022 10:34:54 -0700
Subject: [PATCH] Rework SubprocessImpl for extern-worker

Summary:
SubprocessImpl works, but is pretty slow. Most of this slowness comes
down to how it stores blobs and its input/output format.

For blobs, it uses a single file per blob. This can result in a huge
number of files, which stresses the file system. Moreover, the
input/output format it uses has a lot of sub-directories filled with
symlinks, which again stresses the file system.

Rework SubprocessImpl to be more efficient. Instead of an individual
file per blob, it keeps a pool of blob files. Blobs are just appended
to the end of the next free blob file. Blob files are allocated
per-thread, so there's no need for synchronization. As an additional
optimization, blobs below a certain threshold aren't stored on disk at
all, and are instead stored "inline" in the RefId.

To allow for all of this, RefId now has m_size and m_extra
fields. m_size is *always* the blob size (which is useful), and can be
used to distinguish inline blobs from non-inline ones. m_extra contains
the offset of the blob within the blob file.

To address input/output overhead, the notion of creating directories is
discarded entirely. Instead, the metadata is encoded into a string
(using BlobEncoder and BlobDecoder) and sent to the worker via
stdin. Once the worker has done its work and written any blobs to disk,
it likewise returns its output via a pipe, which the parent can
read. This avoids writing any metadata to disk at all.

Reviewed By: edwinsmith

Differential Revision: D39153960

fbshipit-source-id: 933d10d39463e57897ba75e58e93b790d88be654
---
 hphp/hhbbc/index.cpp             |    2 +
 hphp/util/blob-encoder.h         |    2 +-
 hphp/util/extern-worker-detail.h |   41 +-
 hphp/util/extern-worker-inl.h    |  174 ++--
 hphp/util/extern-worker.cpp      | 1317 +++++++++++++++++++++++-------
 hphp/util/extern-worker.h        |   13 +-
 6 files changed, 1170 insertions(+), 379 deletions(-)

diff --git a/hphp/hhbbc/index.cpp b/hphp/hhbbc/index.cpp
index 1b63dd4fab2e2..f1f1cad3535d2 100644
--- a/hphp/hhbbc/index.cpp
+++ b/hphp/hhbbc/index.cpp
@@ -5640,6 +5640,8 @@ materialize_inputs(const IndexData& index,
   auto program = std::make_unique();

   auto const loadAndParse = [&] (Chunk chunk) -> coro::Task {
+    HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR;
+
     auto [classes, funcs, units] = HPHP_CORO_AWAIT(coro::collect(
       client->load(std::move(chunk.classes)),
       client->load(std::move(chunk.funcs)),
diff --git a/hphp/util/blob-encoder.h b/hphp/util/blob-encoder.h
index 7aa669c05907d..8e558d6d3afd8 100644
--- a/hphp/util/blob-encoder.h
+++ b/hphp/util/blob-encoder.h
@@ -127,7 +127,7 @@ struct BlobEncoder {
   void writeRaw(const char* ptr, size_t size) {
     auto const start = m_blob.size();
     m_blob.resize(start + size);
-    std::copy(ptr, ptr + size, &m_blob[start]);
+    std::copy(ptr, ptr + size, m_blob.data() + start);
   }

   /*
diff --git a/hphp/util/extern-worker-detail.h b/hphp/util/extern-worker-detail.h
index f6db64710e3b1..3944ecbb0b283 100644
--- a/hphp/util/extern-worker-detail.h
+++ b/hphp/util/extern-worker-detail.h
@@ -315,6 +315,32 @@ auto typesToValues(F&& f) {

 //////////////////////////////////////////////////////////////////////

+// Abstracts away how a worker should obtain its inputs and write its
+// outputs. These functions are tightly coupled with the logic in
+// JobBase::serialize and JobBase::deserialize.
+struct ISource { + virtual ~ISource() = default; + virtual std::string blob() = 0; + virtual Optional optBlob() = 0; + virtual std::vector variadic() = 0; + virtual void initDone() = 0; + virtual bool inputEnd() const = 0; + virtual void nextInput() = 0; + virtual void finish() = 0; +}; + +struct ISink { + virtual ~ISink() = default; + virtual void blob(const std::string&) = 0; + virtual void optBlob(const Optional&) = 0; + virtual void variadic(const std::vector&) = 0; + virtual void nextOutput() = 0; + virtual void startFini() = 0; + virtual void finish() = 0; +}; + +////////////////////////////////////////////////////////////////////// + // Base class for Jobs. This provide a consistent interface to invoke // through. struct JobBase { @@ -323,15 +349,16 @@ struct JobBase { explicit JobBase(const std::string& name); virtual ~JobBase() = default; - template static T deserialize(const std::filesystem::path&); - template static void serialize(const T&, - size_t, - const std::filesystem::path&); + template static T deserialize(ISource&); + template static void serialize(const T&, ISink&); private: - virtual void init(const std::filesystem::path&) const = 0; - virtual void fini(const std::filesystem::path&) const = 0; - virtual void run(const std::filesystem::path&, const std::filesystem::path&) const = 0; + template static T deserializeBlob(std::string); + template static std::string serializeBlob(const T&); + + virtual void init(ISource&) const = 0; + virtual void fini(ISink&) const = 0; + virtual void run(ISource&, ISink&) const = 0; std::string m_name; diff --git a/hphp/util/extern-worker-inl.h b/hphp/util/extern-worker-inl.h index d23eca80e5b18..13427b57aafbf 100644 --- a/hphp/util/extern-worker-inl.h +++ b/hphp/util/extern-worker-inl.h @@ -43,63 +43,63 @@ Job::Job() : detail::JobBase{C::name()} {} // expects, then invoke the function with those types. template -void Job::init(const std::filesystem::path& root) const { +void Job::init(detail::ISource& source) const { using namespace detail; using Args = typename Params::type; - // For each expected input, load the file, and deserialize it into + // For each expected input, load it, and deserialize it into // the appropriate type. Return the types as a tuple, which can then // std::apply to C::init, passing the inputs. + size_t DEBUG_ONLY nextIdx = 0; std::apply( C::init, typesToValues( [&] (size_t idx, auto tag) { - return deserialize( - root / folly::to(idx) - ); + assertx(idx == nextIdx++); + return deserialize(source); } ) ); + source.initDone(); using Ret = typename Return::type; static_assert(std::is_void_v, "init() must return void"); } template -void Job::fini(const std::filesystem::path& outputRoot) const { +void Job::fini(detail::ISink& sink) const { using namespace detail; + sink.startFini(); using Ret = typename Return::type; if constexpr (std::is_void_v) { C::fini(); } else { - auto const output = outputRoot / "fini"; - std::filesystem::create_directory(output, outputRoot); auto const v = C::fini(); - time("writing fini outputs", [&] { return serialize(v, 0, output); }); + time("writing fini outputs", [&] { return serialize(v, sink); }); } } template -void Job::run(const std::filesystem::path& inputRoot, - const std::filesystem::path& outputRoot) const { +void Job::run(detail::ISource& source, detail::ISink& sink) const { using namespace detail; - // For each expected input, load the file, and deserialize it into - // the appropriate type, turning all of the types into a tuple. 
+ // For each expected input, load it, and deserialize it into the + // appropriate type, turning all of the types into a tuple. using Args = typename Params::type; + size_t DEBUG_ONLY nextIdx = 0; auto inputs = time( "loading inputs", [&] { return typesToValues( [&] (size_t idx, auto tag) { - return deserialize( - inputRoot / folly::to(idx) - ); + assertx(idx == nextIdx++); + return deserialize(source); } ); } ); + source.nextInput(); // Apply the tuple to C::run, passing the types as parameters. auto outputs = time( @@ -110,8 +110,9 @@ void Job::run(const std::filesystem::path& inputRoot, using Ret = typename Return::type; static_assert(!std::is_void_v, "run() must return something"); - // Serialize the outputs into the output directory. - time("writing outputs", [&] { return serialize(outputs, 0, outputRoot); }); + // Serialize the outputs + time("writing outputs", [&] { return serialize(outputs, sink); }); + sink.nextOutput(); } ////////////////////////////////////////////////////////////////////// @@ -120,81 +121,99 @@ namespace detail { ////////////////////////////////////////////////////////////////////// -// Given a file path, load the contents of the file, deserialize them -// into the type T, and return it. +// Turn a blob into a specific (non-marker) type template -T JobBase::deserialize(const std::filesystem::path& path) { +T JobBase::deserializeBlob(std::string blob) { using namespace detail; + static_assert(!IsMarker::value, "Special markers cannot be nested"); if constexpr (std::is_same::value) { // A std::string is always stored as itself (this lets us store - // files as their contents without having to encode them). - return readFile(path); - } else if constexpr (IsVariadic::value) { + // files directly as their contents without having to encode + // them). + return blob; + } else { + // For most types, the data is encoded using BlobEncoder, so undo + // that. + BlobDecoder decoder{blob.data(), blob.size()}; + return decoder.makeWhole(); + } +} + +// Deserialize the given input source into the type T and return +// it. The type might include markers, which might trigger +// sub-deserializations. +template +T JobBase::deserialize(ISource& source) { + using namespace detail; + static_assert(!IsMulti::value, "Multi can only be used as return type"); + + if constexpr (IsVariadic::value) { static_assert(!IsMarker::value, "Special markers cannot be nested"); - // Variadic is actually a directory, not a file. Recurse into - // it, and do the deserialization for every file within it. + auto const blobs = source.variadic(); T out; - for (size_t i = 0;; ++i) { - auto const valPath = path / folly::to(i); - // A break in the numbering means the end of the vector. - if (!std::filesystem::exists(valPath)) break; - out.vals.emplace_back(deserialize(valPath)); + out.vals.reserve(blobs.size()); + for (auto const& blob : blobs) { + out.vals.emplace_back(deserializeBlob(blob)); } return out; } else if constexpr (IsOpt::value) { static_assert(!IsMarker::value, "Special markers cannot be nested"); - // Opt is like T, except the file may not exist (so is nullopt + // Opt is like T, except the data may not exist (so is nullopt // otherwise). T out; - if (std::filesystem::exists(path)) { - out.val.emplace(deserialize(path)); + if (auto const blob = source.optBlob()) { + out.val.emplace(deserializeBlob(*blob)); } return out; } else { - // For most types, the data is encoded using BlobEncoder, so undo - // that. 
- static_assert(!IsMulti::value, "Multi can only be used as return type"); - auto const data = readFile(path); - BlobDecoder decoder{data.data(), data.size()}; - return decoder.makeWhole(); + return deserializeBlob(source.blob()); } } -// Given a value, an index of that value (its positive in the output -// values), and an output root, serialize the value, and write its -// contents to the appropriate file. +// Serialize the given (non-marker) value into a blob template -void JobBase::serialize(const T& v, - size_t idx, - const std::filesystem::path& root) { +std::string JobBase::serializeBlob(const T& v) { using namespace detail; + static_assert(!IsMarker::value, + "Special markers cannot be nested"); if constexpr (std::is_same::value) { - // std::string isn't serialized, but written as itself as - // root/idx. - return writeFile(root / folly::to(idx), v.data(), v.size()); - } else if constexpr (IsVariadic::value) { - // For Variadic, we create a directory root/idx, and under it, - // write a file for every element in the vector. + // std::string always encodes to itself + return v; + } else { + BlobEncoder encoder; + encoder(v); + return std::string{(const char*)encoder.data(), encoder.size()}; + } +} + +// Serialize the given value into a blob and write it out to the given +// output sink. The value's type might be a marker, which can trigger +// sub-serializations. +template +void JobBase::serialize(const T& v, ISink& sink) { + using namespace detail; + if constexpr (IsVariadic::value) { static_assert(!IsMarker::value, "Special markers cannot be nested"); - auto const path = root / folly::to(idx); - std::filesystem::create_directory(path, root); - for (size_t i = 0; i < v.vals.size(); ++i) { - serialize(v.vals[i], i, path); - } + using namespace folly::gen; + auto const blobs = from(v.vals) + | map([&] (const typename T::Type& t) { return serializeBlob(t); }) + | as(); + sink.variadic(blobs); } else if constexpr (IsOpt::value) { // Opt is like T, except nothing is written if the value isn't // present. static_assert(!IsMarker::value, "Special markers cannot be nested"); - if (!v.val.has_value()) return; - serialize(*v.val, idx, root); + sink.optBlob( + v.val.has_value() ? serializeBlob(*v.val) : Optional{} + ); } else if constexpr (IsMulti::value) { // Treat Multi as equivalent to std::tuple (IE, write each element - // to a separate file). - assertx(idx == 0); + // separately). 
+      size_t DEBUG_ONLY nextTupleIdx = 0;
       for_each(
         v.vals,
         [&] (auto const& elem, size_t tupleIdx) {
@@ -204,18 +223,12 @@ void JobBase::serialize(const T& v,
           >::value,
           "Multi cannot be nested"
         );
-        serialize(elem, tupleIdx, root);
+        assertx(tupleIdx == nextTupleIdx++);
+        serialize(elem, sink);
       }
     );
   } else {
-    // Most types are just encoded with BlobEncoder and written as
-    // root/idx
-    BlobEncoder encoder;
-    encoder(v);
-    writeFile(
-      root / folly::to(idx),
-      (const char*)encoder.data(), encoder.size()
-    );
+    sink.blob(serializeBlob(v));
   }
 }

@@ -225,12 +238,14 @@ void JobBase::serialize(const T& v,

 //////////////////////////////////////////////////////////////////////

-inline RefId::RefId(std::string id, size_t size)
-  : m_id{std::move(id)}, m_size{size}
+inline RefId::RefId(std::string id, size_t size, size_t extra)
+  : m_id{std::move(id)}, m_size{size}, m_extra{extra}
 {}

 inline bool RefId::operator==(const RefId& o) const {
-  return std::tie(m_id, m_size) == std::tie(o.m_id, o.m_size);
+  return
+    std::tie(m_id, m_extra, m_size) ==
+    std::tie(o.m_id, o.m_extra, o.m_size);
 }

 inline bool RefId::operator!=(const RefId& o) const {
@@ -238,11 +253,20 @@ inline bool RefId::operator!=(const RefId& o) const {
 }

 inline bool RefId::operator<(const RefId& o) const {
-  return std::tie(m_size, m_id) < std::tie(o.m_size, o.m_id);
+  return
+    std::tie(m_id, m_extra, m_size) <
+    std::tie(o.m_id, o.m_extra, o.m_size);
 }

 inline std::string RefId::toString() const {
-  return folly::sformat("{}:{}", m_id, m_size);
+  // Don't print out the extra field if it's zero, to avoid clutter
+  // for implementations which don't use it. The id might contain
+  // binary data, so escape it before printing.
+  if (m_extra) {
+    return folly::sformat("{}:{}:{}", folly::humanify(m_id), m_extra, m_size);
+  } else {
+    return folly::sformat("{}:{}", folly::humanify(m_id), m_size);
+  }
 }

 //////////////////////////////////////////////////////////////////////
diff --git a/hphp/util/extern-worker.cpp b/hphp/util/extern-worker.cpp
index d97d3cb70f7da..1db0a77a3efcd 100644
--- a/hphp/util/extern-worker.cpp
+++ b/hphp/util/extern-worker.cpp
@@ -27,6 +27,7 @@
 #include
 #include
 #include
+#include

 #include

@@ -67,6 +68,22 @@
 TRACE_SET_MOD(extern_worker);

 //////////////////////////////////////////////////////////////////////

+// If passed via the command-line, the worker is running in "local"
+// mode and will use a different mechanism to read inputs/write
+// outputs.
+const char* const g_local_option = "--local";
+
+// If running in local mode, the FD of the pipe used to communicate
+// output back to the parent.
+constexpr int g_local_pipe_fd = 3;
+
+// For the subprocess impl, if the size of the data is <= this
+// constant, we'll store it "inline" in the ref itself, rather than
+// writing it to disk. This value probably needs more tuning.
+constexpr size_t g_inline_size = 64;
+
+//////////////////////////////////////////////////////////////////////
+
 struct Registry {
   hphp_fast_string_map registry;
   std::mutex lock;
 };

 Registry& registry() {
@@ -79,48 +96,196 @@ Registry& registry() {

 //////////////////////////////////////////////////////////////////////

-// folly::writeFile expects a container-like input, so this makes a
-// ptr/length pair behave like one (without copying).
-struct Adaptor {
-  size_t size() const { return m_size; }
-  bool empty() const { return !size(); }
-  const char& operator[](size_t idx) const { return m_ptr[idx]; }
-  const char* m_ptr;
-  size_t m_size;
-};
+// Represents an open file. 
Use this over readFile/writeFile if you
+// want to maintain a persistent FD, or want to read/write just a
+// portion of the file. Only reading or appending to the file is
+// supported.
+struct FD {
+  FD(const fs::path& path, bool read, bool write, bool create)
+    : m_path{path}, m_offset{0} {
+    assertx(IMPLIES(create, write));
+
+    auto flags = O_CLOEXEC;
+    if (read) {
+      flags |= (write ? O_RDWR : O_RDONLY);
+    } else {
+      assertx(write);
+      flags |= O_WRONLY;
+    }

-//////////////////////////////////////////////////////////////////////
+    if (write) flags |= O_APPEND;
+    if (create) {
+      flags |= O_CREAT | O_EXCL;
+      // We're creating it, so the file is empty
+      m_offset = 0;
+    }

-}
+    auto fd = folly::openNoInt(m_path.c_str(), flags);
+    if (fd < 0) {
+      throw Error{
+        folly::sformat(
+          "Unable to open {} [{}]",
+          m_path.native(), folly::errnoStr(errno)
+        )
+      };
+    }
+    m_fd = fd;
+    SCOPE_FAIL { ::close(m_fd); };

-//////////////////////////////////////////////////////////////////////
+    // If we're going to write to the file, but not creating it, we
+    // don't know what the end of the file is, so find the current
+    // offset.
+    if (write && !create) syncOffset();
+  }
+  ~FD() { if (m_fd >= 0) ::close(m_fd); }
+
+  FD(const FD&) = delete;
+  FD(FD&& o) noexcept : m_fd{o.m_fd}, m_path{o.m_path}, m_offset{o.m_offset}
+  { o.m_fd = -1; }
+  FD& operator=(const FD&) = delete;
+  FD& operator=(FD&& o) noexcept {
+    std::swap(m_fd, o.m_fd);
+    std::swap(m_path, o.m_path);
+    std::swap(m_offset, o.m_offset);
+    return *this;
+  }

-namespace detail {
+  const fs::path& path() const { return m_path; }

-//////////////////////////////////////////////////////////////////////
+  std::string read(size_t offset, size_t size) const {
+    assertx(m_fd >= 0);

-// Wrappers around the folly functions with error handling
+    std::string data;
+    folly::resizeWithoutInitialization(data, size);

-std::string readFile(const fs::path& path) {
-  std::string s;
-  if (!folly::readFile(path.c_str(), s)) {
+    auto const read = folly::preadFull(m_fd, data.data(), size, offset);
+    if (read == size) return data;
+    if (read < 0) {
+      throw Error{
+        folly::sformat(
+          "Failed reading {} bytes from {} at {} [{}]",
+          size, m_path.native(), offset, folly::errnoStr(errno)
+        )
+      };
+    }
     throw Error{
       folly::sformat(
-        "Unable to read input from {} [{}]",
-        path.c_str(), folly::errnoStr(errno)
+        "Partial read from {} at {} (expected {}, actual {})",
+        m_path.native(), offset, size, read
       )
     };
   }
-  return s;
+
+  size_t append(const std::string& data) {
+    assertx(m_fd >= 0);
+
+    auto const written = folly::writeFull(m_fd, data.data(), data.size());
+    if (written < 0) {
+      throw Error{
+        folly::sformat(
+          "Failed writing {} bytes to {} [{}]",
+          data.size(), m_path.native(), folly::errnoStr(errno)
+        )
+      };
+    }
+    if (written != data.size()) {
+      throw Error{
+        folly::sformat(
+          "Partial write to {} (expected {}, actual {})",
+          m_path.native(), data.size(), written
+        )
+      };
+    }
+    auto const prev = m_offset;
+    m_offset += written;
+    return prev;
+  }
+
+  // Update m_offset to the end of the file. This is only needed if
+  // you've opened an already created file, or if you know someone
+  // else has written to it. 
+ void syncOffset() { + assertx(m_fd >= 0); + auto const size = ::lseek(m_fd, 0, SEEK_END); + if (size < 0) { + throw Error{ + folly::sformat( + "Unable to seek to end of {}", + m_path.native() + ) + }; + } + m_offset = size; + } + +private: + int m_fd; + fs::path m_path; + size_t m_offset; +}; + +////////////////////////////////////////////////////////////////////// + +// Read from the FD (which is assumed to be a pipe) and append the +// contents to the given string. Return true if the pipe is now +// closed, or false otherwise. The false case can only happen if the +// FD is non-blocking. +bool readFromPipe(int fd, std::string& s) { + auto realEnd = s.size(); + SCOPE_EXIT { s.resize(realEnd); }; + while (true) { + assertx(realEnd <= s.size()); + auto spaceLeft = s.size() - realEnd; + while (spaceLeft < 1024) { + folly::resizeWithoutInitialization( + s, std::max(4096, s.size() * 2) + ); + spaceLeft = s.size() - realEnd; + } + auto const read = + folly::readNoInt(fd, s.data() + realEnd, spaceLeft); + if (read < 0) { + if (errno == EAGAIN) return false; + throw Error{ + folly::sformat( + "Failed reading from pipe {} [{}]", + fd, + folly::errnoStr(errno) + ) + }; + } else if (read == 0) { + return true; + } + realEnd += read; + } } -void writeFile(const fs::path& path, - const char* ptr, size_t size) { - if (!folly::writeFile(Adaptor{ptr, size}, path.c_str())) { +// Read from the FD (which is assumed to be a pipe) until it is closed +// (returns EOF). Returns all data read. Only use this with a blocking +// FD, as it will spin otherwise. +std::string readFromPipe(int fd) { + std::string out; + while (!readFromPipe(fd, out)) {} + return out; +} + +// Write all of the given data to the FD (which is assumed to be a +// pipe). +void writeToPipe(int fd, const char* data, size_t size) { + auto const written = folly::writeFull(fd, data, size); + if (written < 0) { throw Error{ folly::sformat( - "Unable to write output to {} [{}]", - path.c_str(), folly::errnoStr(errno) + "Failed writing {} bytes to pipe {} [{}]", + size, fd, folly::errnoStr(errno) + ) + }; + } + if (written != size) { + throw Error{ + folly::sformat( + "Partial write to pipe {} (expected {}, actual {})", + fd, size, written ) }; } @@ -128,25 +293,209 @@ void writeFile(const fs::path& path, ////////////////////////////////////////////////////////////////////// -JobBase::JobBase(const std::string& name) - : m_name{name} -{ - FTRACE(4, "registering remote worker command \"{}\"\n", m_name); - auto& r = registry(); - std::lock_guard _{r.lock}; - auto const insert = r.registry.emplace(m_name, this); - always_assert(insert.second); -} +/* + * "Serialized" API: + * + * For this mode a worker expects to be invoked with --local, followed + * by the name of the command, the root of the blob files, and the + * name of the blob file to write results to. + * + * This mode uses blob files written locally, and communicates its + * input/output via serialized RefIds via stdin and pipes. It's meant + * for SubprocessImpl and is more efficient than the "File" mode + * below. + * + * See SubprocessImpl for more description. 
+ */ -////////////////////////////////////////////////////////////////////// +struct SerializedSource : public detail::ISource { + SerializedSource(fs::path root, std::string s) + : m_source{std::move(s)} + , m_decoder{m_source.data(), m_source.size()} + , m_currentInput{0} + , m_numInputs{0} + , m_root{std::move(root)} + {} + ~SerializedSource() = default; + + std::string blob() override { + return refToBlob(decodeRefId(m_decoder)); + } + Optional optBlob() override { + auto r = decodeOptRefId(m_decoder); + if (!r) return std::nullopt; + return refToBlob(*r); + } -} + BlobVec variadic() override { + return from(decodeRefIdVec(m_decoder)) + | map([&] (const RefId& r) { return refToBlob(r); }) + | as(); + } + + void initDone() override { + assertx(m_numInputs == 0); + assertx(m_currentInput == 0); + m_decoder(m_numInputs); + } + bool inputEnd() const override { + return m_currentInput >= m_numInputs; + } + void nextInput() override { + assertx(!inputEnd()); + ++m_currentInput; + } + void finish() override { m_decoder.assertDone(); } + + static RefId decodeRefId(BlobDecoder& d) { + // Optimize RefId representation for space. If m_size is <= the + // inline size, we know that m_extra is zero, and that m_id has + // the same length as m_size, so we do not need to encode it + // twice. + decltype(RefId::m_size) size; + d(size); + if (size <= g_inline_size) { + assertx(d.remaining() >= size); + std::string id{(const char*)d.data(), size}; + d.advance(size); + return RefId{std::move(id), size, 0}; + } else { + decltype(RefId::m_id) id; + decltype(RefId::m_extra) offset; + d(id); + d(offset); + return RefId{std::move(id), size, offset}; + } + } + + static Optional decodeOptRefId(BlobDecoder& d) { + bool present; + d(present); + if (!present) return std::nullopt; + return decodeRefId(d); + } + + static IdVec decodeRefIdVec(BlobDecoder& d) { + std::vector out; + size_t size; + d(size); + out.reserve(size); + std::generate_n( + std::back_inserter(out), + size, + [&] { return decodeRefId(d); } + ); + return out; + } + +private: + std::string refToBlob(const RefId& r) { + if (r.m_size <= g_inline_size) { + assertx(r.m_id.size() == r.m_size); + assertx(!r.m_extra); + return r.m_id; + } + + fs::path path{r.m_id}; + if (path.is_absolute()) { + return FD{path, true, false, false}.read(r.m_extra, r.m_size); + } + + auto it = m_fdCache.find(path.native()); + if (it == m_fdCache.end()) { + auto [elem, emplaced] = m_fdCache.emplace( + path.native(), + FD{m_root / path, true, false, false} + ); + assertx(emplaced); + it = elem; + } + return it->second.read(r.m_extra, r.m_size); + } + + std::string m_source; + BlobDecoder m_decoder; + size_t m_currentInput; + size_t m_numInputs; + + fs::path m_root; + hphp_fast_map m_fdCache; +}; + +struct SerializedSink : public detail::ISink { + explicit SerializedSink(fs::path outputFile) + : m_fd{outputFile, false, true, false} {} + + void blob(const std::string& b) override { + encodeRefId(makeRefId(b), m_encoder); + } + void optBlob(const Optional& b) override { + encodeOptRefId(b ? 
makeRefId(*b) : Optional{}, m_encoder); + } + void variadic(const BlobVec& v) override { + auto const refs = from(v) + | map([&] (const std::string& b) { return makeRefId(b); }) + | as(); + encodeRefIdVec(refs, m_encoder); + } + void nextOutput() override {} + void startFini() override {} + + void finish() override { + writeToPipe( + g_local_pipe_fd, + (const char*)m_encoder.data(), + m_encoder.size() + ); + ::close(g_local_pipe_fd); + } + + static void encodeRefId(const RefId& r, BlobEncoder& e) { + // Optimized RefId encoding. If it's an inline ref, we can encode + // it more optimally. See above in SerializedSource::decodeRefId. + e(r.m_size); + if (r.m_size <= g_inline_size) { + assertx(r.m_id.size() == r.m_size); + assertx(!r.m_extra); + e.writeRaw(r.m_id.data(), r.m_id.size()); + } else { + e(r.m_id); + e(r.m_extra); + } + } + + static void encodeOptRefId(const Optional& r, BlobEncoder& e) { + if (r) { + e(true); + encodeRefId(*r, e); + } else { + e(false); + } + } + + static void encodeRefIdVec(const IdVec& v, BlobEncoder& e) { + e((size_t)v.size()); + for (auto const& r : v) encodeRefId(r, e); + } + +private: + RefId makeRefId(const std::string& b) { + if (b.size() <= g_inline_size) return RefId{b, b.size(), 0}; + auto const offset = m_fd.append(b); + return RefId{m_fd.path().filename(), b.size(), offset}; + } + + FD m_fd; + BlobEncoder m_encoder; +}; ////////////////////////////////////////////////////////////////////// /* - * A note on conventions: A worker expects to be invoked with the name - * of the command, followed by 3 paths to directories. The directories + * "File" API: + * + * For this mode a worker expects to be invoked with the name of the + * command, followed by 3 paths to directories. The directories * represent the config inputs (for init()), the inputs (for multiple * runs()), and the last is the output directory, which will be * written to. @@ -185,28 +534,257 @@ JobBase::JobBase(const std::string& name) * * NB: Marker types cannot nest, so there aren't any possible deeper * levels than this. - * - * All implementations must follow this layout when setting up - * execution. The actual execution on the worker side is completely - * agnostic to the implementation. */ -int main(int argc, char** argv) { - try { - always_assert(argc > 1); - always_assert(!strcmp(argv[1], s_option)); +struct FileSource : public detail::ISource { + FileSource(fs::path config, fs::path input) + : m_configPath{std::move(config)} + , m_inputPath{std::move(input)} + , m_itemIdx{0} + , m_inputIdx{0} + , m_itemBase{m_configPath} {} + ~FileSource() = default; + + std::string blob() override { + auto const filename = m_itemBase / folly::to(m_itemIdx++); + return detail::readFile(filename); + } + Optional optBlob() override { + auto const filename = m_itemBase / folly::to(m_itemIdx++); + if (!fs::exists(filename)) return std::nullopt; + return detail::readFile(filename); + } + + BlobVec variadic() override { + BlobVec out; + auto const vecBase = m_itemBase / folly::to(m_itemIdx++); + for (size_t i = 0;; ++i) { + auto const valPath = vecBase / folly::to(i); + // A break in the numbering means the end of the vector. 
+ if (!fs::exists(valPath)) break; + out.emplace_back(detail::readFile(valPath)); + } + return out; + } + + void initDone() override { + assertx(m_itemBase == m_configPath); + assertx(m_inputIdx == 0); + m_itemIdx = 0; + m_itemBase = m_inputPath / "0"; + } + + bool inputEnd() const override { + assertx(m_itemBase == m_inputPath / folly::to(m_inputIdx)); + return !fs::exists(m_itemBase); + } + + void nextInput() override { + assertx(m_itemBase == m_inputPath / folly::to(m_inputIdx)); + m_itemIdx = 0; + m_itemBase = m_inputPath / folly::to(++m_inputIdx); + } + + void finish() override {} +private: + fs::path m_configPath; + fs::path m_inputPath; + + size_t m_itemIdx; + size_t m_inputIdx; + fs::path m_itemBase; +}; + +struct FileSink : public detail::ISink { + explicit FileSink(fs::path base) + : m_base{std::move(base)} + , m_itemIdx{0} + , m_outputIdx{0} + { + // We insist on a clean output directory. + if (!fs::create_directory(m_base)) { + throw Error{ + folly::sformat("Output directory {} already exists", m_base.native()) + }; + } + } + + void blob(const std::string& b) override { write(b); } + void optBlob(const Optional& b) override { + if (!b) { + makeDir(); + ++m_itemIdx; + } else { + write(*b); + } + } + void variadic(const BlobVec& v) override { + auto const dir = makeDir(); + auto const vecDir = dir / folly::to(m_itemIdx); + fs::create_directory(vecDir, dir); + for (size_t i = 0; i < v.size(); ++i) { + auto const& b = v[i]; + writeFile(vecDir / folly::to(i), b.data(), b.size()); + } + ++m_itemIdx; + } + void nextOutput() override { + assertx(m_outputIdx.has_value()); + ++*m_outputIdx; + m_itemIdx = 0; + } + void startFini() override { + assertx(m_outputIdx.has_value()); + m_outputIdx.reset(); + m_itemIdx = 0; + } + void finish() override {} +private: + fs::path currentDir() { + if (!m_outputIdx) return m_base / "fini"; + return m_base / folly::to(*m_outputIdx); + } + + fs::path makeDir() { + auto const outputDir = currentDir(); + if (!m_itemIdx) fs::create_directory(outputDir, m_base); + return outputDir; + } + + void write(const std::string& b) { + auto const dir = makeDir(); + writeFile(dir / folly::to(m_itemIdx), b.data(), b.size()); + ++m_itemIdx; + } + + fs::path m_base; + size_t m_itemIdx; + Optional m_outputIdx; +}; + +////////////////////////////////////////////////////////////////////// + +// folly::writeFile expects a container-like input, so this makes a +// ptr/length pair behave like one (without copying). 
+struct Adaptor { + size_t size() const { return m_size; } + bool empty() const { return !size(); } + const char& operator[](size_t idx) const { return m_ptr[idx]; } + const char* m_ptr; + size_t m_size; +}; + +////////////////////////////////////////////////////////////////////// + +} + +////////////////////////////////////////////////////////////////////// + +namespace detail { + +////////////////////////////////////////////////////////////////////// + +// Wrappers around the folly functions with error handling + +std::string readFile(const fs::path& path) { + std::string s; + if (!folly::readFile(path.c_str(), s)) { + throw Error{ + folly::sformat( + "Unable to read input from {} [{}]", + path.c_str(), folly::errnoStr(errno) + ) + }; + } + return s; +} + +void writeFile(const fs::path& path, + const char* ptr, size_t size) { + if (!folly::writeFile(Adaptor{ptr, size}, path.c_str())) { + throw Error{ + folly::sformat( + "Unable to write output to {} [{}]", + path.c_str(), folly::errnoStr(errno) + ) + }; + } +} + +////////////////////////////////////////////////////////////////////// + +JobBase::JobBase(const std::string& name) + : m_name{name} +{ + FTRACE(4, "registering remote worker command \"{}\"\n", m_name); + auto& r = registry(); + std::lock_guard _{r.lock}; + auto const insert = r.registry.emplace(m_name, this); + always_assert(insert.second); +} + +////////////////////////////////////////////////////////////////////// + +} + +////////////////////////////////////////////////////////////////////// + +namespace { + +// Parse the command-line and create the appropriate Source and Sink +// for input and output. +std::tuple< + std::unique_ptr, + std::unique_ptr, + std::string +> +parseOptions(int argc, char** argv) { + always_assert(argc > 1); + always_assert(!strcmp(argv[1], s_option)); + + if (argc >= 2 && !strcmp(argv[2], g_local_option)) { if (argc != 6) { std::cerr << "Usage: " << argv[0] - << " " << s_option << " " - << " " - << " " - << " " + << " " << s_option + << " " << g_local_option + << " " + << " " + << " " << std::endl; - return EXIT_FAILURE; + return std::make_tuple(nullptr, nullptr, ""); } + std::string name{argv[3]}; + fs::path root{argv[4]}; + fs::path outputFile{argv[5]}; + + FTRACE(2, "extern worker run (local) (\"{}\", {}, {})\n", + name, root.native(), outputFile.native()); + + // Input comes from STDIN + auto source = + time("read-pipe", [] { return readFromPipe(STDIN_FILENO); }); + return std::make_tuple( + std::make_unique( + std::move(root), + std::move(source) + ), + std::make_unique(std::move(outputFile)), + std::move(name) + ); + } else if (argc != 6) { + std::cerr << "Usage: " + << argv[0] + << " " << s_option + << " " + << " " + << " " + << " " + << std::endl; + return std::make_tuple(nullptr, nullptr, ""); + } else { std::string name{argv[2]}; fs::path configPath{argv[3]}; fs::path outputPath{argv[4]}; @@ -216,8 +794,30 @@ int main(int argc, char** argv) { name, configPath.native(), outputPath.native(), inputPath.native()); + return std::make_tuple( + std::make_unique( + std::move(configPath), + std::move(inputPath) + ), + std::make_unique(std::move(outputPath)), + std::move(name) + ); + } +} + +} + +////////////////////////////////////////////////////////////////////// + +int main(int argc, char** argv) { + Timer _{"main"}; + + try { + auto const [source, sink, name] = parseOptions(argc, argv); + if (!source) return EXIT_FAILURE; + // Lookup the registered job for the requested name. 
-  auto const worker = [&] {
+  auto const worker = [&, name = name] {
     auto& r = registry();
     std::lock_guard _{r.lock};
     auto const it = r.registry.find(name);
@@ -227,38 +827,26 @@ int main(int argc, char** argv) {
     return it->second;
   }();

-  // We insist on a clean output directory.
-  if (!fs::create_directory(outputPath)) {
-    throw Error{
-      folly::sformat("Output directory {} already exists", outputPath.native())
-    };
-  }
-
   g_in_job = true;
   SCOPE_EXIT { g_in_job = false; };

   // First do any global initialization.
-  time("init", [&] { worker->init(configPath); });
-  time("run-all", [&] {
-    // Then execute run() for each given set of inputs.
-    for (size_t i = 0;; ++i) {
-      auto const thisInput = inputPath / folly::to<std::string>(i);
-      if (!fs::exists(thisInput)) break;
-
-      auto const thisOutput = outputPath / folly::to<std::string>(i);
-      FTRACE(4, "executing #{} ({} -> {})\n",
-             i, thisInput.native(), thisOutput.native());
-      fs::create_directory(thisOutput, outputPath);
-
+  time("init", [&, &source = source] { worker->init(*source); });
+  time("run-all", [&, &source = source, &sink = sink] {
+    // Then execute run() until we're out of inputs.
+    size_t run = 0;
+    while (!source->inputEnd()) {
       time(
-        [&]{ return folly::sformat("run {}", i); },
-        [&] { worker->run(thisInput, thisOutput); }
+        [&] { return folly::sformat("run {}", run); },
+        [&, &source = source, &sink = sink] { worker->run(*source, *sink); }
       );
+      ++run;
     }
+    source->finish();
   });

-  // Do any cleanup
-  time("fini", [&] { worker->fini(outputPath); });
-
+  // Do any cleanup and flush output to its final destination
+  time("fini", [&, &sink = sink] { worker->fini(*sink); });
+  time("flush", [&sink = sink] { sink->finish(); });
   return 0;
 } catch (const std::exception& exn) {
   std::cerr << "Error: " << exn.what() << std::endl;
@@ -278,27 +866,38 @@ namespace {
  * be "reliable" (IE, never needs a fallback).
  *
  * All data is stored under the "root" (which is the working-dir
- * specified in Options). The two sub-directories are "blobs" and
- * "execs".
+ * specified in Options).
+ *
+ * Data is stored in blob files under the root, whose names are
+ * assigned numerically. The blob files are pooled. When a thread
+ * needs to write, it checks out a free file, appends the data to it,
+ * then "returns" it to the pool. Which blob file a given piece of
+ * data ends up in is therefore arbitrary. Stored files are not
+ * copied at all (since they're already on disk).
+ *
+ * The RefIds are just the path to the file, either a blob file, or
+ * some other file. If the data was stored in a blob file, the m_extra
+ * of the RefId will contain its offset in the blob file. For stored
+ * files, m_extra will always be zero.
 *
- * Data is stored under blobs/, using files with numerically
- * increasing names. Stored files are not stored at all (since they're
- * already on disk). The RefIds are just the path to the file (either
- * under blobs/ or where-ever the stored file was).
+ * As an optimization, if the data is less than or equal to
+ * g_inline_size, it's stored "inline" in the RefId. That is, m_id is
+ * the blob itself. Inline/non-inline blobs can be distinguished by
+ * the m_size field. This also applies to stored files (if
+ * sufficiently small, we'll read them and store them inline).
 *
- * Executions are stored under execs/. Each entry is a sub-directory
- * with numerically increasing names. Within each sub-directory is the
- * directory layout that the worker expects (see comment above int
- * main()). The data is not actually copied into the input
- * directories. 
Instead symlinks are created using the path in the
- * RefId. Outputs are not copied either, they just "live" in their
- * output directory.
+ * Workers are forked, given their inputs, calculate their outputs,
+ * then write those outputs and exit. Input is given to the worker via
+ * stdin (as a stream of serialized RefIds). As part of the
+ * command-line, the worker is given an output file to write its
+ * output to. It does so (creating RefIds in the process), and reports
+ * the output RefIds via a pipe (g_local_pipe_fd).
 */
 struct SubprocessImpl : public Client::Impl {
   SubprocessImpl(const Options&, Client&);
   ~SubprocessImpl() override;

-  std::string session() const override { return m_root.native(); }
+  std::string session() const override { return m_fdManager->root().native(); }

   bool isSubprocess() const override { return true; }
   bool supportsOptimistic() const override { return false; }
@@ -316,60 +915,85 @@ struct SubprocessImpl : public Client::Impl {
                  const folly::Range*) override;

 private:
-  fs::path newBlob();
-  fs::path newExec();
-  static fs::path newRoot(const Options&);
+  // Manage the pool of blob files
+  struct FDManager {
+    explicit FDManager(fs::path);
+
+    const fs::path& root() const { return m_root; }
+
+    // Acquire an FD to the given file to read from. If the path does
+    // not correspond to a blob file, nullptr is returned.
+    const FD* acquireForRead(const fs::path&);
+    // Acquire an FD to a blob file to append to. You don't have a say
+    // in which file you get. The FD must be returned via release()
+    // when done. You have exclusive access to this FD.
+    FD* acquireForAppend();
+    // Release an FD acquired from acquireForAppend().
+    void release(FD&);
+  private:
+    std::unique_ptr<FD> newFD();
+
+    fs::path m_root;
+    folly_concurrent_hash_map_simd<std::string, std::unique_ptr<FD>> m_fds;
+
+    std::mutex m_lock;
+    std::stack<FD*> m_forAppend;
+    size_t m_nextBlob;
+  };

-  coro::Task doSubprocess(const RequestId&,
-                          const std::string&,
-                          const fs::path&,
-                          const fs::path&,
-                          const fs::path&,
-                          const fs::path&);
+  // Similar to FDManager, but manages trace files. We have workers
+  // re-use trace files to avoid generating a huge number of them.
+  struct TraceFileManager {
+    fs::path get();
+    void put(fs::path);
+  private:
+    std::mutex m_lock;
+    std::stack<fs::path> m_paths;
+    size_t m_nextId{0};
+  };

-  Options m_options;
+  static fs::path newRoot(const Options&);

-  fs::path m_root;
-  // SubprocessImpl doesn't need the m_size portion of RefId. So we
-  // store an unique integer in it to help verify that the RefId came
-  // from this implementation instance. 
- size_t m_marker; + coro::Task doSubprocess(const RequestId&, + const std::string&, + std::string, + const fs::path&); - std::atomic m_nextBlob; - std::atomic m_nextExec; + Options m_options; + std::unique_ptr m_fdManager; + TraceFileManager m_traceManager; }; SubprocessImpl::SubprocessImpl(const Options& options, Client& parent) : Impl{"subprocess", parent} , m_options{options} - , m_root{newRoot(m_options)} - // Cheap way of generating semi-unique integer: - , m_marker{std::hash{}(m_root.native())} - , m_nextBlob{0} - , m_nextExec{0} + , m_fdManager{std::make_unique(newRoot(m_options))} { FTRACE(2, "Using subprocess extern-worker impl with root at {}\n", - m_root.native()); - Logger::FInfo("Using subprocess extern-worker at {}", m_root.native()); - - fs::create_directory(m_root / "blobs"); - fs::create_directory(m_root / "execs"); + m_fdManager->root().native()); + Logger::FInfo( + "Using subprocess extern-worker at {}", + m_fdManager->root().native() + ); } SubprocessImpl::~SubprocessImpl() { - // Deleting everything under blobs/ and execs/ can take quite a - // while... + auto const root = m_fdManager->root(); + // Destroy FD manager first, to ensure no open FDs to any files + // while cleaning up. + m_fdManager.reset(); + if (m_options.m_cleanup) { Logger::FInfo( "Cleaning up subprocess extern-worker at {}...", - m_root.native() + root.native() ); auto const before = std::chrono::steady_clock::now(); auto const removed = time( "subprocess cleanup", [&] { std::error_code ec; // Suppress exceptions - return fs::remove_all(m_root, ec); + return fs::remove_all(root, ec); } ); auto const elapsed = std::chrono::duration_cast>( @@ -384,28 +1008,6 @@ SubprocessImpl::~SubprocessImpl() { } } -// Create paths for a new blob or a new execution. I lied a little bit -// above in the description of the directory layout. It's true that -// blobs and execs use numerically increasing names, but I shard them -// into directories of 10000 each, to keep their sizes reasonable. -fs::path SubprocessImpl::newBlob() { - auto const id = m_nextBlob++; - auto const blobRoot = m_root / "blobs"; - auto const mid = blobRoot / folly::sformat("{:05}", id / 10000); - fs::create_directory(mid, blobRoot); - return mid / folly::sformat("{:04}", id % 10000); -} - -fs::path SubprocessImpl::newExec() { - auto const id = m_nextExec++; - auto const execRoot = m_root / "execs"; - auto const mid = execRoot / folly::sformat("{:05}", id / 10000); - fs::create_directory(mid, execRoot); - auto const full = mid / folly::sformat("{:04}", id % 10000); - fs::create_directory(full, mid); - return full; -} - // Ensure we always have an unique root under the working directory. fs::path SubprocessImpl::newRoot(const Options& opts) { auto const base = opts.m_workingDir / "hphp-extern-worker"; @@ -423,13 +1025,21 @@ coro::Task SubprocessImpl::load(const RequestId& requestId, // contents. 
auto out = from(ids) | mapped([&] (const RefId& id) { - assertx(id.m_size == m_marker); FTRACE(4, "{} reading blob from {}\n", - requestId.tracePrefix(), id.m_id); - auto blob = readFile(id.m_id); - FTRACE(4, "{} blob is {} bytes\n", - requestId.tracePrefix(), blob.size()); - return blob; + requestId.tracePrefix(), id.toString()); + if (id.m_size <= g_inline_size) { + // Inline data requires no file read + assertx(id.m_id.size() == id.m_size); + assertx(!id.m_extra); + return id.m_id; + } + if (auto const fd = m_fdManager->acquireForRead(id.m_id)) { + // The data is in a blob file, so use a cached FD + return fd->read(id.m_extra, id.m_size); + } + // It's some other (non-blob) file. Create an ephemeral FD to + // read it. + return FD{id.m_id, true, false, false}.read(id.m_extra, id.m_size); }) | as(); HPHP_CORO_MOVE_RETURN(out); @@ -439,27 +1049,56 @@ coro::Task SubprocessImpl::store(const RequestId& requestId, PathVec paths, BlobVec blobs, bool) { - // SubprocessImpl never "uploads" files because it uses symlinks. It - // must write blobs to disk, however (which we classify as an - // upload). - stats().blobsUploaded += blobs.size(); - for (auto& b : blobs) stats().blobBytesUploaded += b.size(); - - // Create RefIds from the given paths, then write the blobs to disk, - // then use their paths. + // SubprocessImpl never "uploads" files, but it must write to disk + // (which we classify as an upload). + + FD* fd = nullptr; + SCOPE_EXIT { if (fd) m_fdManager->release(*fd); }; + + // Update stats. Skip blobs which we'll store inline. + for (auto const& b : blobs) { + if (b.size() <= g_inline_size) continue; + ++stats().blobsUploaded; + stats().blobBytesUploaded += b.size(); + if (!fd) fd = m_fdManager->acquireForAppend(); + assertx(fd); + } + auto out = ((from(paths) | mapped([&] (const fs::path& p) { - return RefId{fs::canonical(p).native(), m_marker}; + auto fileSize = fs::file_size(p); + if (fileSize <= g_inline_size) { + FTRACE(4, "{} storing file {} inline\n", + requestId.tracePrefix(), + p.native()); + auto const contents = readFile(p); + // Size of file could theoretically change between + // stat-ing and reading it. + if (contents.size() <= g_inline_size) { + return RefId{contents, contents.size(), 0}; + } + fileSize = contents.size(); + } + // We distinguish blob files from non-blob files by making + // sure non-blob files are always absolute paths (blob files + // are always relative). 
+ return RefId{fs::canonical(p).native(), fileSize, 0}; }) ) + (from(blobs) | mapped([&] (const std::string& b) { - auto const path = newBlob(); + if (b.size() <= g_inline_size) { + FTRACE(4, "{} storing blob inline\n", + requestId.tracePrefix()); + return RefId{b, b.size(), 0}; + } FTRACE(4, "{} writing size {} blob to {}\n", - requestId.tracePrefix(), b.size(), path.native()); - writeFile(path, b.data(), b.size()); - return RefId{fs::canonical(path).native(), m_marker}; + requestId.tracePrefix(), b.size(), fd->path().native()); + auto const offset = fd->append(b); + RefId r{fd->path().filename().native(), b.size(), offset}; + FTRACE(4, "{} written as {}\n", requestId.tracePrefix(), r.toString()); + return r; }) )) | as(); @@ -473,217 +1112,236 @@ SubprocessImpl::exec(const RequestId& requestId, std::vector inputs, const folly::Range& output, const folly::Range* finiOutput) { - auto const execPath = newExec(); - auto const configPath = execPath / "config"; - auto const inputsPath = execPath / "input"; - auto const outputsPath = execPath / "output"; - - FTRACE(4, "{} executing \"{}\" inside {} ({} runs)\n", - requestId.tracePrefix(), command, - execPath.native(), inputs.size()); + FTRACE(4, "{} executing \"{}\" ({} runs)\n", + requestId.tracePrefix(), command, inputs.size()); HPHP_CORO_SAFE_POINT; - // Set up the directory structure that the worker expects: - - auto const symlink = [&] (const RefId& id, - const fs::path& path) { - assertx(id.m_size == m_marker); - fs::create_symlink(id.m_id, path); - FTRACE(4, "{} symlinked {} to {}\n", - requestId.tracePrefix(), id.m_id, path.native()); - }; + // Each set of inputs should always have the same size. + if (debug && !inputs.empty()) { + auto const size = inputs[0].size(); + for (size_t i = 1; i < inputs.size(); ++i) { + always_assert(inputs[i].size() == size); + } + } - auto const prepare = [&] (const RefValVec& params, - const fs::path& parent) { - for (size_t paramIdx = 0; paramIdx < params.size(); ++paramIdx) { - auto const& p = params[paramIdx]; - auto const path = parent / folly::to(paramIdx); + // Encode all the inputs + BlobEncoder encoder; + auto const encodeParams = [&] (const RefValVec& params) { + for (auto const& param : params) { match( - p, - [&] (const RefId& id) { symlink(id, path); }, + param, + [&] (const RefId& id) { + SerializedSink::encodeRefId(id, encoder); + }, [&] (const Optional& id) { - if (!id.has_value()) return; - symlink(*id, path); + SerializedSink::encodeOptRefId(id, encoder); }, [&] (const IdVec& ids) { - fs::create_directory(path, parent); - for (size_t vecIdx = 0; vecIdx < ids.size(); ++vecIdx) { - symlink(ids[vecIdx], path / folly::to(vecIdx)); - } + SerializedSink::encodeRefIdVec(ids, encoder); } ); } }; + encodeParams(config); + encoder((size_t)inputs.size()); + for (auto const& input : inputs) encodeParams(input); - fs::create_directory(configPath, execPath); - prepare(config, configPath); - - // Each set of inputs should always have the same size. - if (debug && !inputs.empty()) { - auto const size = inputs[0].size(); - for (size_t i = 1; i < inputs.size(); ++i) { - always_assert(inputs[i].size() == size); - } - } - - fs::create_directory(inputsPath, execPath); - for (size_t i = 0; i < inputs.size(); ++i) { - auto const path = inputsPath / folly::to(i); - fs::create_directory(path, inputsPath); - prepare(inputs[i], path); - } + // Acquire a FD. We're not actually going to write to it, but the + // worker will. This ensures the worker has exclusive access to the + // file while it's running. 
+ auto fd = m_fdManager->acquireForAppend(); + SCOPE_EXIT { if (fd) m_fdManager->release(*fd); }; // Do the actual fork+exec. - HPHP_CORO_AWAIT(doSubprocess( - requestId, - command, - execPath, - configPath, - inputsPath, - outputsPath - )); - - // Make RefIds corresponding to the outputs. + auto const outputBlob = HPHP_CORO_AWAIT( + doSubprocess( + requestId, + command, + std::string{(const char*)encoder.data(), encoder.size()}, + fd->path() + ) + ); + // The worker (maybe) wrote to the file, so we need to re-sync our + // tracked offset. + fd->syncOffset(); - auto const makeOutput = - [&] (OutputType type, fs::path path) -> RefVal { + // Decode the output + BlobDecoder decoder{outputBlob.data(), outputBlob.size()}; + auto const makeOutput = [&] (OutputType type) -> RefVal { switch (type) { case OutputType::Val: - assertx(fs::exists(path)); - return RefId{std::move(path), m_marker}; + return SerializedSource::decodeRefId(decoder); case OutputType::Opt: - if (!fs::exists(path)) return std::nullopt; - return make_optional(RefId{std::move(path), m_marker}); - case OutputType::Vec: { - assertx(fs::exists(path)); - IdVec vec; - size_t i = 0; - while (true) { - auto valPath = path / folly::to(i); - if (!fs::exists(valPath)) break; - vec.emplace_back(valPath.native(), m_marker); - ++i; - } - return vec; - } + return SerializedSource::decodeOptRefId(decoder); + case OutputType::Vec: + return SerializedSource::decodeRefIdVec(decoder); } always_assert(false); }; - auto const makeOutputs = - [&] (const fs::path& path, - const folly::Range& outputTypes) { + auto const makeOutputs = [&] (const folly::Range& types) { RefValVec vec; - vec.reserve(outputTypes.size()); - for (size_t i = 0; i < outputTypes.size(); ++i) { - vec.emplace_back( - makeOutput( - outputTypes[i], - path / folly::to(i) - ) - ); + vec.reserve(types.size()); + for (auto const type : types) { + vec.emplace_back(makeOutput(type)); } return vec; }; std::vector out; out.reserve(inputs.size() + (finiOutput ? 1 : 0)); - for (size_t i = 0; i < inputs.size(); ++i) { - out.emplace_back( - makeOutputs( - outputsPath / folly::to(i), - output - ) - ); - } - if (finiOutput) { - out.emplace_back( - makeOutputs( - outputsPath / "fini", - *finiOutput - ) - ); - } + std::generate_n( + std::back_inserter(out), + inputs.size(), + [&] { return makeOutputs(output); } + ); + if (finiOutput) out.emplace_back(makeOutputs(*finiOutput)); + + decoder.assertDone(); HPHP_CORO_MOVE_RETURN(out); } -coro::Task SubprocessImpl::doSubprocess(const RequestId& requestId, - const std::string& command, - const fs::path& execPath, - const fs::path& configPath, - const fs::path& inputPath, - const fs::path& outputPath) { +coro::Task +SubprocessImpl::doSubprocess(const RequestId& requestId, + const std::string& command, + std::string inputBlob, + const fs::path& outputPath) { std::vector args{ current_executable_path(), s_option, + g_local_option, command, - configPath, - outputPath, - inputPath + m_fdManager->root().native(), + outputPath.native() }; // Propagate the TRACE option in the environment. We'll copy the // trace output into this process' trace output. 
std::vector env; Optional traceFile; + SCOPE_EXIT { if (traceFile) m_traceManager.put(std::move(*traceFile)); }; if (auto const trace = getenv("TRACE")) { - traceFile = execPath / "trace.log"; + traceFile = m_traceManager.get(); + auto const fullPath = m_fdManager->root() / *traceFile; env.emplace_back(folly::sformat("TRACE={}", trace)); - env.emplace_back(folly::sformat("HPHP_TRACE_FILE={}", traceFile->c_str())); + env.emplace_back(folly::sformat("HPHP_TRACE_FILE={}", fullPath.c_str())); } - FTRACE(4, "{} executing subprocess\n", - requestId.tracePrefix()); + FTRACE( + 4, "{} executing subprocess for '{}'{}(input size: {})\n", + requestId.tracePrefix(), + command, + traceFile + ? folly::sformat(" (trace-file: {}) ", traceFile->native()) + : "", + inputBlob.size() + ); HPHP_CORO_SAFE_POINT; - auto const DEBUG_ONLY before = std::chrono::steady_clock::now(); + auto const before = std::chrono::steady_clock::now(); // Do the actual fork+exec. folly::Subprocess subprocess{ args, folly::Subprocess::Options{} - .stdinFd(folly::Subprocess::DEV_NULL) + .parentDeathSignal(SIGKILL) + .pipeStdin() .pipeStdout() .pipeStderr() + .fd(g_local_pipe_fd, folly::Subprocess::PIPE_OUT) .closeOtherFds(), nullptr, &env }; - auto const [_, stderr] = subprocess.communicate(); + std::string output; + std::string stderr; + size_t inputWritten = 0; + + // Communicate with the worker. When available, read from worker's + // stderr and output pipe and store the data. Throw everything else + // away. Attempt to write inputs to the worker's stdin whenever it + // has free space. + subprocess.communicate( + [&] (int parentFd, int childFd) { // Read + if (childFd == g_local_pipe_fd) { + return readFromPipe(parentFd, output); + } else if (childFd == STDERR_FILENO) { + return readFromPipe(parentFd, stderr); + } else { + // Worker is writing to some other random FD (including + // stdout). Ignore it. + char dummy[512]; + folly::readNoInt(parentFd, dummy, sizeof dummy); + } + return false; + }, + [&] (int parentFd, int childFd) { // Write + // Close any writable FD except stdin + if (childFd != STDIN_FILENO) return true; + // We've written all of the input, so close stdin. This will + // signal to the worker that input is all sent. + if (inputWritten >= inputBlob.size()) return true; + + // Otherwise write what we can + auto const toWrite = inputBlob.size() - inputWritten; + auto const written = + folly::writeNoInt(parentFd, inputBlob.data() + inputWritten, toWrite); + if (written < 0) { + if (errno == EAGAIN) return false; + throw Error{ + folly::sformat( + "Failed writing {} bytes to subprocess input for '{}' [{}]", + toWrite, command, folly::errnoStr(errno) + ) + }; + } else if (written == 0) { + return true; + } + inputWritten += written; + return false; + } + ); + auto const returnCode = subprocess.wait(); + auto const elapsed = std::chrono::steady_clock::now() - before; - HPHP_CORO_SAFE_POINT; + stats().execCpuUsec += + std::chrono::duration_cast(elapsed).count(); + ++stats().execAllocatedCores; FTRACE( 4, - "{} subprocess finished (took {}). Return code: {}, Stderr: {}\n", + "{} subprocess finished (took {}). " + "Output size: {}, Return code: {}, Stderr: {}\n", requestId.tracePrefix(), prettyPrint( std::chrono::duration_cast>( - std::chrono::steady_clock::now() - before + elapsed ).count(), folly::PRETTY_TIME, false ), + output.size(), returnCode.str(), stderr ); + HPHP_CORO_SAFE_POINT; + // Do this before checking the return code. If the process failed, // we want to capture anything it logged before throwing. 
- if (traceFile && fs::exists(*traceFile)) { - auto const contents = readFile(*traceFile); + if (traceFile && fs::exists(m_fdManager->root() / *traceFile)) { + auto const contents = readFile(m_fdManager->root() / *traceFile); if (!contents.empty()) { Trace::ftraceRelease( - "vvvvvvvvvvvvvvvvvv remote-exec ({} \"{}\" {}) vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv\n" + "vvvvvvvvvvvvvvvvvv remote-exec ({} \"{}\") vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv\n" "{}" "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n", requestId.toString(), command, - execPath.native(), contents ); } @@ -699,8 +1357,89 @@ coro::Task SubprocessImpl::doSubprocess(const RequestId& requestId, ) }; } + if (inputWritten != inputBlob.size()) { + throw WorkerError{ + folly::sformat( + "Execution of `{}` failed: Process failed to consume input\n", + command + ) + }; + } + + HPHP_CORO_MOVE_RETURN(output); +} + +////////////////////////////////////////////////////////////////////// + +SubprocessImpl::FDManager::FDManager(fs::path root) + : m_root{std::move(root)} + , m_nextBlob{0} +{} + +const FD* SubprocessImpl::FDManager::acquireForRead(const fs::path& path) { + // Blob files are always relative + if (path.is_absolute()) return nullptr; + // Only already created blob files should be in m_fds. So, if it's + // not present, it can't be a blob file. + auto const it = m_fds.find(path.native()); + if (it == m_fds.end()) return nullptr; + return it->second.get(); +} + +FD* SubprocessImpl::FDManager::acquireForAppend() { + // Use the blob file at the top of m_forAppend. + std::scoped_lock _{m_lock}; + if (m_forAppend.empty()) { + // If empty, there's no free blob file. We need to create a new + // one. + auto fd = newFD(); + auto const path = fd->path(); + auto const [elem, emplaced] = + m_fds.emplace(path.filename().native(), std::move(fd)); + assertx(emplaced); + m_forAppend.push(elem->second.get()); + } + auto fd = m_forAppend.top(); + m_forAppend.pop(); + return fd; +} + +void SubprocessImpl::FDManager::release(FD& fd) { + if (debug) { + // Should be returning something cached + auto const it = m_fds.find(fd.path().filename().native()); + always_assert(it != m_fds.end()); + always_assert(it->second.get() == &fd); + } + std::scoped_lock _{m_lock}; + m_forAppend.push(&fd); +} + +std::unique_ptr SubprocessImpl::FDManager::newFD() { + // We deliberately keep the blob filename as small as possible + // because they get serialized a lot and it keeps the input/output + // sizes small. 
+ auto const id = m_nextBlob++; + auto const filename = m_root / folly::to(id); + return std::make_unique(filename, true, true, true); +} + +////////////////////////////////////////////////////////////////////// + +fs::path SubprocessImpl::TraceFileManager::get() { + std::scoped_lock _{m_lock}; + if (m_paths.empty()) { + auto const id = m_nextId++; + m_paths.push(folly::sformat("trace-{:04}.log", id)); + } + auto path = std::move(m_paths.top()); + m_paths.pop(); + return path; +} - HPHP_CORO_RETURN_VOID; +void SubprocessImpl::TraceFileManager::put(fs::path path) { + std::scoped_lock _{m_lock}; + m_paths.push(std::move(path)); } ////////////////////////////////////////////////////////////////////// diff --git a/hphp/util/extern-worker.h b/hphp/util/extern-worker.h index 076e57178a835..9a4a14b093845 100644 --- a/hphp/util/extern-worker.h +++ b/hphp/util/extern-worker.h @@ -172,10 +172,9 @@ struct Job : public detail::JobBase { using ExecT = typename detail::ExecRet::type; private: - void init(const std::filesystem::path&) const override; - void fini(const std::filesystem::path&) const override; - void run(const std::filesystem::path&, - const std::filesystem::path&) const override; + void init(detail::ISource&) const override; + void fini(detail::ISink&) const override; + void run(detail::ISource&, detail::ISink&) const override; }; ////////////////////////////////////////////////////////////////////// @@ -215,16 +214,16 @@ struct Multi { // Identifier for a Ref. Used by the implementation to track them. The // meaning of the identifier is private to the implementation. struct RefId { - RefId(std::string, size_t); + RefId(std::string, size_t, size_t extra = 0); std::string toString() const; bool operator==(const RefId&) const; bool operator!=(const RefId&) const; bool operator<(const RefId&) const; - // Despite their names, these fields can be used for anything. std::string m_id; - size_t m_size; + size_t m_size; // Size of data + size_t m_extra; // For internal usage }; // Represents a piece of data "inside" the extern-worker
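Note: the snippet below is a minimal, standalone sketch of the inline-RefId
wire format that SerializedSink::encodeRefId and SerializedSource::decodeRefId
implement in this patch. It is not the patch's actual code: the real
implementation encodes integers with BlobEncoder (variable-width), while this
sketch writes raw fixed-width size_t values for brevity, and MiniRefId,
kInlineSize, and the helper names are hypothetical stand-ins introduced only
for illustration.

#include <cassert>
#include <cstring>
#include <iostream>
#include <string>

// Stand-in for RefId: id is the blob-file name, or the blob itself when
// the blob is small enough to be stored inline. size is always the blob
// size; extra is the offset within the blob file (0 for inline blobs).
struct MiniRefId {
  std::string id;
  size_t size;
  size_t extra;
};

constexpr size_t kInlineSize = 64;  // mirrors g_inline_size in the patch

// For an inline ref (size <= kInlineSize) we know extra == 0 and
// id.size() == size, so writing the size plus the raw bytes is enough.
void encodeRefId(const MiniRefId& r, std::string& out) {
  auto const writeSize = [&out] (size_t v) {
    out.append(reinterpret_cast<const char*>(&v), sizeof v);
  };
  writeSize(r.size);
  if (r.size <= kInlineSize) {
    assert(r.extra == 0 && r.id.size() == r.size);
    out.append(r.id);               // the id *is* the payload
  } else {
    writeSize(r.id.size());
    out.append(r.id);               // blob-file name
    writeSize(r.extra);             // offset within the blob file
  }
}

MiniRefId decodeRefId(const char*& p) {
  auto const readSize = [&p] {
    size_t v;
    std::memcpy(&v, p, sizeof v);
    p += sizeof v;
    return v;
  };
  MiniRefId r;
  r.size = readSize();
  if (r.size <= kInlineSize) {      // inline: payload bytes follow directly
    r.id.assign(p, r.size);
    p += r.size;
    r.extra = 0;
  } else {                          // on disk: file name + offset follow
    auto const idLen = readSize();
    r.id.assign(p, idLen);
    p += idLen;
    r.extra = readSize();
  }
  return r;
}

int main() {
  std::string wire;
  encodeRefId(MiniRefId{"tiny blob", 9, 0}, wire);    // stored inline
  encodeRefId(MiniRefId{"0", 100000, 4096}, wire);    // blob file "0" @ 4096
  const char* p = wire.data();
  auto const a = decodeRefId(p);
  auto const b = decodeRefId(p);
  std::cout << a.id << "\n" << b.id << ":" << b.extra << ":" << b.size << "\n";
}

The special case pays off because small blobs are common: at or below the
threshold a ref carries its payload directly, so loads never touch the file
system, and above it a ref is just a short blob-file name plus an offset into
a pooled blob file.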