Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[orchagent]: Remove global lock caused by notifications running in another thread #478

Merged
merged 7 commits into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 62 additions & 27 deletions orchagent/fdborch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "fdborch.h"
#include "crmorch.h"
#include "notifier.h"
#include "sai_serialize.h"

extern sai_fdb_api_t *sai_fdb_api;

Expand All @@ -25,8 +26,14 @@ FdbOrch::FdbOrch(DBConnector *db, string tableName, PortsOrch *port) :
{
m_portsOrch->attach(this);
auto consumer = new NotificationConsumer(db, "FLUSHFDBREQUEST");
auto fdbNotification = new Notifier(consumer, this);
auto fdbNotification = new Notifier(consumer, this, "FLUSHFDBREQUEST");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we define the notification events as enumerations and derive the names from them? This way, we can put them in a common place so no hardcode of strings from different places.
Also, it would make the addition of future events implementation easier in case we want to implement the existing notification events or new added events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With enumerations we will have hardcoded enumerations.
I think in this case it's better to develop better abstraction than NotificationConsumer(). Currently it's connected too deep into Orch class (doTask methods for NotificationConsumer). We should decouple them from each other and implement doTask methods as a base class for Orch classes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree more changes is needed for a better abstraction than what we have today. but I am not sure if you are going to do it in this PR.
My point was, at least we should put the message types in a common place and reuse then on different places so you don't end up with mistype something and get run-time problem. enumerations will detect the errors during the build time not run time if you mistyped, also by putting things in the same place improves manageability and extendability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you that enumerations will work better here. But what I don't like with this approach is to have one common enum for all cases. I better prefer to have a separateenumeration for every class. Otherwise it's not clear what notifications could come to a target doTask.

Anyway, after Qi's comment I removed the string parameter. You can check new version of the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your comments, anyway

Copy link
Contributor

@qiluo-msft qiluo-msft Apr 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fdbNotification [](start = 9, length = 15)

Confused about fdbNotification vs fdb_notification. What is the diff? #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed all the variable names for notification related stuff

Orch::addExecutor("", fdbNotification);

/* Add FDB notification support from ASIC */
DBConnector *notification_db = new DBConnector(ASIC_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
auto notification_consumer = new swss::NotificationConsumer(notification_db, "NOTIFICATIONS");
auto fdb_notification = new Notifier(notification_consumer, this, "NOTIFICATIONS");
Orch::addExecutor("FDB_NOTIFICATIONS", fdb_notification);
}

void FdbOrch::update(sai_fdb_event_t type, const sai_fdb_entry_t* entry, sai_object_id_t bridge_port_id)
Expand Down Expand Up @@ -274,7 +281,7 @@ void FdbOrch::doTask(Consumer& consumer)
}
}

