From d19877745022409b41fb9816407a39229afb1c25 Mon Sep 17 00:00:00 2001 From: "Yang, Bo" Date: Wed, 5 Oct 2022 13:05:32 -0700 Subject: [PATCH 1/2] Run tests in repo mode --- .github/workflows/nix.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/nix.yml b/.github/workflows/nix.yml index 71b01b8b75bdd..a6338ff67844c 100644 --- a/.github/workflows/nix.yml +++ b/.github/workflows/nix.yml @@ -105,6 +105,9 @@ jobs: - hhvm_clang os: - ${{ github.event_name == 'pull_request' && '16-core' || 'ubuntu-latest' }} + flags: + - '' + - --repo runs-on: ${{matrix.os}} steps: - uses: actions/checkout@v3 @@ -143,7 +146,7 @@ jobs: - run: sudo apt-get install ./${{matrix.package}}.deb - run: echo "HHVM_BIN=$(command -v hhvm)" >> "$GITHUB_ENV" - run: | - "$HHVM_BIN" hphp/test/run.php -x hphp/test/github_excluded_tests all + "$HHVM_BIN" hphp/test/run.php ${{matrix.flags}} -x hphp/test/github_excluded_tests all upload-deb: if: github.event_name == 'push' && github.ref_type == 'tag' runs-on: ubuntu-latest From 76eef32ad78ef05bb9ad5b4e91e7def59e077662 Mon Sep 17 00:00:00 2001 From: "Yang, Bo" Date: Wed, 5 Oct 2022 20:35:41 +0000 Subject: [PATCH 2/2] Revert "Rework SubprocessImpl for extern-worker" This reverts commit 280e6899b442796a197b5b236d3ddd50d0b62b5f. --- hphp/hhbbc/index.cpp | 2 - hphp/util/blob-encoder.h | 2 +- hphp/util/extern-worker-detail.h | 41 +- hphp/util/extern-worker-inl.h | 174 ++-- hphp/util/extern-worker.cpp | 1317 +++++++----------------------- hphp/util/extern-worker.h | 13 +- 6 files changed, 379 insertions(+), 1170 deletions(-) diff --git a/hphp/hhbbc/index.cpp b/hphp/hhbbc/index.cpp index 98a7d0a9378b8..689dc295abd35 100644 --- a/hphp/hhbbc/index.cpp +++ b/hphp/hhbbc/index.cpp @@ -5650,8 +5650,6 @@ materialize_inputs(const IndexData& index, auto program = std::make_unique(); auto const loadAndParse = [&] (Chunk chunk) -> coro::Task { - HPHP_CORO_RESCHEDULE_ON_CURRENT_EXECUTOR; - auto [classes, funcs, units] = HPHP_CORO_AWAIT(coro::collect( client->load(std::move(chunk.classes)), client->load(std::move(chunk.funcs)), diff --git a/hphp/util/blob-encoder.h b/hphp/util/blob-encoder.h index 8e558d6d3afd8..7aa669c05907d 100644 --- a/hphp/util/blob-encoder.h +++ b/hphp/util/blob-encoder.h @@ -127,7 +127,7 @@ struct BlobEncoder { void writeRaw(const char* ptr, size_t size) { auto const start = m_blob.size(); m_blob.resize(start + size); - std::copy(ptr, ptr + size, m_blob.data() + start); + std::copy(ptr, ptr + size, &m_blob[start]); } /* diff --git a/hphp/util/extern-worker-detail.h b/hphp/util/extern-worker-detail.h index 3944ecbb0b283..f6db64710e3b1 100644 --- a/hphp/util/extern-worker-detail.h +++ b/hphp/util/extern-worker-detail.h @@ -315,32 +315,6 @@ auto typesToValues(F&& f) { ////////////////////////////////////////////////////////////////////// -// Abstracts away how a worker should obtain it's inputs, and write -// it's outputs. These functions are tightly coupled with the logic in -// JobBase::serialize and JobBase::deserialize. -struct ISource { - virtual ~ISource() = default; - virtual std::string blob() = 0; - virtual Optional optBlob() = 0; - virtual std::vector variadic() = 0; - virtual void initDone() = 0; - virtual bool inputEnd() const = 0; - virtual void nextInput() = 0; - virtual void finish() = 0; -}; - -struct ISink { - virtual ~ISink() = default; - virtual void blob(const std::string&) = 0; - virtual void optBlob(const Optional&) = 0; - virtual void variadic(const std::vector&) = 0; - virtual void nextOutput() = 0; - virtual void startFini() = 0; - virtual void finish() = 0; -}; - -////////////////////////////////////////////////////////////////////// - // Base class for Jobs. This provide a consistent interface to invoke // through. struct JobBase { @@ -349,16 +323,15 @@ struct JobBase { explicit JobBase(const std::string& name); virtual ~JobBase() = default; - template static T deserialize(ISource&); - template static void serialize(const T&, ISink&); + template static T deserialize(const std::filesystem::path&); + template static void serialize(const T&, + size_t, + const std::filesystem::path&); private: - template static T deserializeBlob(std::string); - template static std::string serializeBlob(const T&); - - virtual void init(ISource&) const = 0; - virtual void fini(ISink&) const = 0; - virtual void run(ISource&, ISink&) const = 0; + virtual void init(const std::filesystem::path&) const = 0; + virtual void fini(const std::filesystem::path&) const = 0; + virtual void run(const std::filesystem::path&, const std::filesystem::path&) const = 0; std::string m_name; diff --git a/hphp/util/extern-worker-inl.h b/hphp/util/extern-worker-inl.h index 13427b57aafbf..d23eca80e5b18 100644 --- a/hphp/util/extern-worker-inl.h +++ b/hphp/util/extern-worker-inl.h @@ -43,63 +43,63 @@ Job::Job() : detail::JobBase{C::name()} {} // expects, then invoke the function with those types. template -void Job::init(detail::ISource& source) const { +void Job::init(const std::filesystem::path& root) const { using namespace detail; using Args = typename Params::type; - // For each expected input, load it, and deserialize it into + // For each expected input, load the file, and deserialize it into // the appropriate type. Return the types as a tuple, which can then // std::apply to C::init, passing the inputs. - size_t DEBUG_ONLY nextIdx = 0; std::apply( C::init, typesToValues( [&] (size_t idx, auto tag) { - assertx(idx == nextIdx++); - return deserialize(source); + return deserialize( + root / folly::to(idx) + ); } ) ); - source.initDone(); using Ret = typename Return::type; static_assert(std::is_void_v, "init() must return void"); } template -void Job::fini(detail::ISink& sink) const { +void Job::fini(const std::filesystem::path& outputRoot) const { using namespace detail; - sink.startFini(); using Ret = typename Return::type; if constexpr (std::is_void_v) { C::fini(); } else { + auto const output = outputRoot / "fini"; + std::filesystem::create_directory(output, outputRoot); auto const v = C::fini(); - time("writing fini outputs", [&] { return serialize(v, sink); }); + time("writing fini outputs", [&] { return serialize(v, 0, output); }); } } template -void Job::run(detail::ISource& source, detail::ISink& sink) const { +void Job::run(const std::filesystem::path& inputRoot, + const std::filesystem::path& outputRoot) const { using namespace detail; - // For each expected input, load it, and deserialize it into the - // appropriate type, turning all of the types into a tuple. + // For each expected input, load the file, and deserialize it into + // the appropriate type, turning all of the types into a tuple. using Args = typename Params::type; - size_t DEBUG_ONLY nextIdx = 0; auto inputs = time( "loading inputs", [&] { return typesToValues( [&] (size_t idx, auto tag) { - assertx(idx == nextIdx++); - return deserialize(source); + return deserialize( + inputRoot / folly::to(idx) + ); } ); } ); - source.nextInput(); // Apply the tuple to C::run, passing the types as parameters. auto outputs = time( @@ -110,9 +110,8 @@ void Job::run(detail::ISource& source, detail::ISink& sink) const { using Ret = typename Return::type; static_assert(!std::is_void_v, "run() must return something"); - // Serialize the outputs - time("writing outputs", [&] { return serialize(outputs, sink); }); - sink.nextOutput(); + // Serialize the outputs into the output directory. + time("writing outputs", [&] { return serialize(outputs, 0, outputRoot); }); } ////////////////////////////////////////////////////////////////////// @@ -121,99 +120,81 @@ namespace detail { ////////////////////////////////////////////////////////////////////// -// Turn a blob into a specific (non-marker) type +// Given a file path, load the contents of the file, deserialize them +// into the type T, and return it. template -T JobBase::deserializeBlob(std::string blob) { +T JobBase::deserialize(const std::filesystem::path& path) { using namespace detail; - static_assert(!IsMarker::value, "Special markers cannot be nested"); if constexpr (std::is_same::value) { // A std::string is always stored as itself (this lets us store - // files directly as their contents without having to encode - // them). - return blob; - } else { - // For most types, the data is encoded using BlobEncoder, so undo - // that. - BlobDecoder decoder{blob.data(), blob.size()}; - return decoder.makeWhole(); - } -} - -// Deserialize the given input source into the type T and return -// it. The type might include markers, which might trigger -// sub-deserializations. -template -T JobBase::deserialize(ISource& source) { - using namespace detail; - static_assert(!IsMulti::value, "Multi can only be used as return type"); - - if constexpr (IsVariadic::value) { + // files as their contents without having to encode them). + return readFile(path); + } else if constexpr (IsVariadic::value) { static_assert(!IsMarker::value, "Special markers cannot be nested"); - auto const blobs = source.variadic(); + // Variadic is actually a directory, not a file. Recurse into + // it, and do the deserialization for every file within it. T out; - out.vals.reserve(blobs.size()); - for (auto const& blob : blobs) { - out.vals.emplace_back(deserializeBlob(blob)); + for (size_t i = 0;; ++i) { + auto const valPath = path / folly::to(i); + // A break in the numbering means the end of the vector. + if (!std::filesystem::exists(valPath)) break; + out.vals.emplace_back(deserialize(valPath)); } return out; } else if constexpr (IsOpt::value) { static_assert(!IsMarker::value, "Special markers cannot be nested"); - // Opt is like T, except the data may not exist (so is nullopt + // Opt is like T, except the file may not exist (so is nullopt // otherwise). T out; - if (auto const blob = source.optBlob()) { - out.val.emplace(deserializeBlob(*blob)); + if (std::filesystem::exists(path)) { + out.val.emplace(deserialize(path)); } return out; } else { - return deserializeBlob(source.blob()); + // For most types, the data is encoded using BlobEncoder, so undo + // that. + static_assert(!IsMulti::value, "Multi can only be used as return type"); + auto const data = readFile(path); + BlobDecoder decoder{data.data(), data.size()}; + return decoder.makeWhole(); } } -// Serialize the given (non-marker) value into a blob +// Given a value, an index of that value (its positive in the output +// values), and an output root, serialize the value, and write its +// contents to the appropriate file. template -std::string JobBase::serializeBlob(const T& v) { +void JobBase::serialize(const T& v, + size_t idx, + const std::filesystem::path& root) { using namespace detail; - static_assert(!IsMarker::value, - "Special markers cannot be nested"); if constexpr (std::is_same::value) { - // std::string always encodes to itself - return v; - } else { - BlobEncoder encoder; - encoder(v); - return std::string{(const char*)encoder.data(), encoder.size()}; - } -} - -// Serialize the given value into a blob and write it out to the given -// output sink. The value's type might be a marker, which can trigger -// sub-serializations. -template -void JobBase::serialize(const T& v, ISink& sink) { - using namespace detail; - if constexpr (IsVariadic::value) { + // std::string isn't serialized, but written as itself as + // root/idx. + return writeFile(root / folly::to(idx), v.data(), v.size()); + } else if constexpr (IsVariadic::value) { + // For Variadic, we create a directory root/idx, and under it, + // write a file for every element in the vector. static_assert(!IsMarker::value, "Special markers cannot be nested"); - using namespace folly::gen; - auto const blobs = from(v.vals) - | map([&] (const typename T::Type& t) { return serializeBlob(t); }) - | as(); - sink.variadic(blobs); + auto const path = root / folly::to(idx); + std::filesystem::create_directory(path, root); + for (size_t i = 0; i < v.vals.size(); ++i) { + serialize(v.vals[i], i, path); + } } else if constexpr (IsOpt::value) { // Opt is like T, except nothing is written if the value isn't // present. static_assert(!IsMarker::value, "Special markers cannot be nested"); - sink.optBlob( - v.val.has_value() ? serializeBlob(*v.val) : Optional{} - ); + if (!v.val.has_value()) return; + serialize(*v.val, idx, root); } else if constexpr (IsMulti::value) { // Treat Multi as equivalent to std::tuple (IE, write each element - // separately). - size_t DEBUG_ONLY nextTupleIdx = 0; + // to a separate file). + assertx(idx == 0); for_each( v.vals, [&] (auto const& elem, size_t tupleIdx) { @@ -223,12 +204,18 @@ void JobBase::serialize(const T& v, ISink& sink) { >::value, "Multi cannot be nested" ); - assertx(tupleIdx == nextTupleIdx++); - serialize(elem, sink); + serialize(elem, tupleIdx, root); } ); } else { - sink.blob(serializeBlob(v)); + // Most types are just encoded with BlobEncoder and written as + // root/idx + BlobEncoder encoder; + encoder(v); + writeFile( + root / folly::to(idx), + (const char*)encoder.data(), encoder.size() + ); } } @@ -238,14 +225,12 @@ void JobBase::serialize(const T& v, ISink& sink) { ////////////////////////////////////////////////////////////////////// -inline RefId::RefId(std::string id, size_t size, size_t extra) - : m_id{std::move(id)}, m_size{size}, m_extra{extra} +inline RefId::RefId(std::string id, size_t size) + : m_id{std::move(id)}, m_size{size} {} inline bool RefId::operator==(const RefId& o) const { - return - std::tie(m_id, m_extra, m_size) == - std::tie(o.m_id, o.m_extra, o.m_size); + return std::tie(m_id, m_size) == std::tie(o.m_id, o.m_size); } inline bool RefId::operator!=(const RefId& o) const { @@ -253,20 +238,11 @@ inline bool RefId::operator!=(const RefId& o) const { } inline bool RefId::operator<(const RefId& o) const { - return - std::tie(m_id, m_extra, m_size) < - std::tie(o.m_id, o.m_extra, o.m_size); + return std::tie(m_size, m_id) < std::tie(o.m_size, o.m_id); } inline std::string RefId::toString() const { - // Don't print out the extra field if it's zero, to avoid clutter - // for implementations which don't use it. The id might contain - // binary data, so escape it before printing. - if (m_extra) { - return folly::sformat("{}:{}:{}", folly::humanify(m_id), m_extra, m_size); - } else { - return folly::sformat("{}:{}", folly::humanify(m_id), m_size); - } + return folly::sformat("{}:{}", m_id, m_size); } ////////////////////////////////////////////////////////////////////// diff --git a/hphp/util/extern-worker.cpp b/hphp/util/extern-worker.cpp index 1db0a77a3efcd..d97d3cb70f7da 100644 --- a/hphp/util/extern-worker.cpp +++ b/hphp/util/extern-worker.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include @@ -68,22 +67,6 @@ TRACE_SET_MOD(extern_worker); ////////////////////////////////////////////////////////////////////// -// If passed via the command-line, the worker is running in "local" -// mode and will use a different mechanism to read inputs/write -// outputs. -const char* const g_local_option = "--local"; - -// If running in local mode, the FD of the pipe used to communicate -// output back to the parent. -constexpr int g_local_pipe_fd = 3; - -// For the subprocess impl, if the size of the data is <= this -// constant, we'll store it "inline" in the ref itself, rather than -// writing it to disk. This values probably needs more tuning. -constexpr size_t g_inline_size = 64; - -////////////////////////////////////////////////////////////////////// - struct Registry { hphp_fast_string_map registry; std::mutex lock; @@ -96,196 +79,48 @@ Registry& registry() { ////////////////////////////////////////////////////////////////////// -// Represents an open file. Used this over readFile/writeFile if you -// want to maintain a persistent FD, or want to just read/write just a -// portion of the file. Only reading or appending to the file is -// supported. -struct FD { - FD(const fs::path& path, bool read, bool write, bool create) - : m_path{path}, m_offset{0} { - assertx(IMPLIES(create, write)); - - auto flags = O_CLOEXEC; - if (read) { - flags |= (write ? O_RDWR : O_RDONLY); - } else { - assertx(write); - flags |= O_WRONLY; - } +// folly::writeFile expects a container-like input, so this makes a +// ptr/length pair behave like one (without copying). +struct Adaptor { + size_t size() const { return m_size; } + bool empty() const { return !size(); } + const char& operator[](size_t idx) const { return m_ptr[idx]; } + const char* m_ptr; + size_t m_size; +}; - if (write) flags |= O_APPEND; - if (create) { - flags |= O_CREAT | O_EXCL; - // We're creating it, so the file is empty - m_offset = 0; - } +////////////////////////////////////////////////////////////////////// - auto fd = folly::openNoInt(m_path.c_str(), flags); - if (fd < 0) { - throw Error{ - folly::sformat( - "Unable to open {} [{}]", - m_path.native(), folly::errnoStr(errno) - ) - }; - } - m_fd = fd; - SCOPE_FAIL { ::close(m_fd); }; +} - // If we're going to write to the file, but not creating it, we - // don't know what the end of the file is, so find the current - // offset. - if (write && !create) syncOffset(); - } - ~FD() { if (m_fd >= 0) ::close(m_fd); } - - FD(const FD&) = delete; - FD(FD&& o) noexcept : m_fd{o.m_fd}, m_path{o.m_path}, m_offset{o.m_offset} - { o.m_fd = -1; } - FD& operator=(const FD&) = delete; - FD& operator=(FD&& o) noexcept { - std::swap(m_fd, o.m_fd); - std::swap(m_path, o.m_path); - std::swap(m_offset, o.m_offset); - return *this; - } +////////////////////////////////////////////////////////////////////// - const fs::path& path() const { return m_path; } +namespace detail { - std::string read(size_t offset, size_t size) const { - assertx(m_fd >= 0); +////////////////////////////////////////////////////////////////////// - std::string data; - folly::resizeWithoutInitialization(data, size); +// Wrappers around the folly functions with error handling - auto const read = folly::preadFull(m_fd, data.data(), size, offset); - if (read == size) return data; - if (read < 0) { - throw Error{ - folly::sformat( - "Failed reading {} bytes from {} at {} [{}]", - size, m_path.native(), offset, folly::errnoStr(errno) - ) - }; - } +std::string readFile(const fs::path& path) { + std::string s; + if (!folly::readFile(path.c_str(), s)) { throw Error{ folly::sformat( - "Partial read from {} at {} (expected {}, actual {})", - m_path.native(), offset, size, read + "Unable to read input from {} [{}]", + path.c_str(), folly::errnoStr(errno) ) }; } - - size_t append(const std::string& data) { - assertx(m_fd >= 0); - - auto const written = folly::writeFull(m_fd, data.data(), data.size()); - if (written < 0) { - throw Error{ - folly::sformat( - "Failed writing {} bytes to {} [{}]", - data.size(), m_path.native(), folly::errnoStr(errno) - ) - }; - } - if (written != data.size()) { - throw Error{ - folly::sformat( - "Partial write to {} (expected {}, actual {})", - m_path.native(), data.size(), written - ) - }; - } - auto const prev = m_offset; - m_offset += written; - return prev; - } - - // Update m_offset to the end of the file. This is only needed if - // you've opened an already created file, or if you know someone - // else has written to it. - void syncOffset() { - assertx(m_fd >= 0); - auto const size = ::lseek(m_fd, 0, SEEK_END); - if (size < 0) { - throw Error{ - folly::sformat( - "Unable to seek to end of {}", - m_path.native() - ) - }; - } - m_offset = size; - } - -private: - int m_fd; - fs::path m_path; - size_t m_offset; -}; - -////////////////////////////////////////////////////////////////////// - -// Read from the FD (which is assumed to be a pipe) and append the -// contents to the given string. Return true if the pipe is now -// closed, or false otherwise. The false case can only happen if the -// FD is non-blocking. -bool readFromPipe(int fd, std::string& s) { - auto realEnd = s.size(); - SCOPE_EXIT { s.resize(realEnd); }; - while (true) { - assertx(realEnd <= s.size()); - auto spaceLeft = s.size() - realEnd; - while (spaceLeft < 1024) { - folly::resizeWithoutInitialization( - s, std::max(4096, s.size() * 2) - ); - spaceLeft = s.size() - realEnd; - } - auto const read = - folly::readNoInt(fd, s.data() + realEnd, spaceLeft); - if (read < 0) { - if (errno == EAGAIN) return false; - throw Error{ - folly::sformat( - "Failed reading from pipe {} [{}]", - fd, - folly::errnoStr(errno) - ) - }; - } else if (read == 0) { - return true; - } - realEnd += read; - } -} - -// Read from the FD (which is assumed to be a pipe) until it is closed -// (returns EOF). Returns all data read. Only use this with a blocking -// FD, as it will spin otherwise. -std::string readFromPipe(int fd) { - std::string out; - while (!readFromPipe(fd, out)) {} - return out; + return s; } -// Write all of the given data to the FD (which is assumed to be a -// pipe). -void writeToPipe(int fd, const char* data, size_t size) { - auto const written = folly::writeFull(fd, data, size); - if (written < 0) { - throw Error{ - folly::sformat( - "Failed writing {} bytes to pipe {} [{}]", - size, fd, folly::errnoStr(errno) - ) - }; - } - if (written != size) { +void writeFile(const fs::path& path, + const char* ptr, size_t size) { + if (!folly::writeFile(Adaptor{ptr, size}, path.c_str())) { throw Error{ folly::sformat( - "Partial write to pipe {} (expected {}, actual {})", - fd, size, written + "Unable to write output to {} [{}]", + path.c_str(), folly::errnoStr(errno) ) }; } @@ -293,209 +128,25 @@ void writeToPipe(int fd, const char* data, size_t size) { ////////////////////////////////////////////////////////////////////// -/* - * "Serialized" API: - * - * For this mode a worker expects to be invoked with --local, followed - * by the name of the command, the root of the blob files, and the - * name of the blob file to write results to. - * - * This mode uses blob files written locally, and communicates its - * input/output via serialized RefIds via stdin and pipes. It's meant - * for SubprocessImpl and is more efficient than the "File" mode - * below. - * - * See SubprocessImpl for more description. - */ - -struct SerializedSource : public detail::ISource { - SerializedSource(fs::path root, std::string s) - : m_source{std::move(s)} - , m_decoder{m_source.data(), m_source.size()} - , m_currentInput{0} - , m_numInputs{0} - , m_root{std::move(root)} - {} - ~SerializedSource() = default; - - std::string blob() override { - return refToBlob(decodeRefId(m_decoder)); - } - Optional optBlob() override { - auto r = decodeOptRefId(m_decoder); - if (!r) return std::nullopt; - return refToBlob(*r); - } - - BlobVec variadic() override { - return from(decodeRefIdVec(m_decoder)) - | map([&] (const RefId& r) { return refToBlob(r); }) - | as(); - } - - void initDone() override { - assertx(m_numInputs == 0); - assertx(m_currentInput == 0); - m_decoder(m_numInputs); - } - bool inputEnd() const override { - return m_currentInput >= m_numInputs; - } - void nextInput() override { - assertx(!inputEnd()); - ++m_currentInput; - } - void finish() override { m_decoder.assertDone(); } - - static RefId decodeRefId(BlobDecoder& d) { - // Optimize RefId representation for space. If m_size is <= the - // inline size, we know that m_extra is zero, and that m_id has - // the same length as m_size, so we do not need to encode it - // twice. - decltype(RefId::m_size) size; - d(size); - if (size <= g_inline_size) { - assertx(d.remaining() >= size); - std::string id{(const char*)d.data(), size}; - d.advance(size); - return RefId{std::move(id), size, 0}; - } else { - decltype(RefId::m_id) id; - decltype(RefId::m_extra) offset; - d(id); - d(offset); - return RefId{std::move(id), size, offset}; - } - } - - static Optional decodeOptRefId(BlobDecoder& d) { - bool present; - d(present); - if (!present) return std::nullopt; - return decodeRefId(d); - } - - static IdVec decodeRefIdVec(BlobDecoder& d) { - std::vector out; - size_t size; - d(size); - out.reserve(size); - std::generate_n( - std::back_inserter(out), - size, - [&] { return decodeRefId(d); } - ); - return out; - } - -private: - std::string refToBlob(const RefId& r) { - if (r.m_size <= g_inline_size) { - assertx(r.m_id.size() == r.m_size); - assertx(!r.m_extra); - return r.m_id; - } - - fs::path path{r.m_id}; - if (path.is_absolute()) { - return FD{path, true, false, false}.read(r.m_extra, r.m_size); - } - - auto it = m_fdCache.find(path.native()); - if (it == m_fdCache.end()) { - auto [elem, emplaced] = m_fdCache.emplace( - path.native(), - FD{m_root / path, true, false, false} - ); - assertx(emplaced); - it = elem; - } - return it->second.read(r.m_extra, r.m_size); - } - - std::string m_source; - BlobDecoder m_decoder; - size_t m_currentInput; - size_t m_numInputs; - - fs::path m_root; - hphp_fast_map m_fdCache; -}; - -struct SerializedSink : public detail::ISink { - explicit SerializedSink(fs::path outputFile) - : m_fd{outputFile, false, true, false} {} - - void blob(const std::string& b) override { - encodeRefId(makeRefId(b), m_encoder); - } - void optBlob(const Optional& b) override { - encodeOptRefId(b ? makeRefId(*b) : Optional{}, m_encoder); - } - void variadic(const BlobVec& v) override { - auto const refs = from(v) - | map([&] (const std::string& b) { return makeRefId(b); }) - | as(); - encodeRefIdVec(refs, m_encoder); - } - void nextOutput() override {} - void startFini() override {} - - void finish() override { - writeToPipe( - g_local_pipe_fd, - (const char*)m_encoder.data(), - m_encoder.size() - ); - ::close(g_local_pipe_fd); - } - - static void encodeRefId(const RefId& r, BlobEncoder& e) { - // Optimized RefId encoding. If it's an inline ref, we can encode - // it more optimally. See above in SerializedSource::decodeRefId. - e(r.m_size); - if (r.m_size <= g_inline_size) { - assertx(r.m_id.size() == r.m_size); - assertx(!r.m_extra); - e.writeRaw(r.m_id.data(), r.m_id.size()); - } else { - e(r.m_id); - e(r.m_extra); - } - } - - static void encodeOptRefId(const Optional& r, BlobEncoder& e) { - if (r) { - e(true); - encodeRefId(*r, e); - } else { - e(false); - } - } - - static void encodeRefIdVec(const IdVec& v, BlobEncoder& e) { - e((size_t)v.size()); - for (auto const& r : v) encodeRefId(r, e); - } +JobBase::JobBase(const std::string& name) + : m_name{name} +{ + FTRACE(4, "registering remote worker command \"{}\"\n", m_name); + auto& r = registry(); + std::lock_guard _{r.lock}; + auto const insert = r.registry.emplace(m_name, this); + always_assert(insert.second); +} -private: - RefId makeRefId(const std::string& b) { - if (b.size() <= g_inline_size) return RefId{b, b.size(), 0}; - auto const offset = m_fd.append(b); - return RefId{m_fd.path().filename(), b.size(), offset}; - } +////////////////////////////////////////////////////////////////////// - FD m_fd; - BlobEncoder m_encoder; -}; +} ////////////////////////////////////////////////////////////////////// /* - * "File" API: - * - * For this mode a worker expects to be invoked with the name of the - * command, followed by 3 paths to directories. The directories + * A note on conventions: A worker expects to be invoked with the name + * of the command, followed by 3 paths to directories. The directories * represent the config inputs (for init()), the inputs (for multiple * runs()), and the last is the output directory, which will be * written to. @@ -534,257 +185,28 @@ struct SerializedSink : public detail::ISink { * * NB: Marker types cannot nest, so there aren't any possible deeper * levels than this. + * + * All implementations must follow this layout when setting up + * execution. The actual execution on the worker side is completely + * agnostic to the implementation. */ -struct FileSource : public detail::ISource { - FileSource(fs::path config, fs::path input) - : m_configPath{std::move(config)} - , m_inputPath{std::move(input)} - , m_itemIdx{0} - , m_inputIdx{0} - , m_itemBase{m_configPath} {} - ~FileSource() = default; - - std::string blob() override { - auto const filename = m_itemBase / folly::to(m_itemIdx++); - return detail::readFile(filename); - } - Optional optBlob() override { - auto const filename = m_itemBase / folly::to(m_itemIdx++); - if (!fs::exists(filename)) return std::nullopt; - return detail::readFile(filename); - } - - BlobVec variadic() override { - BlobVec out; - auto const vecBase = m_itemBase / folly::to(m_itemIdx++); - for (size_t i = 0;; ++i) { - auto const valPath = vecBase / folly::to(i); - // A break in the numbering means the end of the vector. - if (!fs::exists(valPath)) break; - out.emplace_back(detail::readFile(valPath)); - } - return out; - } - - void initDone() override { - assertx(m_itemBase == m_configPath); - assertx(m_inputIdx == 0); - m_itemIdx = 0; - m_itemBase = m_inputPath / "0"; - } - - bool inputEnd() const override { - assertx(m_itemBase == m_inputPath / folly::to(m_inputIdx)); - return !fs::exists(m_itemBase); - } - - void nextInput() override { - assertx(m_itemBase == m_inputPath / folly::to(m_inputIdx)); - m_itemIdx = 0; - m_itemBase = m_inputPath / folly::to(++m_inputIdx); - } - - void finish() override {} -private: - fs::path m_configPath; - fs::path m_inputPath; - - size_t m_itemIdx; - size_t m_inputIdx; - fs::path m_itemBase; -}; - -struct FileSink : public detail::ISink { - explicit FileSink(fs::path base) - : m_base{std::move(base)} - , m_itemIdx{0} - , m_outputIdx{0} - { - // We insist on a clean output directory. - if (!fs::create_directory(m_base)) { - throw Error{ - folly::sformat("Output directory {} already exists", m_base.native()) - }; - } - } - - void blob(const std::string& b) override { write(b); } - void optBlob(const Optional& b) override { - if (!b) { - makeDir(); - ++m_itemIdx; - } else { - write(*b); - } - } - void variadic(const BlobVec& v) override { - auto const dir = makeDir(); - auto const vecDir = dir / folly::to(m_itemIdx); - fs::create_directory(vecDir, dir); - for (size_t i = 0; i < v.size(); ++i) { - auto const& b = v[i]; - writeFile(vecDir / folly::to(i), b.data(), b.size()); - } - ++m_itemIdx; - } - - void nextOutput() override { - assertx(m_outputIdx.has_value()); - ++*m_outputIdx; - m_itemIdx = 0; - } - void startFini() override { - assertx(m_outputIdx.has_value()); - m_outputIdx.reset(); - m_itemIdx = 0; - } - void finish() override {} -private: - fs::path currentDir() { - if (!m_outputIdx) return m_base / "fini"; - return m_base / folly::to(*m_outputIdx); - } - - fs::path makeDir() { - auto const outputDir = currentDir(); - if (!m_itemIdx) fs::create_directory(outputDir, m_base); - return outputDir; - } - - void write(const std::string& b) { - auto const dir = makeDir(); - writeFile(dir / folly::to(m_itemIdx), b.data(), b.size()); - ++m_itemIdx; - } - - fs::path m_base; - size_t m_itemIdx; - Optional m_outputIdx; -}; - -////////////////////////////////////////////////////////////////////// - -// folly::writeFile expects a container-like input, so this makes a -// ptr/length pair behave like one (without copying). -struct Adaptor { - size_t size() const { return m_size; } - bool empty() const { return !size(); } - const char& operator[](size_t idx) const { return m_ptr[idx]; } - const char* m_ptr; - size_t m_size; -}; - -////////////////////////////////////////////////////////////////////// - -} - -////////////////////////////////////////////////////////////////////// - -namespace detail { - -////////////////////////////////////////////////////////////////////// - -// Wrappers around the folly functions with error handling - -std::string readFile(const fs::path& path) { - std::string s; - if (!folly::readFile(path.c_str(), s)) { - throw Error{ - folly::sformat( - "Unable to read input from {} [{}]", - path.c_str(), folly::errnoStr(errno) - ) - }; - } - return s; -} - -void writeFile(const fs::path& path, - const char* ptr, size_t size) { - if (!folly::writeFile(Adaptor{ptr, size}, path.c_str())) { - throw Error{ - folly::sformat( - "Unable to write output to {} [{}]", - path.c_str(), folly::errnoStr(errno) - ) - }; - } -} - -////////////////////////////////////////////////////////////////////// - -JobBase::JobBase(const std::string& name) - : m_name{name} -{ - FTRACE(4, "registering remote worker command \"{}\"\n", m_name); - auto& r = registry(); - std::lock_guard _{r.lock}; - auto const insert = r.registry.emplace(m_name, this); - always_assert(insert.second); -} - -////////////////////////////////////////////////////////////////////// - -} - -////////////////////////////////////////////////////////////////////// - -namespace { +int main(int argc, char** argv) { + try { + always_assert(argc > 1); + always_assert(!strcmp(argv[1], s_option)); -// Parse the command-line and create the appropriate Source and Sink -// for input and output. -std::tuple< - std::unique_ptr, - std::unique_ptr, - std::string -> -parseOptions(int argc, char** argv) { - always_assert(argc > 1); - always_assert(!strcmp(argv[1], s_option)); - - if (argc >= 2 && !strcmp(argv[2], g_local_option)) { if (argc != 6) { std::cerr << "Usage: " << argv[0] - << " " << s_option - << " " << g_local_option - << " " - << " " - << " " + << " " << s_option << " " + << " " + << " " + << " " << std::endl; - return std::make_tuple(nullptr, nullptr, ""); + return EXIT_FAILURE; } - std::string name{argv[3]}; - fs::path root{argv[4]}; - fs::path outputFile{argv[5]}; - - FTRACE(2, "extern worker run (local) (\"{}\", {}, {})\n", - name, root.native(), outputFile.native()); - - // Input comes from STDIN - auto source = - time("read-pipe", [] { return readFromPipe(STDIN_FILENO); }); - return std::make_tuple( - std::make_unique( - std::move(root), - std::move(source) - ), - std::make_unique(std::move(outputFile)), - std::move(name) - ); - } else if (argc != 6) { - std::cerr << "Usage: " - << argv[0] - << " " << s_option - << " " - << " " - << " " - << " " - << std::endl; - return std::make_tuple(nullptr, nullptr, ""); - } else { std::string name{argv[2]}; fs::path configPath{argv[3]}; fs::path outputPath{argv[4]}; @@ -794,30 +216,8 @@ parseOptions(int argc, char** argv) { name, configPath.native(), outputPath.native(), inputPath.native()); - return std::make_tuple( - std::make_unique( - std::move(configPath), - std::move(inputPath) - ), - std::make_unique(std::move(outputPath)), - std::move(name) - ); - } -} - -} - -////////////////////////////////////////////////////////////////////// - -int main(int argc, char** argv) { - Timer _{"main"}; - - try { - auto const [source, sink, name] = parseOptions(argc, argv); - if (!source) return EXIT_FAILURE; - // Lookup the registered job for the requested name. - auto const worker = [&, name = name] { + auto const worker = [&] { auto& r = registry(); std::lock_guard _{r.lock}; auto const it = r.registry.find(name); @@ -827,26 +227,38 @@ int main(int argc, char** argv) { return it->second; }(); + // We insist on a clean output directory. + if (!fs::create_directory(outputPath)) { + throw Error{ + folly::sformat("Output directory {} already exists", outputPath.native()) + }; + } + g_in_job = true; SCOPE_EXIT { g_in_job = false; }; // First do any global initialization. - time("init", [&, &source = source] { worker->init(*source); }); - time("run-all", [&, &source = source, &sink = sink] { - // Then execute run() until we're out of inputs. - size_t run = 0; - while (!source->inputEnd()) { + time("init", [&] { worker->init(configPath); }); + time("run-all", [&] { + // Then execute run() for each given set of inputs. + for (size_t i = 0;; ++i) { + auto const thisInput = inputPath / folly::to(i); + if (!fs::exists(thisInput)) break; + + auto const thisOutput = outputPath / folly::to(i); + FTRACE(4, "executing #{} ({} -> {})\n", + i, thisInput.native(), thisOutput.native()); + fs::create_directory(thisOutput, outputPath); + time( - [&] { return folly::sformat("run {}", run); }, - [&, &source = source, &sink = sink] { worker->run(*source, *sink); } + [&]{ return folly::sformat("run {}", i); }, + [&] { worker->run(thisInput, thisOutput); } ); - ++run; } - source->finish(); }); - // Do any cleanup and flush output to its final destination - time("fini", [&, &sink = sink] { worker->fini(*sink); }); - time("flush", [&sink = sink] { sink->finish(); }); + // Do any cleanup + time("fini", [&] { worker->fini(outputPath); }); + return 0; } catch (const std::exception& exn) { std::cerr << "Error: " << exn.what() << std::endl; @@ -866,38 +278,27 @@ namespace { * be "reliable" (IE, never needs a fallback). * * All data is stored under the "root" (which is the working-dir - * specified in Options). - * - * Data is stored in blob files under the root, whose names are - * assigned numerically. The blob files are pooled. When a thread - * needs to write, it checks out a free file, append the data to it, - * then "returns" it back to the pool. The particular blob file data - * is stored to is therefore arbitrary. Files are not stored at all - * (since they're already on disk). - * - * The RefIds are just the path to the file, either a blob file, or - * some other file. If the data was stored in a blob file, the m_extra - * of the RefId will contain its offset in the blob file. For stored - * files, m_extra will always be zero. + * specified in Options). The two sub-directories are "blobs" and + * "execs". * - * As an optimization, if the data is less or equal to g_inline_size, - * then its stored "inline" in the RefId. That is, m_id is the blob - * itself. Inline/non-inline blobs can be distinguished by the m_size - * field. This also applies to stored files (if sufficiently small, - * we'll read it and store it inline). + * Data is stored under blobs/, using files with numerically + * increasing names. Stored files are not stored at all (since they're + * already on disk). The RefIds are just the path to the file (either + * under blobs/ or where-ever the stored file was). * - * Workers are forked, given their inputs, calculate outputs, then - * write their outputs and exit. Input is given to the worker via - * stdin (as a stream of serialized RefIds). As part of the - * command-line, the worker is given an output file to write its - * output to. It does (creating RefIds in the process), and reports - * the output RefIds via a pipe (g_local_pipe_fd). + * Executions are stored under execs/. Each entry is a sub-directory + * with numerically increasing names. Within each sub-directory is the + * directory layout that the worker expects (see comment above int + * main()). The data is not actually copied into the input + * directories. Instead symlinks are created using the path in the + * RefId. Outputs are not copied either, they just "live" in their + * output directory. */ struct SubprocessImpl : public Client::Impl { SubprocessImpl(const Options&, Client&); ~SubprocessImpl() override; - std::string session() const override { return m_fdManager->root().native(); } + std::string session() const override { return m_root.native(); } bool isSubprocess() const override { return true; } bool supportsOptimistic() const override { return false; } @@ -915,85 +316,60 @@ struct SubprocessImpl : public Client::Impl { const folly::Range*) override; private: - // Manage the pool of blob files - struct FDManager { - explicit FDManager(fs::path); - - const fs::path& root() const { return m_root; } - - // Acquire a FD to the given file to read from. If the path does - // not correspond to a blob file, nullptr is returned. - const FD* acquireForRead(const fs::path&); - // Acquire a FD to a blob file to append to. You don't have a say - // in which file you get. The FD must be returned via release() - // when done. You have exclusive access to this FD. - FD* acquireForAppend(); - // Release a FD acquired from acquireForAppend. - void release(FD&); - private: - std::unique_ptr newFD(); - - fs::path m_root; - folly_concurrent_hash_map_simd> m_fds; - - std::mutex m_lock; - std::stack m_forAppend; - size_t m_nextBlob; - }; - - // Similar to FDManager, but manages trace files. We have workers - // re-use trace files to avoid generating a huge number of time. - struct TraceFileManager { - fs::path get(); - void put(fs::path); - private: - std::mutex m_lock; - std::stack m_paths; - size_t m_nextId{0}; - }; - + fs::path newBlob(); + fs::path newExec(); static fs::path newRoot(const Options&); - coro::Task doSubprocess(const RequestId&, - const std::string&, - std::string, - const fs::path&); + coro::Task doSubprocess(const RequestId&, + const std::string&, + const fs::path&, + const fs::path&, + const fs::path&, + const fs::path&); Options m_options; - std::unique_ptr m_fdManager; - TraceFileManager m_traceManager; + + fs::path m_root; + // SubprocessImpl doesn't need the m_size portion of RefId. So we + // store an unique integer in it to help verify that the RefId came + // from this implementation instance. + size_t m_marker; + + std::atomic m_nextBlob; + std::atomic m_nextExec; }; SubprocessImpl::SubprocessImpl(const Options& options, Client& parent) : Impl{"subprocess", parent} , m_options{options} - , m_fdManager{std::make_unique(newRoot(m_options))} + , m_root{newRoot(m_options)} + // Cheap way of generating semi-unique integer: + , m_marker{std::hash{}(m_root.native())} + , m_nextBlob{0} + , m_nextExec{0} { FTRACE(2, "Using subprocess extern-worker impl with root at {}\n", - m_fdManager->root().native()); - Logger::FInfo( - "Using subprocess extern-worker at {}", - m_fdManager->root().native() - ); + m_root.native()); + Logger::FInfo("Using subprocess extern-worker at {}", m_root.native()); + + fs::create_directory(m_root / "blobs"); + fs::create_directory(m_root / "execs"); } SubprocessImpl::~SubprocessImpl() { - auto const root = m_fdManager->root(); - // Destroy FD manager first, to ensure no open FDs to any files - // while cleaning up. - m_fdManager.reset(); - + // Deleting everything under blobs/ and execs/ can take quite a + // while... if (m_options.m_cleanup) { Logger::FInfo( "Cleaning up subprocess extern-worker at {}...", - root.native() + m_root.native() ); auto const before = std::chrono::steady_clock::now(); auto const removed = time( "subprocess cleanup", [&] { std::error_code ec; // Suppress exceptions - return fs::remove_all(root, ec); + return fs::remove_all(m_root, ec); } ); auto const elapsed = std::chrono::duration_cast>( @@ -1008,6 +384,28 @@ SubprocessImpl::~SubprocessImpl() { } } +// Create paths for a new blob or a new execution. I lied a little bit +// above in the description of the directory layout. It's true that +// blobs and execs use numerically increasing names, but I shard them +// into directories of 10000 each, to keep their sizes reasonable. +fs::path SubprocessImpl::newBlob() { + auto const id = m_nextBlob++; + auto const blobRoot = m_root / "blobs"; + auto const mid = blobRoot / folly::sformat("{:05}", id / 10000); + fs::create_directory(mid, blobRoot); + return mid / folly::sformat("{:04}", id % 10000); +} + +fs::path SubprocessImpl::newExec() { + auto const id = m_nextExec++; + auto const execRoot = m_root / "execs"; + auto const mid = execRoot / folly::sformat("{:05}", id / 10000); + fs::create_directory(mid, execRoot); + auto const full = mid / folly::sformat("{:04}", id % 10000); + fs::create_directory(full, mid); + return full; +} + // Ensure we always have an unique root under the working directory. fs::path SubprocessImpl::newRoot(const Options& opts) { auto const base = opts.m_workingDir / "hphp-extern-worker"; @@ -1025,21 +423,13 @@ coro::Task SubprocessImpl::load(const RequestId& requestId, // contents. auto out = from(ids) | mapped([&] (const RefId& id) { + assertx(id.m_size == m_marker); FTRACE(4, "{} reading blob from {}\n", - requestId.tracePrefix(), id.toString()); - if (id.m_size <= g_inline_size) { - // Inline data requires no file read - assertx(id.m_id.size() == id.m_size); - assertx(!id.m_extra); - return id.m_id; - } - if (auto const fd = m_fdManager->acquireForRead(id.m_id)) { - // The data is in a blob file, so use a cached FD - return fd->read(id.m_extra, id.m_size); - } - // It's some other (non-blob) file. Create an ephemeral FD to - // read it. - return FD{id.m_id, true, false, false}.read(id.m_extra, id.m_size); + requestId.tracePrefix(), id.m_id); + auto blob = readFile(id.m_id); + FTRACE(4, "{} blob is {} bytes\n", + requestId.tracePrefix(), blob.size()); + return blob; }) | as(); HPHP_CORO_MOVE_RETURN(out); @@ -1049,56 +439,27 @@ coro::Task SubprocessImpl::store(const RequestId& requestId, PathVec paths, BlobVec blobs, bool) { - // SubprocessImpl never "uploads" files, but it must write to disk - // (which we classify as an upload). - - FD* fd = nullptr; - SCOPE_EXIT { if (fd) m_fdManager->release(*fd); }; - - // Update stats. Skip blobs which we'll store inline. - for (auto const& b : blobs) { - if (b.size() <= g_inline_size) continue; - ++stats().blobsUploaded; - stats().blobBytesUploaded += b.size(); - if (!fd) fd = m_fdManager->acquireForAppend(); - assertx(fd); - } - + // SubprocessImpl never "uploads" files because it uses symlinks. It + // must write blobs to disk, however (which we classify as an + // upload). + stats().blobsUploaded += blobs.size(); + for (auto& b : blobs) stats().blobBytesUploaded += b.size(); + + // Create RefIds from the given paths, then write the blobs to disk, + // then use their paths. auto out = ((from(paths) | mapped([&] (const fs::path& p) { - auto fileSize = fs::file_size(p); - if (fileSize <= g_inline_size) { - FTRACE(4, "{} storing file {} inline\n", - requestId.tracePrefix(), - p.native()); - auto const contents = readFile(p); - // Size of file could theoretically change between - // stat-ing and reading it. - if (contents.size() <= g_inline_size) { - return RefId{contents, contents.size(), 0}; - } - fileSize = contents.size(); - } - // We distinguish blob files from non-blob files by making - // sure non-blob files are always absolute paths (blob files - // are always relative). - return RefId{fs::canonical(p).native(), fileSize, 0}; + return RefId{fs::canonical(p).native(), m_marker}; }) ) + (from(blobs) | mapped([&] (const std::string& b) { - if (b.size() <= g_inline_size) { - FTRACE(4, "{} storing blob inline\n", - requestId.tracePrefix()); - return RefId{b, b.size(), 0}; - } + auto const path = newBlob(); FTRACE(4, "{} writing size {} blob to {}\n", - requestId.tracePrefix(), b.size(), fd->path().native()); - auto const offset = fd->append(b); - RefId r{fd->path().filename().native(), b.size(), offset}; - FTRACE(4, "{} written as {}\n", requestId.tracePrefix(), r.toString()); - return r; + requestId.tracePrefix(), b.size(), path.native()); + writeFile(path, b.data(), b.size()); + return RefId{fs::canonical(path).native(), m_marker}; }) )) | as(); @@ -1112,236 +473,217 @@ SubprocessImpl::exec(const RequestId& requestId, std::vector inputs, const folly::Range& output, const folly::Range* finiOutput) { - FTRACE(4, "{} executing \"{}\" ({} runs)\n", - requestId.tracePrefix(), command, inputs.size()); + auto const execPath = newExec(); + auto const configPath = execPath / "config"; + auto const inputsPath = execPath / "input"; + auto const outputsPath = execPath / "output"; + + FTRACE(4, "{} executing \"{}\" inside {} ({} runs)\n", + requestId.tracePrefix(), command, + execPath.native(), inputs.size()); HPHP_CORO_SAFE_POINT; - // Each set of inputs should always have the same size. - if (debug && !inputs.empty()) { - auto const size = inputs[0].size(); - for (size_t i = 1; i < inputs.size(); ++i) { - always_assert(inputs[i].size() == size); - } - } + // Set up the directory structure that the worker expects: - // Encode all the inputs - BlobEncoder encoder; - auto const encodeParams = [&] (const RefValVec& params) { - for (auto const& param : params) { + auto const symlink = [&] (const RefId& id, + const fs::path& path) { + assertx(id.m_size == m_marker); + fs::create_symlink(id.m_id, path); + FTRACE(4, "{} symlinked {} to {}\n", + requestId.tracePrefix(), id.m_id, path.native()); + }; + + auto const prepare = [&] (const RefValVec& params, + const fs::path& parent) { + for (size_t paramIdx = 0; paramIdx < params.size(); ++paramIdx) { + auto const& p = params[paramIdx]; + auto const path = parent / folly::to(paramIdx); match( - param, - [&] (const RefId& id) { - SerializedSink::encodeRefId(id, encoder); - }, + p, + [&] (const RefId& id) { symlink(id, path); }, [&] (const Optional& id) { - SerializedSink::encodeOptRefId(id, encoder); + if (!id.has_value()) return; + symlink(*id, path); }, [&] (const IdVec& ids) { - SerializedSink::encodeRefIdVec(ids, encoder); + fs::create_directory(path, parent); + for (size_t vecIdx = 0; vecIdx < ids.size(); ++vecIdx) { + symlink(ids[vecIdx], path / folly::to(vecIdx)); + } } ); } }; - encodeParams(config); - encoder((size_t)inputs.size()); - for (auto const& input : inputs) encodeParams(input); - // Acquire a FD. We're not actually going to write to it, but the - // worker will. This ensures the worker has exclusive access to the - // file while it's running. - auto fd = m_fdManager->acquireForAppend(); - SCOPE_EXIT { if (fd) m_fdManager->release(*fd); }; + fs::create_directory(configPath, execPath); + prepare(config, configPath); + + // Each set of inputs should always have the same size. + if (debug && !inputs.empty()) { + auto const size = inputs[0].size(); + for (size_t i = 1; i < inputs.size(); ++i) { + always_assert(inputs[i].size() == size); + } + } + + fs::create_directory(inputsPath, execPath); + for (size_t i = 0; i < inputs.size(); ++i) { + auto const path = inputsPath / folly::to(i); + fs::create_directory(path, inputsPath); + prepare(inputs[i], path); + } // Do the actual fork+exec. - auto const outputBlob = HPHP_CORO_AWAIT( - doSubprocess( - requestId, - command, - std::string{(const char*)encoder.data(), encoder.size()}, - fd->path() - ) - ); - // The worker (maybe) wrote to the file, so we need to re-sync our - // tracked offset. - fd->syncOffset(); + HPHP_CORO_AWAIT(doSubprocess( + requestId, + command, + execPath, + configPath, + inputsPath, + outputsPath + )); + + // Make RefIds corresponding to the outputs. - // Decode the output - BlobDecoder decoder{outputBlob.data(), outputBlob.size()}; - auto const makeOutput = [&] (OutputType type) -> RefVal { + auto const makeOutput = + [&] (OutputType type, fs::path path) -> RefVal { switch (type) { case OutputType::Val: - return SerializedSource::decodeRefId(decoder); + assertx(fs::exists(path)); + return RefId{std::move(path), m_marker}; case OutputType::Opt: - return SerializedSource::decodeOptRefId(decoder); - case OutputType::Vec: - return SerializedSource::decodeRefIdVec(decoder); + if (!fs::exists(path)) return std::nullopt; + return make_optional(RefId{std::move(path), m_marker}); + case OutputType::Vec: { + assertx(fs::exists(path)); + IdVec vec; + size_t i = 0; + while (true) { + auto valPath = path / folly::to(i); + if (!fs::exists(valPath)) break; + vec.emplace_back(valPath.native(), m_marker); + ++i; + } + return vec; + } } always_assert(false); }; - auto const makeOutputs = [&] (const folly::Range& types) { + auto const makeOutputs = + [&] (const fs::path& path, + const folly::Range& outputTypes) { RefValVec vec; - vec.reserve(types.size()); - for (auto const type : types) { - vec.emplace_back(makeOutput(type)); + vec.reserve(outputTypes.size()); + for (size_t i = 0; i < outputTypes.size(); ++i) { + vec.emplace_back( + makeOutput( + outputTypes[i], + path / folly::to(i) + ) + ); } return vec; }; std::vector out; out.reserve(inputs.size() + (finiOutput ? 1 : 0)); - std::generate_n( - std::back_inserter(out), - inputs.size(), - [&] { return makeOutputs(output); } - ); - if (finiOutput) out.emplace_back(makeOutputs(*finiOutput)); - - decoder.assertDone(); + for (size_t i = 0; i < inputs.size(); ++i) { + out.emplace_back( + makeOutputs( + outputsPath / folly::to(i), + output + ) + ); + } + if (finiOutput) { + out.emplace_back( + makeOutputs( + outputsPath / "fini", + *finiOutput + ) + ); + } HPHP_CORO_MOVE_RETURN(out); } -coro::Task -SubprocessImpl::doSubprocess(const RequestId& requestId, - const std::string& command, - std::string inputBlob, - const fs::path& outputPath) { +coro::Task SubprocessImpl::doSubprocess(const RequestId& requestId, + const std::string& command, + const fs::path& execPath, + const fs::path& configPath, + const fs::path& inputPath, + const fs::path& outputPath) { std::vector args{ current_executable_path(), s_option, - g_local_option, command, - m_fdManager->root().native(), - outputPath.native() + configPath, + outputPath, + inputPath }; // Propagate the TRACE option in the environment. We'll copy the // trace output into this process' trace output. std::vector env; Optional traceFile; - SCOPE_EXIT { if (traceFile) m_traceManager.put(std::move(*traceFile)); }; if (auto const trace = getenv("TRACE")) { - traceFile = m_traceManager.get(); - auto const fullPath = m_fdManager->root() / *traceFile; + traceFile = execPath / "trace.log"; env.emplace_back(folly::sformat("TRACE={}", trace)); - env.emplace_back(folly::sformat("HPHP_TRACE_FILE={}", fullPath.c_str())); + env.emplace_back(folly::sformat("HPHP_TRACE_FILE={}", traceFile->c_str())); } - FTRACE( - 4, "{} executing subprocess for '{}'{}(input size: {})\n", - requestId.tracePrefix(), - command, - traceFile - ? folly::sformat(" (trace-file: {}) ", traceFile->native()) - : "", - inputBlob.size() - ); + FTRACE(4, "{} executing subprocess\n", + requestId.tracePrefix()); HPHP_CORO_SAFE_POINT; - auto const before = std::chrono::steady_clock::now(); + auto const DEBUG_ONLY before = std::chrono::steady_clock::now(); // Do the actual fork+exec. folly::Subprocess subprocess{ args, folly::Subprocess::Options{} - .parentDeathSignal(SIGKILL) - .pipeStdin() + .stdinFd(folly::Subprocess::DEV_NULL) .pipeStdout() .pipeStderr() - .fd(g_local_pipe_fd, folly::Subprocess::PIPE_OUT) .closeOtherFds(), nullptr, &env }; - std::string output; - std::string stderr; - size_t inputWritten = 0; - - // Communicate with the worker. When available, read from worker's - // stderr and output pipe and store the data. Throw everything else - // away. Attempt to write inputs to the worker's stdin whenever it - // has free space. - subprocess.communicate( - [&] (int parentFd, int childFd) { // Read - if (childFd == g_local_pipe_fd) { - return readFromPipe(parentFd, output); - } else if (childFd == STDERR_FILENO) { - return readFromPipe(parentFd, stderr); - } else { - // Worker is writing to some other random FD (including - // stdout). Ignore it. - char dummy[512]; - folly::readNoInt(parentFd, dummy, sizeof dummy); - } - return false; - }, - [&] (int parentFd, int childFd) { // Write - // Close any writable FD except stdin - if (childFd != STDIN_FILENO) return true; - // We've written all of the input, so close stdin. This will - // signal to the worker that input is all sent. - if (inputWritten >= inputBlob.size()) return true; - - // Otherwise write what we can - auto const toWrite = inputBlob.size() - inputWritten; - auto const written = - folly::writeNoInt(parentFd, inputBlob.data() + inputWritten, toWrite); - if (written < 0) { - if (errno == EAGAIN) return false; - throw Error{ - folly::sformat( - "Failed writing {} bytes to subprocess input for '{}' [{}]", - toWrite, command, folly::errnoStr(errno) - ) - }; - } else if (written == 0) { - return true; - } - inputWritten += written; - return false; - } - ); - + auto const [_, stderr] = subprocess.communicate(); auto const returnCode = subprocess.wait(); - auto const elapsed = std::chrono::steady_clock::now() - before; - stats().execCpuUsec += - std::chrono::duration_cast(elapsed).count(); - ++stats().execAllocatedCores; + HPHP_CORO_SAFE_POINT; FTRACE( 4, - "{} subprocess finished (took {}). " - "Output size: {}, Return code: {}, Stderr: {}\n", + "{} subprocess finished (took {}). Return code: {}, Stderr: {}\n", requestId.tracePrefix(), prettyPrint( std::chrono::duration_cast>( - elapsed + std::chrono::steady_clock::now() - before ).count(), folly::PRETTY_TIME, false ), - output.size(), returnCode.str(), stderr ); - HPHP_CORO_SAFE_POINT; - // Do this before checking the return code. If the process failed, // we want to capture anything it logged before throwing. - if (traceFile && fs::exists(m_fdManager->root() / *traceFile)) { - auto const contents = readFile(m_fdManager->root() / *traceFile); + if (traceFile && fs::exists(*traceFile)) { + auto const contents = readFile(*traceFile); if (!contents.empty()) { Trace::ftraceRelease( - "vvvvvvvvvvvvvvvvvv remote-exec ({} \"{}\") vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv\n" + "vvvvvvvvvvvvvvvvvv remote-exec ({} \"{}\" {}) vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv\n" "{}" "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n", requestId.toString(), command, + execPath.native(), contents ); } @@ -1357,89 +699,8 @@ SubprocessImpl::doSubprocess(const RequestId& requestId, ) }; } - if (inputWritten != inputBlob.size()) { - throw WorkerError{ - folly::sformat( - "Execution of `{}` failed: Process failed to consume input\n", - command - ) - }; - } - - HPHP_CORO_MOVE_RETURN(output); -} - -////////////////////////////////////////////////////////////////////// - -SubprocessImpl::FDManager::FDManager(fs::path root) - : m_root{std::move(root)} - , m_nextBlob{0} -{} - -const FD* SubprocessImpl::FDManager::acquireForRead(const fs::path& path) { - // Blob files are always relative - if (path.is_absolute()) return nullptr; - // Only already created blob files should be in m_fds. So, if it's - // not present, it can't be a blob file. - auto const it = m_fds.find(path.native()); - if (it == m_fds.end()) return nullptr; - return it->second.get(); -} - -FD* SubprocessImpl::FDManager::acquireForAppend() { - // Use the blob file at the top of m_forAppend. - std::scoped_lock _{m_lock}; - if (m_forAppend.empty()) { - // If empty, there's no free blob file. We need to create a new - // one. - auto fd = newFD(); - auto const path = fd->path(); - auto const [elem, emplaced] = - m_fds.emplace(path.filename().native(), std::move(fd)); - assertx(emplaced); - m_forAppend.push(elem->second.get()); - } - auto fd = m_forAppend.top(); - m_forAppend.pop(); - return fd; -} - -void SubprocessImpl::FDManager::release(FD& fd) { - if (debug) { - // Should be returning something cached - auto const it = m_fds.find(fd.path().filename().native()); - always_assert(it != m_fds.end()); - always_assert(it->second.get() == &fd); - } - std::scoped_lock _{m_lock}; - m_forAppend.push(&fd); -} - -std::unique_ptr SubprocessImpl::FDManager::newFD() { - // We deliberately keep the blob filename as small as possible - // because they get serialized a lot and it keeps the input/output - // sizes small. - auto const id = m_nextBlob++; - auto const filename = m_root / folly::to(id); - return std::make_unique(filename, true, true, true); -} - -////////////////////////////////////////////////////////////////////// - -fs::path SubprocessImpl::TraceFileManager::get() { - std::scoped_lock _{m_lock}; - if (m_paths.empty()) { - auto const id = m_nextId++; - m_paths.push(folly::sformat("trace-{:04}.log", id)); - } - auto path = std::move(m_paths.top()); - m_paths.pop(); - return path; -} -void SubprocessImpl::TraceFileManager::put(fs::path path) { - std::scoped_lock _{m_lock}; - m_paths.push(std::move(path)); + HPHP_CORO_RETURN_VOID; } ////////////////////////////////////////////////////////////////////// diff --git a/hphp/util/extern-worker.h b/hphp/util/extern-worker.h index 9a4a14b093845..076e57178a835 100644 --- a/hphp/util/extern-worker.h +++ b/hphp/util/extern-worker.h @@ -172,9 +172,10 @@ struct Job : public detail::JobBase { using ExecT = typename detail::ExecRet::type; private: - void init(detail::ISource&) const override; - void fini(detail::ISink&) const override; - void run(detail::ISource&, detail::ISink&) const override; + void init(const std::filesystem::path&) const override; + void fini(const std::filesystem::path&) const override; + void run(const std::filesystem::path&, + const std::filesystem::path&) const override; }; ////////////////////////////////////////////////////////////////////// @@ -214,16 +215,16 @@ struct Multi { // Identifier for a Ref. Used by the implementation to track them. The // meaning of the identifier is private to the implementation. struct RefId { - RefId(std::string, size_t, size_t extra = 0); + RefId(std::string, size_t); std::string toString() const; bool operator==(const RefId&) const; bool operator!=(const RefId&) const; bool operator<(const RefId&) const; + // Despite their names, these fields can be used for anything. std::string m_id; - size_t m_size; // Size of data - size_t m_extra; // For internal usage + size_t m_size; }; // Represents a piece of data "inside" the extern-worker