Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve config sync locking (support/2.12) #8511

Merged
merged 5 commits into from
Nov 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ set(base_SOURCES
shared-object.hpp
singleton.hpp
socket.cpp socket.hpp
spinlock.cpp spinlock.hpp
stacktrace.cpp stacktrace.hpp
statsfunction.hpp
stdiostream.cpp stdiostream.hpp
Expand Down
22 changes: 17 additions & 5 deletions lib/base/process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT;
static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT;

Process::Process(Process::Arguments arguments, Dictionary::Ptr extraEnvironment)
: m_Arguments(std::move(arguments)), m_ExtraEnvironment(std::move(extraEnvironment)), m_Timeout(600), m_AdjustPriority(false)
: m_Arguments(std::move(arguments)), m_ExtraEnvironment(std::move(extraEnvironment)),
m_Timeout(600), m_AdjustPriority(false), m_ResultAvailable(false)
#ifdef _WIN32
, m_ReadPending(false), m_ReadFailed(false), m_Overlapped()
#endif /* _WIN32 */
Expand Down Expand Up @@ -1007,6 +1008,12 @@ void Process::Run(const std::function<void(const ProcessResult&)>& callback)
#endif /* _WIN32 */
}

const ProcessResult& Process::WaitForResult() {
std::unique_lock<std::mutex> lock(m_ResultMutex);
m_ResultCondition.wait(lock, [this]{ return m_ResultAvailable; });
return m_Result;
}

bool Process::DoEvents()
{
bool is_timeout = false;
Expand Down Expand Up @@ -1114,10 +1121,15 @@ bool Process::DoEvents()
}
#endif /* _WIN32 */

m_Result.PID = m_PID;
m_Result.ExecutionEnd = Utility::GetTime();
m_Result.ExitStatus = exitcode;
m_Result.Output = output;
{
std::lock_guard<std::mutex> lock(m_ResultMutex);
m_Result.PID = m_PID;
m_Result.ExecutionEnd = Utility::GetTime();
m_Result.ExitStatus = exitcode;
m_Result.Output = output;
m_ResultAvailable = true;
}
m_ResultCondition.notify_all();

if (m_Callback)
Utility::QueueAsyncCallback(std::bind(m_Callback, m_Result));
Expand Down
7 changes: 7 additions & 0 deletions lib/base/process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <deque>
#include <vector>
#include <sstream>
#include <mutex>
#include <condition_variable>