void FdbOrch::doTask(NotificationConsumer& consumer)
void FdbOrch::doTask(NotificationConsumer& consumer, const std::string &name)
{
SWSS_LOG_ENTER();

Expand All @@ -290,36 +297,64 @@ void FdbOrch::doTask(NotificationConsumer& consumer)

consumer.pop(op, data, values);

if (op == "ALL")
if (name == "FLUSHFDBREQUEST")
{
/*
* so far only support flush all the FDB entris
* flush per port and flush per vlan will be added later.
*/
status = sai_fdb_api->flush_fdb_entries(gSwitchId, 0, NULL);
if (status != SAI_STATUS_SUCCESS)
if (op == "ALL")
{
SWSS_LOG_ERROR("Flush fdb failed, return code %x", status);
}
/*
* so far only support flush all the FDB entris
* flush per port and flush per vlan will be added later.
*/
status = sai_fdb_api->flush_fdb_entries(gSwitchId, 0, NULL);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Flush fdb failed, return code %x", status);
}

return;
}
else if (op == "PORT")
{
/*place holder for flush port fdb*/
SWSS_LOG_ERROR("Received unsupported flush port fdb request");
return;
}
else if (op == "VLAN")
{
/*place holder for flush vlan fdb*/
SWSS_LOG_ERROR("Received unsupported flush vlan fdb request");
return;
return;
}
else if (op == "PORT")
{
/*place holder for flush port fdb*/
SWSS_LOG_ERROR("Received unsupported flush port fdb request");
return;
}
else if (op == "VLAN")
{
/*place holder for flush vlan fdb*/
SWSS_LOG_ERROR("Received unsupported flush vlan fdb request");
return;
}
else
{
SWSS_LOG_ERROR("Received unknown flush fdb request");
return;
}
}
else
else if (name == "NOTIFICATIONS" && op == "fdb_event")
{
SWSS_LOG_ERROR("Received unknown flush fdb request");
return;
uint32_t count;
sai_fdb_event_notification_data_t *fdbevent = nullptr;

sai_deserialize_fdb_event_ntf(data, count, &fdbevent);

for (uint32_t i = 0; i < count; ++i)
{
sai_object_id_t oid = SAI_NULL_OBJECT_ID;

for (uint32_t j = 0; j < fdbevent[i].attr_count; ++j)
{
if (fdbevent[i].attr[j].id == SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID)
{
oid = fdbevent[i].attr[j].value.oid;
break;
}
}

this->update(fdbevent[i].event_type, &fdbevent[i].fdb_entry, oid);

sai_deserialize_free_fdb_event_ntf(count, fdbevent);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion orchagent/fdborch.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class FdbOrch: public Orch, public Subject, public Observer
Table m_table;

void doTask(Consumer& consumer);
void doTask(NotificationConsumer& consumer);
void doTask(NotificationConsumer& consumer, const std::string &name);

void updateVlanMember(const VlanMemberUpdate&);
bool addFdbEntry(const FdbEntry&, const string&, const string&);
Expand Down
4 changes: 0 additions & 4 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ extern "C" {
#include <iostream>
#include <unordered_map>
#include <map>
#include <mutex>
#include <thread>
#include <chrono>
#include <getopt.h>
Expand Down Expand Up @@ -47,9 +46,6 @@ bool gLogRotate = false;
ofstream gRecordOfs;
string gRecordFile;

/* Global database mutex */
mutex gDbMutex;

void usage()
{
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-b batch_size] [-m MAC]" << endl;
Expand Down
60 changes: 4 additions & 56 deletions orchagent/notifications.cpp
Original file line number Diff line number Diff line change
@@ -1,72 +1,20 @@
#include <unordered_map>
#include <mutex>
#include <assert.h>

#include "portsorch.h"
#include "fdborch.h"

extern "C" {
#include "sai.h"
}

#include "logger.h"
#include "notifications.h"

extern mutex gDbMutex;
extern PortsOrch *gPortsOrch;
extern FdbOrch *gFdbOrch;

void on_fdb_event(uint32_t count, sai_fdb_event_notification_data_t *data)
{
SWSS_LOG_ENTER();

lock_guard<mutex> lock(gDbMutex);

if (!gFdbOrch)
{
SWSS_LOG_NOTICE("gFdbOrch is not initialized");
return;
}

for (uint32_t i = 0; i < count; ++i)
{
sai_object_id_t oid = SAI_NULL_OBJECT_ID;

for (uint32_t j = 0; j < data[i].attr_count; ++j)
{
if (data[i].attr[j].id == SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID)
{
oid = data[i].attr[j].value.oid;
break;
}
}

gFdbOrch->update(data[i].event_type, &data[i].fdb_entry, oid);
}
// don't use this event handler, because it runs by libsairedis in a separate thread
// which causes concurrency access to the DB
}

void on_port_state_change(uint32_t count, sai_port_oper_status_notification_t *data)
{
SWSS_LOG_ENTER();

lock_guard<mutex> lock(gDbMutex);

if (!gPortsOrch)
{
SWSS_LOG_NOTICE("gPortsOrch is not initialized");
return;
}

for (uint32_t i = 0; i < count; i++)
{
sai_object_id_t id = data[i].port_id;
sai_port_oper_status_t status = data[i].port_state;

SWSS_LOG_NOTICE("Get port state change notification id:%lx status:%d", id, status);

gPortsOrch->updateDbPortOperStatus(id, status);
gPortsOrch->setHostIntfsOperStatus(id, status == SAI_PORT_OPER_STATUS_UP);
}
// don't use this event handler, because it runs by libsairedis in a separate thread
// which causes concurrency access to the DB
}

void on_switch_shutdown_request()
Expand Down
9 changes: 6 additions & 3 deletions orchagent/notifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

class Notifier : public Executor {
public:
Notifier(NotificationConsumer *select, Orch *orch)
: Executor(select, orch)
Notifier(NotificationConsumer *select, Orch *orch, const std::string& notifier_name)
Copy link
Contributor

@qiluo-msft qiluo-msft Apr 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notifier_name [](start = 74, length = 13)

I notice there is a similar name at NotificationConsumer.m_channel. I prefer using that one instread of a new parameter. #Closed

Copy link
Contributor Author

@pavel-shirshov pavel-shirshov Apr 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to m_channel #Closed

Copy link
Contributor

@qiluo-msft qiluo-msft Apr 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me clarify. I mean you could use getNotificationConsumer()->m_channel internally, instead ask user provide a new argument.


In reply to: 182863130 [](ancestors = 182863130)

: Executor(select, orch), m_name(notifier_name)
{
}

Expand All @@ -14,6 +14,9 @@ class Notifier : public Executor {

void execute()
{
m_orch->doTask(*getNotificationConsumer());
m_orch->doTask(*getNotificationConsumer(), m_name);
}

private:
std::string m_name;
};
6 changes: 0 additions & 6 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <fstream>
#include <iostream>
#include <mutex>
#include <sys/time.h>
#include "timestamp.h"
#include "orch.h"
Expand All @@ -15,8 +14,6 @@ using namespace swss;

extern int gBatchSize;

extern mutex gDbMutex;

extern bool gSwssRecord;
extern ofstream gRecordOfs;
extern bool gLogRotate;
Expand Down Expand Up @@ -73,9 +70,6 @@ void Consumer::execute()
{
SWSS_LOG_ENTER();

// TODO: remove DbMutex when there is only single thread
lock_guard<mutex> lock(gDbMutex);

std::deque<KeyOpFieldsValuesTuple> entries;
getConsumerTable()->pops(entries);

Expand Down
2 changes: 1 addition & 1 deletion orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class Orch

/* Run doTask against a specific executor */
virtual void doTask(Consumer &consumer) = 0;
virtual void doTask(NotificationConsumer &consumer) { }
virtual void doTask(NotificationConsumer &consumer, const std::string &name) { }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const std::string &name [](start = 56, length = 23)

can you add a comment on the name, what does this name use for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments added

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've also renamed this parameter to more clear name

virtual void doTask(SelectableTimer &timer) { }

/* TODO: refactor recording */
Expand Down
4 changes: 2 additions & 2 deletions orchagent/pfcwdorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ PfcWdSwOrch<DropHandler, ForwardHandler>::PfcWdSwOrch(
auto consumer = new swss::NotificationConsumer(
PfcWdSwOrch<DropHandler, ForwardHandler>::getCountersDb().get(),
"PFC_WD");
auto wdNotification = new Notifier(consumer, this);
auto wdNotification = new Notifier(consumer, this, "PFC_WD");
Orch::addExecutor("PFC_WD", wdNotification);
}

Expand Down Expand Up @@ -712,7 +712,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::stopWdOnPort(const Port& port)
}

template <typename DropHandler, typename ForwardHandler>
void PfcWdSwOrch<DropHandler, ForwardHandler>::doTask(swss::NotificationConsumer& wdNotification)
void PfcWdSwOrch<DropHandler, ForwardHandler>::doTask(swss::NotificationConsumer& wdNotification, const std::string& name)
{
SWSS_LOG_ENTER();

Expand Down
2 changes: 1 addition & 1 deletion orchagent/pfcwdorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class PfcWdSwOrch: public PfcWdOrch<DropHandler, ForwardHandler>
void registerInWdDb(const Port& port,
uint32_t detectionTime, uint32_t restorationTime, PfcWdAction action);
void unregisterFromWdDb(const Port& port);
void doTask(swss::NotificationConsumer &wdNotification);
void doTask(swss::NotificationConsumer &wdNotification, const std::string &name);

string filterPfcCounters(string counters, set<uint8_t>& losslessTc);
string getFlexCounterTableKey(string s);
Expand Down
51 changes: 51 additions & 0 deletions orchagent/portsorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "sai_serialize.h"
#include "crmorch.h"
#include "countercheckorch.h"
#include "notifier.h"

extern sai_switch_api_t *sai_switch_api;
extern sai_bridge_api_t *sai_bridge_api;
Expand Down Expand Up @@ -235,6 +236,12 @@ PortsOrch::PortsOrch(DBConnector *db, vector<table_name_with_pri_t> &tableNames)

removeDefaultVlanMembers();
removeDefaultBridgePorts();

/* Add port oper status notification support */
DBConnector *notification_db = new DBConnector(ASIC_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
auto notification_consumer = new swss::NotificationConsumer(notification_db, "NOTIFICATIONS");
auto port_status_notification = new Notifier(notification_consumer, this, "NOTIFICATIONS");
Orch::addExecutor("PORT_STATUS_NOTIFICATIONS", port_status_notification);
}

void PortsOrch::removeDefaultVlanMembers()
Expand Down Expand Up @@ -2330,3 +2337,47 @@ bool PortsOrch::removeLagMember(Port &lag, Port &port)

return true;
}

void PortsOrch::doTask(NotificationConsumer &consumer, const std::string &name)
{
SWSS_LOG_ENTER();

/* Wait for all ports to be initialized */
if (!isInitDone())
{
return;
}

std::string op;
std::string data;
std::vector<swss::FieldValueTuple> values;

consumer.pop(op, data, values);

if (name != "NOTIFICATIONS")
{
SWSS_LOG_ERROR("Wrong name of notification provider: '%s'", name.c_str());
return;
}

if (op == "port_state_change")
{
uint32_t count;
sai_port_oper_status_notification_t *portoperstatus = nullptr;

sai_deserialize_port_oper_status_ntf(data, count, &portoperstatus);

for (uint32_t i = 0; i < count; i++)
{
sai_object_id_t id = portoperstatus[i].port_id;
sai_port_oper_status_t status = portoperstatus[i].port_state;

SWSS_LOG_NOTICE("Get port state change notification id:%lx status:%d", id, status);

this->updateDbPortOperStatus(id, status);
this->setHostIntfsOperStatus(id, status == SAI_PORT_OPER_STATUS_UP);
}

sai_deserialize_free_port_oper_status_ntf(count, portoperstatus);
}
}
2 changes: 2 additions & 0 deletions orchagent/portsorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class PortsOrch : public Orch, public Subject
void doLagTask(Consumer &consumer);
void doLagMemberTask(Consumer &consumer);

void doTask(NotificationConsumer &consumer, const std::string &name);

void removeDefaultVlanMembers();
void removeDefaultBridgePorts();

Expand Down