namespace icinga
{
Expand Down Expand Up @@ -61,6 +63,8 @@ class Process final : public Object

void Run(const std::function<void (const ProcessResult&)>& callback = std::function<void (const ProcessResult&)>());

const ProcessResult& WaitForResult();

pid_t GetPID() const;

static Arguments PrepareCommand(const Value& command);
Expand Down Expand Up @@ -94,6 +98,9 @@ class Process final : public Object
std::ostringstream m_OutputStream;
std::function<void (const ProcessResult&)> m_Callback;
ProcessResult m_Result;
bool m_ResultAvailable;
std::mutex m_ResultMutex;
std::condition_variable m_ResultCondition;

static void IOThreadProc(int tid);
bool DoEvents();
Expand Down
22 changes: 0 additions & 22 deletions lib/base/spinlock.cpp

This file was deleted.

35 changes: 0 additions & 35 deletions lib/base/spinlock.hpp

This file was deleted.

85 changes: 37 additions & 48 deletions lib/remote/apilistener-filesync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ using namespace icinga;

REGISTER_APIFUNCTION(Update, config, &ApiListener::ConfigUpdateHandler);

SpinLock ApiListener::m_ConfigSyncStageLock;
std::mutex ApiListener::m_ConfigSyncStageLock;

/**
* Entrypoint for updating all authoritative configs from /etc/zones.d, packages, etc.
Expand Down Expand Up @@ -330,7 +330,7 @@ void ApiListener::HandleConfigUpdate(const MessageOrigin::Ptr& origin, const Dic
/* Only one transaction is allowed, concurrent message handlers need to wait.
* This affects two parent endpoints sending the config in the same moment.
*/
auto lock (Shared<std::unique_lock<SpinLock>>::Make(m_ConfigSyncStageLock));
std::lock_guard<std::mutex> lock(m_ConfigSyncStageLock);

String apiZonesStageDir = GetApiZonesStageDir();
String fromEndpointName = origin->FromClient->GetEndpoint()->GetName();
Expand Down Expand Up @@ -544,7 +544,7 @@ void ApiListener::HandleConfigUpdate(const MessageOrigin::Ptr& origin, const Dic
Log(LogInformation, "ApiListener")
<< "Received configuration updates (" << count << ") from endpoint '" << fromEndpointName
<< "' are different to production, triggering validation and reload.";
AsyncTryActivateZonesStage(relativePaths, lock);
TryActivateZonesStage(relativePaths);
} else {
Log(LogInformation, "ApiListener")
<< "Received configuration updates (" << count << ") from endpoint '" << fromEndpointName
Expand All @@ -554,17 +554,44 @@ void ApiListener::HandleConfigUpdate(const MessageOrigin::Ptr& origin, const Dic
}

/**
* Callback for stage config validation.
* When validation was successful, the configuration is copied from
* stage to production and a restart is triggered.
* On failure, there's no restart and this is logged.
* Spawns a new validation process with 'System.ZonesStageVarDir' set to override the config validation zone dirs with
* our current stage. Then waits for the validation result and if it was successful, the configuration is copied from
* stage to production and a restart is triggered. On validation failure, there is no restart and this is logged.
*
* The caller of this function must hold m_ConfigSyncStageLock.
*
* @param pr Result of the validation process.
* @param relativePaths Collected paths including the zone name, which are copied from stage to current directories.
*/
void ApiListener::TryActivateZonesStageCallback(const ProcessResult& pr,
const std::vector<String>& relativePaths)
void ApiListener::TryActivateZonesStage(const std::vector<String>& relativePaths)
{
VERIFY(Application::GetArgC() >= 1);

/* Inherit parent process args. */
Array::Ptr args = new Array({
Application::GetExePath(Application::GetArgV()[0]),
});

for (int i = 1; i < Application::GetArgC(); i++) {
String argV = Application::GetArgV()[i];

if (argV == "-d" || argV == "--daemonize")
continue;

args->Add(argV);
}

args->Add("--validate");

// Set the ZonesStageDir. This creates our own local chroot without any additional automated zone includes.
args->Add("--define");
args->Add("System.ZonesStageVarDir=" + GetApiZonesStageDir());

Process::Ptr process = new Process(Process::PrepareCommand(args));
process->SetTimeout(Application::GetReloadTimeout());

process->Run();
const ProcessResult& pr = process->WaitForResult();

String apiZonesDir = GetApiZonesDir();
String apiZonesStageDir = GetApiZonesStageDir();

Expand Down Expand Up @@ -628,44 +655,6 @@ void ApiListener::TryActivateZonesStageCallback(const ProcessResult& pr,
listener->UpdateLastFailedZonesStageValidation(pr.Output);
}

/**
* Spawns a new validation process and waits for its output.
* Sets 'System.ZonesStageVarDir' to override the config validation zone dirs with our current stage.
*
* @param relativePaths Required for later file operations in the callback. Provides the zone name plus path in a list.
*/
void ApiListener::AsyncTryActivateZonesStage(const std::vector<String>& relativePaths, const Shared<std::unique_lock<SpinLock>>::Ptr& lock)
{
VERIFY(Application::GetArgC() >= 1);

/* Inherit parent process args. */
Array::Ptr args = new Array({
Application::GetExePath(Application::GetArgV()[0]),
});

for (int i = 1; i < Application::GetArgC(); i++) {
String argV = Application::GetArgV()[i];

if (argV == "-d" || argV == "--daemonize")
continue;

args->Add(argV);
}

args->Add("--validate");

// Set the ZonesStageDir. This creates our own local chroot without any additional automated zone includes.
args->Add("--define");
args->Add("System.ZonesStageVarDir=" + GetApiZonesStageDir());

Process::Ptr process = new Process(Process::PrepareCommand(args));
process->SetTimeout(Application::GetReloadTimeout());

process->Run([relativePaths, lock](const ProcessResult& pr) {
TryActivateZonesStageCallback(pr, relativePaths);
});
}

/**
* Update the structure from the last failed validation output.
* Uses the current timestamp.
Expand Down
7 changes: 2 additions & 5 deletions lib/remote/apilistener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "base/configobject.hpp"
#include "base/process.hpp"
#include "base/shared.hpp"
#include "base/spinlock.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include "base/tcpsocket.hpp"
Expand Down Expand Up @@ -188,7 +187,7 @@ class ApiListener final : public ObjectImpl<ApiListener>
void RemoveStatusFile();

/* filesync */
static SpinLock m_ConfigSyncStageLock;
static std::mutex m_ConfigSyncStageLock;

void SyncLocalZoneDirs() const;
void SyncLocalZoneDir(const Zone::Ptr& zone) const;
Expand All @@ -200,9 +199,7 @@ class ApiListener final : public ObjectImpl<ApiListener>
static ConfigDirInformation LoadConfigDir(const String& dir);
static void ConfigGlobHandler(ConfigDirInformation& config, const String& path, const String& file);

static void TryActivateZonesStageCallback(const ProcessResult& pr,
const std::vector<String>& relativePaths);
static void AsyncTryActivateZonesStage(const std::vector<String>& relativePaths, const Shared<std::unique_lock<SpinLock>>::Ptr& lock);
static void TryActivateZonesStage(const std::vector<String>& relativePaths);

static String GetChecksum(const String& content);
static bool CheckConfigChange(const ConfigDirInformation& oldConfig, const ConfigDirInformation& newConfig);
Expand Down