diff --git a/fpmsyncd/Makefile.am b/fpmsyncd/Makefile.am
index bae2fd73a7..d0f27588be 100644
--- a/fpmsyncd/Makefile.am
+++ b/fpmsyncd/Makefile.am
@@ -1,4 +1,4 @@
-INCLUDES = -I $(top_srcdir) -I $(FPM_PATH)
+INCLUDES = -I $(top_srcdir) -I $(top_srcdir)/warmrestart -I $(FPM_PATH)
 
 bin_PROGRAMS = fpmsyncd
 
@@ -8,9 +8,8 @@ else
 DBGFLAGS = -g
 endif
 
-fpmsyncd_SOURCES = fpmsyncd.cpp fpmlink.cpp routesync.cpp
+fpmsyncd_SOURCES = fpmsyncd.cpp fpmlink.cpp routesync.cpp $(top_srcdir)/warmrestart/warmRestartHelper.cpp $(top_srcdir)/warmrestart/warmRestartHelper.h
 
 fpmsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON)
 fpmsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON)
 fpmsyncd_LDADD = -lnl-3 -lnl-route-3 -lswsscommon
-
diff --git a/fpmsyncd/fpmsyncd.cpp b/fpmsyncd/fpmsyncd.cpp
index d5a54f8fee..42eba65f0f 100644
--- a/fpmsyncd/fpmsyncd.cpp
+++ b/fpmsyncd/fpmsyncd.cpp
@@ -1,13 +1,24 @@
 #include <iostream>
 #include "logger.h"
 #include "select.h"
+#include "selectabletimer.h"
 #include "netdispatcher.h"
+#include "warmRestartHelper.h"
 #include "fpmsyncd/fpmlink.h"
 #include "fpmsyncd/routesync.h"
 
+
 using namespace std;
 using namespace swss;
 
+
+/*
+ * Default warm-restart timer interval for routing-stack app. To be used only if
+ * no explicit value has been defined in configuration.
+ */
+const uint32_t DEFAULT_ROUTING_RESTART_INTERVAL = 120;
+
+
 int main(int argc, char **argv)
 {
     swss::Logger::linkToDbNative("fpmsyncd");
@@ -18,25 +29,75 @@ int main(int argc, char **argv)
     NetDispatcher::getInstance().registerMessageHandler(RTM_NEWROUTE, &sync);
     NetDispatcher::getInstance().registerMessageHandler(RTM_DELROUTE, &sync);
 
-    while (1)
+    while (true)
     {
         try
         {
             FpmLink fpm;
             Select s;
+            SelectableTimer warmStartTimer(timespec{0, 0});
 
-            cout << "Waiting for connection..." << endl;
+            /*
+             * Pipeline should be flushed right away to deal with state pending
+             * from previous try/catch iterations.
+             */
+            pipeline.flush();
+
+            cout << "Waiting for fpm-client connection..." << endl;
             fpm.accept();
             cout << "Connected!" << endl;
 
             s.addSelectable(&fpm);
+
+            /* If warm-restart feature is enabled, execute 'restoration' logic */
+            bool warmStartEnabled = sync.m_warmStartHelper.checkAndStart();
+            if (warmStartEnabled)
+            {
+                /* Obtain warm-restart timer defined for routing application */
+                uint32_t warmRestartIval = sync.m_warmStartHelper.getRestartTimer();
+                if (!warmRestartIval)
+                {
+                    warmStartTimer.setInterval(timespec{DEFAULT_ROUTING_RESTART_INTERVAL, 0});
+                }
+                else
+                {
+                    warmStartTimer.setInterval(timespec{warmRestartIval, 0});
+                }
+
+                /* Execute restoration instruction and kick off warm-restart timer */
+                if (sync.m_warmStartHelper.runRestoration())
+                {
+                    warmStartTimer.start();
+                    s.addSelectable(&warmStartTimer);
+                }
+            }
+
             while (true)
             {
                 Selectable *temps;
-                /* Reading FPM messages forever (and calling "readData" to read them) */
+
+                /* Reading FPM messages forever (and calling "readMe" to read them) */
                 s.select(&temps);
-                pipeline.flush();
-                SWSS_LOG_DEBUG("Pipeline flushed");
+
+                /*
+                 * Upon expiration of the warm-restart timer, proceed to run the
+                 * reconciliation process and remove the warm-restart timer from
+                 * the select() loop.
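+                 *
+                 * (The timer elapses only once per warm-restart cycle: once
+                 * reconciliation is done, the loop below falls back to flushing
+                 * the pipeline on every FPM event, exactly as in the regular
+                 * non-warm-restart path.)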
+                 */
+                if (warmStartEnabled && temps == &warmStartTimer)
+                {
+                    SWSS_LOG_NOTICE("Warm-Restart timer expired.");
+                    sync.m_warmStartHelper.reconcile();
+                    s.removeSelectable(&warmStartTimer);
+
+                    pipeline.flush();
+                    SWSS_LOG_DEBUG("Pipeline flushed");
+                }
+                else if (!warmStartEnabled || sync.m_warmStartHelper.isReconciled())
+                {
+                    pipeline.flush();
+                    SWSS_LOG_DEBUG("Pipeline flushed");
+                }
             }
         }
         catch (FpmLink::FpmConnectionClosedException &e)
diff --git a/fpmsyncd/routesync.cpp b/fpmsyncd/routesync.cpp
index 488410c9e3..63805d9d0a 100644
--- a/fpmsyncd/routesync.cpp
+++ b/fpmsyncd/routesync.cpp
@@ -14,7 +14,8 @@ using namespace std;
 using namespace swss;
 
 RouteSync::RouteSync(RedisPipeline *pipeline) :
-    m_routeTable(pipeline, APP_ROUTE_TABLE_NAME, true)
+    m_routeTable(pipeline, APP_ROUTE_TABLE_NAME, true),
+    m_warmStartHelper(pipeline, &m_routeTable, APP_ROUTE_TABLE_NAME, "bgp", "bgp")
 {
     m_nl_sock = nl_socket_alloc();
     nl_connect(m_nl_sock, NETLINK_ROUTE);
@@ -38,10 +39,31 @@ void RouteSync::onMsg(int nlmsg_type, struct nl_object *obj)
         return;
     }
 
+    /*
+     * Upon arrival of a delete msg we could either push the change right away,
+     * or we could opt to defer it if we are going through a warm-reboot cycle.
+     */
+    bool warmRestartInProgress = m_warmStartHelper.inProgress();
+
     if (nlmsg_type == RTM_DELROUTE)
     {
-        m_routeTable.del(destipprefix);
-        return;
+        if (!warmRestartInProgress)
+        {
+            m_routeTable.del(destipprefix);
+            return;
+        }
+        else
+        {
+            SWSS_LOG_INFO("Warm-Restart mode: Receiving delete msg: %s\n",
+                          destipprefix);
+
+            vector<FieldValueTuple> fvVector;
+            const KeyOpFieldsValuesTuple kfv = std::make_tuple(destipprefix,
+                                                               DEL_COMMAND,
+                                                               fvVector);
+            m_warmStartHelper.insertRefreshMap(kfv);
+            return;
+        }
     }
     else if (nlmsg_type != RTM_NEWROUTE)
     {
@@ -118,8 +140,29 @@ void RouteSync::onMsg(int nlmsg_type, struct nl_object *obj)
     vector<FieldValueTuple> fvVector;
     FieldValueTuple nh("nexthop", nexthops);
     FieldValueTuple idx("ifname", ifnames);
+
     fvVector.push_back(nh);
     fvVector.push_back(idx);
 
-    m_routeTable.set(destipprefix, fvVector);
-    SWSS_LOG_DEBUG("RoutTable set: %s %s %s\n", destipprefix, nexthops.c_str(), ifnames.c_str());
+
+    if (!warmRestartInProgress)
+    {
+        m_routeTable.set(destipprefix, fvVector);
+        SWSS_LOG_DEBUG("RouteTable set msg: %s %s %s\n",
+                       destipprefix, nexthops.c_str(), ifnames.c_str());
+    }
+
+    /*
+     * During routing-stack restart scenarios, route updates are temporarily
+     * put on hold by the warm-reboot logic.
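+     * Such updates are staged in the helper's internal refresh-map and only
+     * pushed down to APP_DB once the reconciliation phase has completed.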
+     */
+    else
+    {
+        SWSS_LOG_INFO("Warm-Restart mode: RouteTable set msg: %s %s %s\n",
+                      destipprefix, nexthops.c_str(), ifnames.c_str());
+
+        const KeyOpFieldsValuesTuple kfv = std::make_tuple(destipprefix,
+                                                           SET_COMMAND,
+                                                           fvVector);
+        m_warmStartHelper.insertRefreshMap(kfv);
+    }
 }
diff --git a/fpmsyncd/routesync.h b/fpmsyncd/routesync.h
index 43b6305287..1652bedee7 100644
--- a/fpmsyncd/routesync.h
+++ b/fpmsyncd/routesync.h
@@ -4,6 +4,8 @@
 #include "dbconnector.h"
 #include "producerstatetable.h"
 #include "netmsg.h"
+#include "warmRestartHelper.h"
+
 
 namespace swss {
 
@@ -16,10 +18,12 @@ class RouteSync : public NetMsg
 
     virtual void onMsg(int nlmsg_type, struct nl_object *obj);
 
+    WarmStartHelper  m_warmStartHelper;
+
 private:
-    ProducerStateTable m_routeTable;
-    struct nl_cache *m_link_cache;
-    struct nl_sock *m_nl_sock;
+    ProducerStateTable  m_routeTable;
+    struct nl_cache    *m_link_cache;
+    struct nl_sock     *m_nl_sock;
 };
 
 }
diff --git a/tests/conftest.py b/tests/conftest.py
index 3df6c6c1ee..e1ad251fe7 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -139,7 +139,7 @@ def __init__(self, name=None):
         for ctn in self.client.containers.list():
             if ctn.id == ctn_sw_id or ctn.name == ctn_sw_id:
                 ctn_sw_name = ctn.name
-        
+
         (status, output) = commands.getstatusoutput("docker inspect --format '{{.State.Pid}}' %s" % ctn_sw_name)
         self.ctn_sw_pid = int(output)
 
@@ -214,11 +214,409 @@ def check_ready(self, timeout=30):
     def restart(self):
         self.ctn.restart()
 
+    # start processes in SWSS
+    def start_swss(self):
+        cmd = ""
+        for pname in self.swssd:
+            cmd += "supervisorctl start {}; ".format(pname)
+        self.runcmd(['sh', '-c', cmd])
+
+    # stop processes in SWSS
+    def stop_swss(self):
+        cmd = ""
+        for pname in self.swssd:
+            cmd += "supervisorctl stop {}; ".format(pname)
+        self.runcmd(['sh', '-c', cmd])
+
+    def start_zebra(dvs):
+        dvs.runcmd(['sh', '-c', 'supervisorctl start zebra'])
+
+        # Let's give zebra a chance to connect to FPM.
+        time.sleep(5)
+
+    def stop_zebra(dvs):
+        dvs.runcmd(['sh', '-c', 'pkill -x zebra'])
+        time.sleep(1)
+
+    def start_fpmsyncd(dvs):
+        dvs.runcmd(['sh', '-c', 'supervisorctl start fpmsyncd'])
+
+        # Let's give fpmsyncd a chance to connect to Zebra.
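+        # (As with zebra above, the 5 seconds are simply a generous settle
+        # time; there is no explicit readiness signal to wait on.)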
+        time.sleep(5)
+
+    def stop_fpmsyncd(dvs):
+        dvs.runcmd(['sh', '-c', 'pkill -x fpmsyncd'])
+        time.sleep(1)
+
     def init_asicdb_validator(self):
         self.asicdb = AsicDbValidator(self)
 
     def runcmd(self, cmd):
-        return self.ctn.exec_run(cmd)
+        res = self.ctn.exec_run(cmd)
+        try:
+            exitcode = res.exit_code
+            out = res.output
+        except AttributeError:
+            exitcode = 0
+            out = res
+
+        if exitcode != 0:
+            print "-----rc={} for cmd {}-----".format(exitcode, cmd)
+            print out.rstrip()
+            print "-----"
+
+        return (exitcode, out)
+
+    def copy_file(self, path, filename):
+        tarstr = StringIO.StringIO()
+        tar = tarfile.open(fileobj=tarstr, mode="w")
+        tar.add(filename, os.path.basename(filename))
+        tar.close()
+        self.ctn.exec_run("mkdir -p %s" % path)
+        self.ctn.put_archive(path, tarstr.getvalue())
+        tarstr.close()
+
+    def get_logs(self, modname=None):
+        stream, stat = self.ctn.get_archive("/var/log/")
+        if modname == None:
+            log_dir = "log"
+        else:
+            log_dir = "log/{}".format(modname)
+        os.system("rm -rf {}".format(log_dir))
+        os.system("mkdir -p {}".format(log_dir))
+        p = subprocess.Popen(["tar", "--no-same-owner", "-C", "./{}".format(log_dir), "-x"], stdin=subprocess.PIPE)
+        for x in stream:
+            p.stdin.write(x)
+        p.stdin.close()
+        p.wait()
+        if p.returncode:
+            raise RuntimeError("Failed to unpack the archive.")
+        os.system("chmod a+r -R log")
+
+    def add_log_marker(self, file=None):
+        marker = "=== start marker {} ===".format(datetime.now().isoformat())
+
+        if file:
+            self.runcmd(['sh', '-c', "echo \"{}\" >> {}".format(marker, file)])
+        else:
+            self.ctn.exec_run("logger {}".format(marker))
+
+        return marker
+
+    def SubscribeAppDbObject(self, objpfx):
+        r = redis.Redis(unix_socket_path=self.redis_sock, db=swsscommon.APPL_DB)
+        pubsub = r.pubsub()
+        pubsub.psubscribe("__keyspace@0__:%s*" % objpfx)
+        return pubsub
+
+    def SubscribeAsicDbObject(self, objpfx):
+        r = redis.Redis(unix_socket_path=self.redis_sock, db=swsscommon.ASIC_DB)
+        pubsub = r.pubsub()
+        pubsub.psubscribe("__keyspace@1__:ASIC_STATE:%s*" % objpfx)
+        return pubsub
+
+    def CountSubscribedObjects(self, pubsub, ignore=None, timeout=10):
+        nadd = 0
+        ndel = 0
+        idle = 0
+        while idle < timeout:
+            message = pubsub.get_message()
+            if message:
+                print message
+                if ignore:
+                    fds = message['channel'].split(':')
+                    if fds[2] in ignore:
+                        continue
+                if message['data'] == 'hset':
+                    nadd += 1
+                elif message['data'] == 'del':
+                    ndel += 1
+                idle = 0
+            else:
+                time.sleep(1)
+                idle += 1
+
+        return (nadd, ndel)
+
+    def GetSubscribedAppDbObjects(self, pubsub, ignore=None, timeout=10):
+        r = redis.Redis(unix_socket_path=self.redis_sock, db=swsscommon.APPL_DB)
+
+        addobjs = []
+        delobjs = []
+        idle = 0
+        prev_key = None
+
+        while idle < timeout:
+            message = pubsub.get_message()
+            if message:
+                print message
+                key = message['channel'].split(':', 1)[1]
+                # In producer/consumer_state_table scenarios, every entry will
+                # show up twice for every push/pop operation, so skip the second
+                # one to avoid double counting.
+                if key != None and key == prev_key:
+                    continue
+                # Skip instructions with meaningless keys. To be extended in the
+                # future to other undesired keys.
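+                # (ROUTE_TABLE_KEY_SET and ROUTE_TABLE_DEL_SET are the staging
+                # sets maintained internally by ProducerStateTable rather than
+                # actual route entries, hence the skip below.)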
+                if key == "ROUTE_TABLE_KEY_SET" or key == "ROUTE_TABLE_DEL_SET":
+                    continue
+                if ignore:
+                    fds = message['channel'].split(':')
+                    if fds[2] in ignore:
+                        continue
+
+                if message['data'] == 'hset':
+                    (_, k) = key.split(':', 1)
+                    value=r.hgetall(key)
+                    addobjs.append({'key':json.dumps(k), 'vals':json.dumps(value)})
+                    prev_key = key
+                elif message['data'] == 'del':
+                    (_, k) = key.split(':', 1)
+                    delobjs.append({'key':json.dumps(k)})
+                idle = 0
+            else:
+                time.sleep(1)
+                idle += 1
+
+        return (addobjs, delobjs)
+
+
+    def GetSubscribedAsicDbObjects(self, pubsub, ignore=None, timeout=10):
+        r = redis.Redis(unix_socket_path=self.redis_sock, db=swsscommon.ASIC_DB)
+
+        addobjs = []
+        delobjs = []
+        idle = 0
+
+        while idle < timeout:
+            message = pubsub.get_message()
+            if message:
+                print message
+                key = message['channel'].split(':', 1)[1]
+                if ignore:
+                    fds = message['channel'].split(':')
+                    if fds[2] in ignore:
+                        continue
+                if message['data'] == 'hset':
+                    value=r.hgetall(key)
+                    (_, t, k) = key.split(':', 2)
+                    addobjs.append({'type':t, 'key':k, 'vals':value})
+                elif message['data'] == 'del':
+                    (_, t, k) = key.split(':', 2)
+                    delobjs.append({'key':k})
+                idle = 0
+            else:
+                time.sleep(1)
+                idle += 1
+
+        return (addobjs, delobjs)
+
+    def get_map_iface_bridge_port_id(self, asic_db):
+        port_id_2_iface = self.asicdb.portoidmap
+        tbl = swsscommon.Table(asic_db, "ASIC_STATE:SAI_OBJECT_TYPE_BRIDGE_PORT")
+        iface_2_bridge_port_id = {}
+        for key in tbl.getKeys():
+            status, data = tbl.get(key)
+            assert status
+            values = dict(data)
+            iface_id = values["SAI_BRIDGE_PORT_ATTR_PORT_ID"]
+            iface_name = port_id_2_iface[iface_id]
+            iface_2_bridge_port_id[iface_name] = key
+
+        return iface_2_bridge_port_id
+
+    def is_table_entry_exists(self, db, table, keyregex, attributes):
+        tbl = swsscommon.Table(db, table)
+        keys = tbl.getKeys()
+
+        extra_info = []
+        for key in keys:
+            if re.match(keyregex, key) is None:
+                continue
+
+            status, fvs = tbl.get(key)
+            assert status, "Error reading from table %s" % table
+
+            d_attributes = dict(attributes)
+            for k, v in fvs:
+                if k in d_attributes and d_attributes[k] == v:
+                    del d_attributes[k]
+
+            if len(d_attributes) != 0:
+                extra_info.append("Desired attributes %s were not found for key %s" % (str(d_attributes), key))
+            else:
+                return True, extra_info
+        else:
+            if not extra_info:
+                extra_info.append("Desired key regex %s was not found" % str(keyregex))
+            return False, extra_info
+
+    def all_table_entry_has(self, db, table, keyregex, attributes):
+        tbl = swsscommon.Table(db, table)
+        keys = tbl.getKeys()
+        extra_info = []
+
+        if len(keys) == 0:
+            extra_info.append("keyregex %s not found" % keyregex)
+            return False, extra_info
+
+        for key in keys:
+            if re.match(keyregex, key) is None:
+                continue
+
+            status, fvs = tbl.get(key)
+            assert status, "Error reading from table %s" % table
+
+            d_attributes = dict(attributes)
+            for k, v in fvs:
+                if k in d_attributes and d_attributes[k] == v:
+                    del d_attributes[k]
+
+            if len(d_attributes) != 0:
+                extra_info.append("Desired attributes %s were not found for key %s" % (str(d_attributes), key))
+                return False, extra_info
+
+        return True, extra_info
+
+    def all_table_entry_has_no(self, db, table, keyregex, attributes_list):
+        tbl = swsscommon.Table(db, table)
+        keys = tbl.getKeys()
+        extra_info = []
+
+        if len(keys) == 0:
+            extra_info.append("keyregex %s not found" % keyregex)
+            return False, extra_info
+
+        for key in keys:
+            if re.match(keyregex, key) is None:
+                continue
+
+            status, fvs = tbl.get(key)
+            assert status, "Error reading from table %s" % table
+
+            for k, v in fvs:
+                if k in attributes_list:
+                    extra_info.append("Unexpected attribute %s was found for key %s" % (k, key))
+                    return False, extra_info
+
+        return True, extra_info
+
+    def is_fdb_entry_exists(self, db, table, key_values, attributes):
+        tbl = swsscommon.Table(db, table)
+        keys = tbl.getKeys()
+
+        exists = False
+        extra_info = []
+        key_found = False
+        for key in keys:
+            try:
+                d_key = json.loads(key)
+            except ValueError:
+                d_key = json.loads('{' + key + '}')
+
+            for k, v in key_values:
+                if k not in d_key or v != d_key[k]:
+                    continue
+
+            key_found = True
+
+            status, fvs = tbl.get(key)
+            assert status, "Error reading from table %s" % table
+
+            d_attributes = dict(attributes)
+            for k, v in fvs:
+                if k in d_attributes and d_attributes[k] == v:
+                    del d_attributes[k]
+
+            if len(d_attributes) != 0:
+                exists = False
+                extra_info.append("Desired attributes %s were not found for key %s" % (str(d_attributes), key))
+            else:
+                exists = True
+                break
+
+        if not key_found:
+            exists = False
+            extra_info.append("Desired key with parameters %s was not found" % str(key_values))
+
+        return exists, extra_info
+
+    def create_vlan(self, vlan):
+        tbl = swsscommon.Table(self.cdb, "VLAN")
+        fvs = swsscommon.FieldValuePairs([("vlanid", vlan)])
+        tbl.set("Vlan" + vlan, fvs)
+        time.sleep(1)
+
+    def create_vlan_member(self, vlan, interface):
+        tbl = swsscommon.Table(self.cdb, "VLAN_MEMBER")
+        fvs = swsscommon.FieldValuePairs([("tagging_mode", "untagged")])
+        tbl.set("Vlan" + vlan + "|" + interface, fvs)
+        time.sleep(1)
+
+    def set_interface_status(self, interface, admin_status):
+        if interface.startswith("PortChannel"):
+            tbl_name = "PORTCHANNEL"
+        elif interface.startswith("Vlan"):
+            tbl_name = "VLAN"
+        else:
+            tbl_name = "PORT"
+        tbl = swsscommon.Table(self.cdb, tbl_name)
+        fvs = swsscommon.FieldValuePairs([("admin_status", admin_status)])
+        tbl.set(interface, fvs)
+        time.sleep(1)
+
+    def add_ip_address(self, interface, ip):
+        if interface.startswith("PortChannel"):
+            tbl_name = "PORTCHANNEL_INTERFACE"
+        elif interface.startswith("Vlan"):
+            tbl_name = "VLAN_INTERFACE"
+        else:
+            tbl_name = "INTERFACE"
+        tbl = swsscommon.Table(self.cdb, tbl_name)
+        fvs = swsscommon.FieldValuePairs([("NULL", "NULL")])
+        tbl.set(interface + "|" + ip, fvs)
+        time.sleep(1)
+
+    def add_neighbor(self, interface, ip, mac):
+        tbl = swsscommon.ProducerStateTable(self.pdb, "NEIGH_TABLE")
+        fvs = swsscommon.FieldValuePairs([("neigh", mac),
+                                          ("family", "IPv4")])
+        tbl.set(interface + ":" + ip, fvs)
+        time.sleep(1)
+
+    def setup_db(self):
+        self.pdb = swsscommon.DBConnector(0, self.redis_sock, 0)
+        self.adb = swsscommon.DBConnector(1, self.redis_sock, 0)
+        self.cdb = swsscommon.DBConnector(4, self.redis_sock, 0)
+        self.sdb = swsscommon.DBConnector(6, self.redis_sock, 0)
+
+    def getCrmCounterValue(self, key, counter):
+        counters_db = swsscommon.DBConnector(swsscommon.COUNTERS_DB, self.redis_sock, 0)
+        crm_stats_table = swsscommon.Table(counters_db, 'CRM')
+
+        for k in crm_stats_table.get(key)[1]:
+            if k[0] == counter:
+                return int(k[1])
+
+    def setReadOnlyAttr(self, obj, attr, val):
+        db = swsscommon.DBConnector(swsscommon.ASIC_DB, self.redis_sock, 0)
+        tbl = swsscommon.Table(db, "ASIC_STATE:{0}".format(obj))
+        keys = tbl.getKeys()
+
+        assert len(keys) == 1
+
+        swVid = keys[0]
+        r = redis.Redis(unix_socket_path=self.redis_sock, db=swsscommon.ASIC_DB)
+        swRid = r.hget("VIDTORID", swVid)
+
+        assert swRid is not None
+
+        ntf = swsscommon.NotificationProducer(db, "SAI_VS_UNITTEST_CHANNEL")
+        fvp = swsscommon.FieldValuePairs()
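+        # Switch the SAI virtual switch into unit-test mode first; only then
+        # will the subsequent "set_ro" request below be honored.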
+        ntf.send("enable_unittests", "true", fvp)
+        fvp = swsscommon.FieldValuePairs([(attr, val)])
+        key = "SAI_OBJECT_TYPE_SWITCH:" + swRid
+
+        ntf.send("set_ro", key, fvp)
 
 @pytest.yield_fixture(scope="module")
 def dvs(request):
diff --git a/tests/test_warm_reboot.py b/tests/test_warm_reboot.py
new file mode 100644
index 0000000000..ace1758722
--- /dev/null
+++ b/tests/test_warm_reboot.py
@@ -0,0 +1,1444 @@
+from swsscommon import swsscommon
+import os
+import re
+import time
+import json
+
+# Get restore count of all processes supporting warm restart
+def swss_get_RestoreCount(dvs, state_db):
+    restore_count = {}
+    warmtbl = swsscommon.Table(state_db, swsscommon.STATE_WARM_RESTART_TABLE_NAME)
+    keys = warmtbl.getKeys()
+    assert len(keys) != 0
+    for key in keys:
+        if key not in dvs.swssd:
+            continue
+        (status, fvs) = warmtbl.get(key)
+        assert status == True
+        for fv in fvs:
+            if fv[0] == "restore_count":
+                restore_count[key] = int(fv[1])
+    print(restore_count)
+    return restore_count
+
+# function to check that the restore count incremented by 1 for all processes supporting warm restart
+def swss_check_RestoreCount(dvs, state_db, restore_count):
+    warmtbl = swsscommon.Table(state_db, swsscommon.STATE_WARM_RESTART_TABLE_NAME)
+    keys = warmtbl.getKeys()
+    print(keys)
+    assert len(keys) > 0
+    for key in keys:
+        if key not in dvs.swssd:
+            continue
+        (status, fvs) = warmtbl.get(key)
+        assert status == True
+        for fv in fvs:
+            if fv[0] == "restore_count":
+                assert int(fv[1]) == restore_count[key] + 1
+            elif fv[0] == "state":
+                assert fv[1] == "reconciled"
+
+def check_port_oper_status(appl_db, port_name, state):
+    portTbl = swsscommon.Table(appl_db, swsscommon.APP_PORT_TABLE_NAME)
+    (status, fvs) = portTbl.get(port_name)
+    assert status == True
+
+    oper_status = "unknown"
+    for v in fvs:
+        if v[0] == "oper_status":
+            oper_status = v[1]
+            break
+    assert oper_status == state
+
+# function to check that the restore count incremented by 1 for a single process
+def swss_app_check_RestoreCount_single(state_db, restore_count, name):
+    warmtbl = swsscommon.Table(state_db, swsscommon.STATE_WARM_RESTART_TABLE_NAME)
+    keys = warmtbl.getKeys()
+    print(keys)
+    print(restore_count)
+    assert len(keys) > 0
+    for key in keys:
+        if key != name:
+            continue
+        (status, fvs) = warmtbl.get(key)
+        assert status == True
+        for fv in fvs:
+            if fv[0] == "restore_count":
+                assert int(fv[1]) == restore_count[key] + 1
+            elif fv[0] == "state":
+                assert fv[1] == "reconciled"
+
+def swss_app_check_warmstart_state(state_db, name, state):
+    warmtbl = swsscommon.Table(state_db, swsscommon.STATE_WARM_RESTART_TABLE_NAME)
+    keys = warmtbl.getKeys()
+    print(keys)
+    assert len(keys) > 0
+    for key in keys:
+        if key != name:
+            continue
+        (status, fvs) = warmtbl.get(key)
+        assert status == True
+        for fv in fvs:
+            if fv[0] == "state":
+                assert fv[1] == state
+
+def create_entry(tbl, key, pairs):
+    fvs = swsscommon.FieldValuePairs(pairs)
+    tbl.set(key, fvs)
+
+    # FIXME: better to wait until the DB creates them
+    time.sleep(1)
+
+def create_entry_tbl(db, table, key, pairs):
+    tbl = swsscommon.Table(db, table)
+    create_entry(tbl, key, pairs)
+
+def del_entry_tbl(db, table, key):
+    tbl = swsscommon.Table(db, table)
+    tbl._del(key)
+
+def create_entry_pst(db, table, key, pairs):
+    tbl = swsscommon.ProducerStateTable(db, table)
+    create_entry(tbl, key, pairs)
+
+def how_many_entries_exist(db, table):
+    tbl = swsscommon.Table(db, table)
+    return len(tbl.getKeys())
+
+def test_PortSyncdWarmRestart(dvs, testlog):
+
+    conf_db = swsscommon.DBConnector(swsscommon.CONFIG_DB, dvs.redis_sock, 0)
+    appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0)
+    state_db = swsscommon.DBConnector(swsscommon.STATE_DB, dvs.redis_sock, 0)
+
+    dvs.runcmd("config warm_restart enable swss")
+
+    dvs.runcmd("ifconfig Ethernet16 up")
+    dvs.runcmd("ifconfig Ethernet20 up")
+
+    time.sleep(1)
+
+    dvs.runcmd("ifconfig Ethernet16 11.0.0.1/29 up")
+    dvs.runcmd("ifconfig Ethernet20 11.0.0.9/29 up")
+
+    dvs.servers[4].runcmd("ip link set down dev eth0") == 0
+    dvs.servers[4].runcmd("ip link set up dev eth0") == 0
+    dvs.servers[4].runcmd("ifconfig eth0 11.0.0.2/29")
+    dvs.servers[4].runcmd("ip route add default via 11.0.0.1")
+
+    dvs.servers[5].runcmd("ip link set down dev eth0") == 0
+    dvs.servers[5].runcmd("ip link set up dev eth0") == 0
+    dvs.servers[5].runcmd("ifconfig eth0 11.0.0.10/29")
+    dvs.servers[5].runcmd("ip route add default via 11.0.0.9")
+
+    time.sleep(1)
+
+    # Ethernet port oper status should be up
+    check_port_oper_status(appl_db, "Ethernet16", "up")
+    check_port_oper_status(appl_db, "Ethernet20", "up")
+
+    # Ping should work between servers via vs Ethernet interfaces
+    ping_stats = dvs.servers[4].runcmd("ping -c 1 11.0.0.10")
+    time.sleep(1)
+
+    neighTbl = swsscommon.Table(appl_db, "NEIGH_TABLE")
+    (status, fvs) = neighTbl.get("Ethernet16:11.0.0.2")
+    assert status == True
+
+    (status, fvs) = neighTbl.get("Ethernet20:11.0.0.10")
+    assert status == True
+
+    restore_count = swss_get_RestoreCount(dvs, state_db)
+
+    # restart portsyncd
+    dvs.runcmd(['sh', '-c', 'pkill -x portsyncd'])
+
+    pubsub = dvs.SubscribeAsicDbObject("SAI_OBJECT_TYPE")
+    dvs.runcmd(['sh', '-c', 'supervisorctl start portsyncd'])
+
+    (nadd, ndel) = dvs.CountSubscribedObjects(pubsub)
+    assert nadd == 0
+    assert ndel == 0
+
+    # new ip on server 5
+    dvs.servers[5].runcmd("ifconfig eth0 11.0.0.11/29")
+
+    # Ping should work between servers via vs Ethernet interfaces
+    ping_stats = dvs.servers[4].runcmd("ping -c 1 11.0.0.11")
+
+    # new neighbor learned on VS
+    (status, fvs) = neighTbl.get("Ethernet20:11.0.0.11")
+    assert status == True
+
+    # Port state change reflected in appDB correctly
+    dvs.servers[6].runcmd("ip link set down dev eth0") == 0
+    dvs.servers[6].runcmd("ip link set up dev eth0") == 0
+    time.sleep(1)
+
+    check_port_oper_status(appl_db, "Ethernet16", "up")
+    check_port_oper_status(appl_db, "Ethernet20", "up")
+    check_port_oper_status(appl_db, "Ethernet24", "up")
+
+
+    swss_app_check_RestoreCount_single(state_db, restore_count, "portsyncd")
+
+
+def test_VlanMgrdWarmRestart(dvs, testlog):
+
+    conf_db = swsscommon.DBConnector(swsscommon.CONFIG_DB, dvs.redis_sock, 0)
+    appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0)
+    state_db = swsscommon.DBConnector(swsscommon.STATE_DB, dvs.redis_sock, 0)
+
+    dvs.runcmd("ifconfig Ethernet16 0")
+    dvs.runcmd("ifconfig Ethernet20 0")
+
+    dvs.runcmd("ifconfig Ethernet16 up")
+    dvs.runcmd("ifconfig Ethernet20 up")
+
+    time.sleep(1)
+
+    dvs.runcmd("config warm_restart enable swss")
+
+    # create vlan
+    create_entry_tbl(
+        conf_db,
+        "VLAN", "Vlan16",
+        [
+            ("vlanid", "16"),
+        ]
+    )
+    # create vlan
+    create_entry_tbl(
+        conf_db,
+        "VLAN", "Vlan20",
+        [
+            ("vlanid", "20"),
+        ]
+    )
+    # create vlan member entries in config db. Don't use Ethernet0/4/8/12,
+    # since IP addresses were configured on them in previous testing.
+    create_entry_tbl(
+        conf_db,
+        "VLAN_MEMBER", "Vlan16|Ethernet16",
+        [
+            ("tagging_mode", "untagged"),
+        ]
+    )
+
+    create_entry_tbl(
+        conf_db,
+        "VLAN_MEMBER", "Vlan20|Ethernet20",
+        [
+            ("tagging_mode", "untagged"),
+        ]
+    )
+
+    time.sleep(1)
+
+    dvs.runcmd("ifconfig Vlan16 11.0.0.1/29 up")
+    dvs.runcmd("ifconfig Vlan20 11.0.0.9/29 up")
+
+    dvs.servers[4].runcmd("ifconfig eth0 11.0.0.2/29")
+    dvs.servers[4].runcmd("ip route add default via 11.0.0.1")
+
+    dvs.servers[5].runcmd("ifconfig eth0 11.0.0.10/29")
+    dvs.servers[5].runcmd("ip route add default via 11.0.0.9")
+
+    time.sleep(1)
+
+    # Ping should work between servers via vs vlan interfaces
+    ping_stats = dvs.servers[4].runcmd("ping -c 1 11.0.0.10")
+    time.sleep(1)
+
+    tbl = swsscommon.Table(appl_db, "NEIGH_TABLE")
+    (status, fvs) = tbl.get("Vlan16:11.0.0.2")
+    assert status == True
+
+    (status, fvs) = tbl.get("Vlan20:11.0.0.10")
+    assert status == True
+
+    (exitcode, bv_before) = dvs.runcmd("bridge vlan")
+    print(bv_before)
+
+    restore_count = swss_get_RestoreCount(dvs, state_db)
+
+    dvs.runcmd(['sh', '-c', 'pkill -x vlanmgrd'])
+
+    pubsub = dvs.SubscribeAsicDbObject("SAI_OBJECT_TYPE")
+
+    dvs.runcmd(['sh', '-c', 'supervisorctl start vlanmgrd'])
+    time.sleep(2)
+
+    (exitcode, bv_after) = dvs.runcmd("bridge vlan")
+    assert bv_after == bv_before
+
+    (nadd, ndel) = dvs.CountSubscribedObjects(pubsub, ignore=["SAI_OBJECT_TYPE_FDB_ENTRY"])
+    assert nadd == 0
+    assert ndel == 0
+
+    # new ip on server 5
+    dvs.servers[5].runcmd("ifconfig eth0 11.0.0.11/29")
+
+    # Ping should work between servers via vs vlan interfaces
+    ping_stats = dvs.servers[4].runcmd("ping -c 1 11.0.0.11")
+
+    # new neighbor learned on VS
+    (status, fvs) = tbl.get("Vlan20:11.0.0.11")
+    assert status == True
+
+    swss_app_check_RestoreCount_single(state_db, restore_count, "vlanmgrd")
+
+def stop_neighsyncd(dvs):
+    dvs.runcmd(['sh', '-c', 'pkill -x neighsyncd'])
+
+def start_neighsyncd(dvs):
+    dvs.runcmd(['sh', '-c', 'supervisorctl start neighsyncd'])
+
+def check_no_neighsyncd_timer(dvs):
+    (exitcode, string) = dvs.runcmd(['sh', '-c', 'grep getWarmStartTimer /var/log/syslog | grep neighsyncd | grep invalid'])
+    assert string.strip() != ""
+
+def check_neighsyncd_timer(dvs, timer_value):
+    (exitcode, num) = dvs.runcmd(['sh', '-c', "grep getWarmStartTimer /var/log/syslog | grep neighsyncd | tail -n 1 | rev | cut -d ' ' -f 1 | rev"])
+    assert num.strip() == timer_value
+
+# function to check neighbor entry reconciliation status written in syslog
+def check_syslog_for_neighbor_entry(dvs, marker, new_cnt, delete_cnt, iptype):
+    # check reconciliation results (new or delete entries) for ipv4 and ipv6
+    if iptype == "ipv4" or iptype == "ipv6":
+        (exitcode, num) = dvs.runcmd(['sh', '-c', "awk \'/%s/,ENDFILE {print;}\' /var/log/syslog | grep neighsyncd | grep cache-state:NEW | grep -i %s | wc -l" % (marker, iptype)])
+        assert num.strip() == str(new_cnt)
+        (exitcode, num) = dvs.runcmd(['sh', '-c', "awk \'/%s/,ENDFILE {print;}\' /var/log/syslog | grep neighsyncd | grep -E \"cache-state:(DELETE|STALE)\" | grep -i %s | wc -l" % (marker, iptype)])
+        assert num.strip() == str(delete_cnt)
+    else:
+        assert "iptype is unknown" == ""
+
+def test_swss_neighbor_syncup(dvs, testlog):
+
+    appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0)
+    conf_db = swsscommon.DBConnector(swsscommon.CONFIG_DB, dvs.redis_sock, 0)
+    state_db = swsscommon.DBConnector(swsscommon.STATE_DB, dvs.redis_sock, 0)
+
+    dvs.runcmd("config warm_restart enable swss")
+
+    #
+    # Testcase 1:
+    # Add neighbor entries in linux kernel, appDB should get all of them
+    #
+
+    # create neighbor entries (4 ipv4 and 4 ipv6, two each on each interface) in linux kernel
+    intfs = ["Ethernet24", "Ethernet28"]
+    # enable ipv6 on docker
+    dvs.runcmd("sysctl net.ipv6.conf.all.disable_ipv6=0")
+
+    dvs.runcmd("ifconfig {} 24.0.0.1/24 up".format(intfs[0]))
+    dvs.runcmd("ip -6 addr add 2400::1/64 dev {}".format(intfs[0]))
+
+    dvs.runcmd("ifconfig {} 28.0.0.1/24 up".format(intfs[1]))
+    dvs.runcmd("ip -6 addr add 2800::1/64 dev {}".format(intfs[1]))
+
+    ips = ["24.0.0.2", "24.0.0.3", "28.0.0.2", "28.0.0.3"]
+    v6ips = ["2400::2", "2400::3", "2800::2", "2800::3"]
+
+    macs = ["00:00:00:00:24:02", "00:00:00:00:24:03", "00:00:00:00:28:02", "00:00:00:00:28:03"]
+
+    for i in range(len(ips)):
+        dvs.runcmd("ip neigh add {} dev {} lladdr {}".format(ips[i], intfs[i%2], macs[i]))
+
+    for i in range(len(v6ips)):
+        dvs.runcmd("ip -6 neigh add {} dev {} lladdr {}".format(v6ips[i], intfs[i%2], macs[i]))
+
+    time.sleep(1)
+
+    # Check the neighbor entries are inserted correctly
+    db = swsscommon.DBConnector(0, dvs.redis_sock, 0)
+    tbl = swsscommon.Table(db, "NEIGH_TABLE")
+
+    for i in range(len(ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+        assert status == True
+
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+        assert status == True
+
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv6"
+
+    #
+    # Testcase 2:
+    # Restart neighsyncd without changing neighbor entries; nothing should be sent to appDB or sairedis,
+    # and appDB should be kept the same.
+    #
+
+    # get restore_count
+    restore_count = swss_get_RestoreCount(dvs, state_db)
+
+    # stop neighsyncd, set a log marker and subscribe to asic db activity
+    stop_neighsyncd(dvs)
+    marker = dvs.add_log_marker()
+    pubsub = dvs.SubscribeAsicDbObject("SAI_OBJECT_TYPE_NEIGHBOR_ENTRY")
+    start_neighsyncd(dvs)
+    time.sleep(10)
+
+    # Check the neighbor entries are still in appDB correctly
+    for i in range(len(ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+        assert status == True
+
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+        assert status == True
+
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv6"
+
+    # check syslog and sairedis.rec file for activities
+    check_syslog_for_neighbor_entry(dvs, marker, 0, 0, "ipv4")
+    check_syslog_for_neighbor_entry(dvs, marker, 0, 0, "ipv6")
+    (nadd, ndel) = dvs.CountSubscribedObjects(pubsub)
+    assert nadd == 0
+    assert ndel == 0
+
+    # check restore Count
+    swss_app_check_RestoreCount_single(state_db, restore_count, "neighsyncd")
+
+    #
+    # Testcase 3:
+    # Stop neighsyncd, delete the even-numbered ipv4/ipv6 neighbor entries from each interface, then warm-start neighsyncd.
+    # neighsyncd is supposed to sync up the entries from the kernel after warm restart.
+    # Note: there was an issue with neighbor delete -- the entry is marked FAILED instead of deleted in the kernel,
+    # but a netlink message is still sent so it gets removed from appDB; that works fine here.
+    # The only catch is that to add the same neighbor again, "change" must be used instead of "add".
+
+    # get restore_count
+    restore_count = swss_get_RestoreCount(dvs, state_db)
+
+    # stop neighsyncd
+    stop_neighsyncd(dvs)
+    marker = dvs.add_log_marker()
+
+    # delete the even-numbered ipv4/ipv6 neighbor entries from each interface
+    for i in range(0, len(ips), 2):
+        dvs.runcmd("ip neigh del {} dev {}".format(ips[i], intfs[i%2]))
+
+    for i in range(0, len(v6ips), 2):
+        dvs.runcmd("ip -6 neigh del {} dev {}".format(v6ips[i], intfs[i%2]))
+
+    # start neighsyncd again
+    start_neighsyncd(dvs)
+    time.sleep(10)
+
+    # check ipv4 and ipv6 neighbors
+    for i in range(len(ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+        # should not see deleted neighbor entries
+        if i % 2 == 0:
+            assert status == False
+            continue
+        else:
+            assert status == True
+
+        # undeleted entries should still be there.
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+        # should not see deleted neighbor entries
+        if i % 2 == 0:
+            assert status == False
+            continue
+        else:
+            assert status == True
+
+        # undeleted entries should still be there.
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv6"
+
+    # check syslog and sairedis.rec file for activities
+    # 2 deletes each for ipv4 and ipv6
+    # 4 neighbor removals in asic db
+    check_syslog_for_neighbor_entry(dvs, marker, 0, 2, "ipv4")
+    check_syslog_for_neighbor_entry(dvs, marker, 0, 2, "ipv6")
+    (nadd, ndel) = dvs.CountSubscribedObjects(pubsub)
+    assert nadd == 0
+    assert ndel == 4
+
+    # check restore Count
+    swss_app_check_RestoreCount_single(state_db, restore_count, "neighsyncd")
+
+
+    #
+    # Testcase 4:
+    # Stop neighsyncd, add the even-numbered ipv4/ipv6 neighbor entries back to each interface,
+    # then start neighsyncd.
+    # neighsyncd is supposed to sync up the entries from the kernel after warm restart.
+    # Check that the timer is not retrieved from configDB since it is not configured.
+
+    # get restore_count
+    restore_count = swss_get_RestoreCount(dvs, state_db)
+
+    # stop neighsyncd
+    stop_neighsyncd(dvs)
+    marker = dvs.add_log_marker()
+
+    # add the even-numbered ipv4/ipv6 neighbor entries back to each interface
+    # use "change" if the neighbor is in FAILED state
+    for i in range(0, len(ips), 2):
+        (rc, output) = dvs.runcmd(['sh', '-c', "ip -4 neigh | grep {}".format(ips[i])])
+        print output
+        if rc == 0:
+            dvs.runcmd("ip neigh change {} dev {} lladdr {}".format(ips[i], intfs[i%2], macs[i]))
+        else:
+            dvs.runcmd("ip neigh add {} dev {} lladdr {}".format(ips[i], intfs[i%2], macs[i]))
+
+    for i in range(0, len(v6ips), 2):
+        (rc, output) = dvs.runcmd(['sh', '-c', "ip -6 neigh | grep {}".format(v6ips[i])])
+        print output
+        if rc == 0:
+            dvs.runcmd("ip -6 neigh change {} dev {} lladdr {}".format(v6ips[i], intfs[i%2], macs[i]))
+        else:
+            dvs.runcmd("ip -6 neigh add {} dev {} lladdr {}".format(v6ips[i], intfs[i%2], macs[i]))
+
+    # start neighsyncd again
+    start_neighsyncd(dvs)
+    time.sleep(10)
+
+    # no neighsyncd timer configured
+    check_no_neighsyncd_timer(dvs)
+
+    # check ipv4 and ipv6 neighbors, should see all neighbors
+    for i in range(len(ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+        assert status == True
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+        assert status == True
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv6"
+
+    # check syslog and asic db for activities
+    # 2 new entries for ipv4 and ipv6 each
+    # 4 neighbor creations in asic db
+    check_syslog_for_neighbor_entry(dvs, marker, 2, 0, "ipv4")
+    check_syslog_for_neighbor_entry(dvs, marker, 2, 0, "ipv6")
+    (nadd, ndel) = dvs.CountSubscribedObjects(pubsub)
+    assert nadd == 4
+    assert ndel == 0
+
+    # check restore Count
+    swss_app_check_RestoreCount_single(state_db, restore_count, "neighsyncd")
+
+    #
+    # Testcase 5:
+    # Even-numbered ipv4/ipv6 neighbors updated with new macs.
+    # Odd-numbered ipv4/ipv6 neighbors removed and added to different interfaces.
+    # neighsyncd should sync it all up after warm restart.
+    # This testcase also exercises the timer settings.
+
+    # setup timer in configDB
+    timer_value = "15"
+
+    dvs.runcmd("config warm_restart neighsyncd_timer {}".format(timer_value))
+
+    # get restore_count
+    restore_count = swss_get_RestoreCount(dvs, state_db)
+
+    # stop neighsyncd
+    stop_neighsyncd(dvs)
+    marker = dvs.add_log_marker()
+
+    # Even-numbered ipv4/ipv6 neighbors updated with new macs.
+    # Odd-numbered ipv4/ipv6 neighbors removed and added to different interfaces.
+    newmacs = ["00:00:00:01:12:02", "00:00:00:01:12:03", "00:00:00:01:16:02", "00:00:00:01:16:03"]
+
+    for i in range(len(ips)):
+        if i % 2 == 0:
+            dvs.runcmd("ip neigh change {} dev {} lladdr {}".format(ips[i], intfs[i%2], newmacs[i]))
+        else:
+            dvs.runcmd("ip neigh del {} dev {}".format(ips[i], intfs[i%2]))
+            dvs.runcmd("ip neigh add {} dev {} lladdr {}".format(ips[i], intfs[1-i%2], macs[i]))
+
+    for i in range(len(v6ips)):
+        if i % 2 == 0:
+            dvs.runcmd("ip -6 neigh change {} dev {} lladdr {}".format(v6ips[i], intfs[i%2], newmacs[i]))
+        else:
+            dvs.runcmd("ip -6 neigh del {} dev {}".format(v6ips[i], intfs[i%2]))
+            dvs.runcmd("ip -6 neigh add {} dev {} lladdr {}".format(v6ips[i], intfs[1-i%2], macs[i]))
+
+    # start neighsyncd again
+    start_neighsyncd(dvs)
+    time.sleep(10)
+
+    # timer is not expired yet, state should be "restored"
+    swss_app_check_warmstart_state(state_db, "neighsyncd", "restored")
+    time.sleep(10)
+
+    # check that the neighsyncd timer is retrieved from configDB
+    check_neighsyncd_timer(dvs, timer_value)
+
+    # check ipv4 and ipv6 neighbors, should see all neighbors with updated info
+    for i in range(len(ips)):
+        if i % 2 == 0:
+            (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+            assert status == True
+            for v in fvs:
+                if v[0] == "neigh":
+                    assert v[1] == newmacs[i]
+                if v[0] == "family":
+                    assert v[1] == "IPv4"
+        else:
+            (status, fvs) = tbl.get("{}:{}".format(intfs[1-i%2], ips[i]))
+            assert status == True
+            for v in fvs:
+                if v[0] == "neigh":
+                    assert v[1] == macs[i]
+                if v[0] == "family":
+                    assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        if i % 2 == 0:
+            (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+            assert status == True
+            for v in fvs:
+                if v[0] == "neigh":
+                    assert v[1] == newmacs[i]
+                if v[0] == "family":
+                    assert v[1] == "IPv6"
+        else:
+            (status, fvs) = tbl.get("{}:{}".format(intfs[1-i%2], v6ips[i]))
+            assert status == True
+            for v in fvs:
+                if v[0] == "neigh":
+                    assert v[1] == macs[i]
+                if v[0] == "family":
"family": + assert v[1] == "IPv6" + + time.sleep(2) + + # check syslog and asic db for activities + # 4 news, 2 deletes for ipv4 and ipv6 each + # 4 create, 4 set, 4 removes for neighbor in asic db + check_syslog_for_neighbor_entry(dvs, marker, 4, 2, "ipv4") + check_syslog_for_neighbor_entry(dvs, marker, 4, 2, "ipv6") + (nadd, ndel) = dvs.CountSubscribedObjects(pubsub) + assert nadd == 8 + assert ndel == 4 + + # check restore Count + swss_app_check_RestoreCount_single(state_db, restore_count, "neighsyncd") + + +# TODO: The condition of warm restart readiness check is still under discussion. +def test_OrchagentWarmRestartReadyCheck(dvs, testlog): + + # do a pre-cleanup + dvs.runcmd("ip -s -s neigh flush all") + time.sleep(1) + + dvs.runcmd("config warm_restart enable swss") + + dvs.runcmd("ifconfig Ethernet0 10.0.0.0/31 up") + dvs.runcmd("ifconfig Ethernet4 10.0.0.2/31 up") + + dvs.servers[0].runcmd("ifconfig eth0 10.0.0.1/31") + dvs.servers[0].runcmd("ip route add default via 10.0.0.0") + + dvs.servers[1].runcmd("ifconfig eth0 10.0.0.3/31") + dvs.servers[1].runcmd("ip route add default via 10.0.0.2") + + + appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) + ps = swsscommon.ProducerStateTable(appl_db, swsscommon.APP_ROUTE_TABLE_NAME) + fvs = swsscommon.FieldValuePairs([("nexthop","10.0.0.1"), ("ifname", "Ethernet0")]) + + ps.set("2.2.2.0/24", fvs) + + time.sleep(1) + # Should fail, since neighbor for next 10.0.0.1 has not been not resolved yet + (exitcode, result) = dvs.runcmd("/usr/bin/orchagent_restart_check") + assert result == "RESTARTCHECK failed\n" + + # Should succeed, the option for skipPendingTaskCheck -s and noFreeze -n have been provided. + # Wait up to 500 milliseconds for response from orchagent. Default wait time is 1000 milliseconds. + (exitcode, result) = dvs.runcmd("/usr/bin/orchagent_restart_check -n -s -w 500") + assert result == "RESTARTCHECK succeeded\n" + + # get neighbor and arp entry + dvs.servers[1].runcmd("ping -c 1 10.0.0.1") + + time.sleep(1) + (exitcode, result) = dvs.runcmd("/usr/bin/orchagent_restart_check") + assert result == "RESTARTCHECK succeeded\n" + + # Should fail since orchagent has been frozen at last step. + (exitcode, result) = dvs.runcmd("/usr/bin/orchagent_restart_check -n -s -w 500") + assert result == "RESTARTCHECK failed\n" + + # Cleaning previously pushed route-entry to ease life of subsequent testcases. + del_entry_tbl(appl_db, swsscommon.APP_ROUTE_TABLE_NAME, "2.2.2.0/24") + + # recover for test cases after this one. 
+    dvs.stop_swss()
+    dvs.start_swss()
+    time.sleep(5)
+
+def test_swss_port_state_syncup(dvs, testlog):
+
+    appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0)
+    conf_db = swsscommon.DBConnector(swsscommon.CONFIG_DB, dvs.redis_sock, 0)
+    state_db = swsscommon.DBConnector(swsscommon.STATE_DB, dvs.redis_sock, 0)
+
+    dvs.runcmd("config warm_restart enable swss")
+
+    tbl = swsscommon.Table(appl_db, swsscommon.APP_PORT_TABLE_NAME)
+
+    restore_count = swss_get_RestoreCount(dvs, state_db)
+
+    # update port admin state
+    dvs.runcmd("ifconfig Ethernet0 10.0.0.0/31 up")
+    dvs.runcmd("ifconfig Ethernet4 10.0.0.2/31 up")
+    dvs.runcmd("ifconfig Ethernet8 10.0.0.4/31 up")
+
+    dvs.runcmd("arp -s 10.0.0.1 00:00:00:00:00:01")
+    dvs.runcmd("arp -s 10.0.0.3 00:00:00:00:00:02")
+    dvs.runcmd("arp -s 10.0.0.5 00:00:00:00:00:03")
+
+    dvs.servers[0].runcmd("ip link set down dev eth0") == 0
+    dvs.servers[1].runcmd("ip link set down dev eth0") == 0
+    dvs.servers[2].runcmd("ip link set down dev eth0") == 0
+
+    dvs.servers[2].runcmd("ip link set up dev eth0") == 0
+
+    time.sleep(3)
+
+    for i in [0, 1, 2]:
+        (status, fvs) = tbl.get("Ethernet%d" % (i * 4))
+        assert status == True
+        oper_status = "unknown"
+        for v in fvs:
+            if v[0] == "oper_status":
+                oper_status = v[1]
+                break
+        if i == 2:
+            assert oper_status == "up"
+        else:
+            assert oper_status == "down"
+
+    dvs.stop_swss()
+    time.sleep(3)
+
+    # flap the port oper status for Ethernet0, Ethernet4 and Ethernet8
+    dvs.servers[0].runcmd("ip link set down dev eth0") == 0
+    dvs.servers[1].runcmd("ip link set down dev eth0") == 0
+    dvs.servers[2].runcmd("ip link set down dev eth0") == 0
+
+    dvs.servers[0].runcmd("ip link set up dev eth0") == 0
+    dvs.servers[1].runcmd("ip link set up dev eth0") == 0
+
+    time.sleep(5)
+    dvs.start_swss()
+    time.sleep(10)
+
+    swss_check_RestoreCount(dvs, state_db, restore_count)
+
+    for i in [0, 1, 2]:
+        (status, fvs) = tbl.get("Ethernet%d" % (i * 4))
+        assert status == True
+        oper_status = "unknown"
+        for v in fvs:
+            if v[0] == "oper_status":
+                oper_status = v[1]
+                break
+        if i == 2:
+            assert oper_status == "down"
+        else:
+            assert oper_status == "up"
+
+
+#############################################################################
+#                                                                           #
+#                    Routing Warm-Restart Testing                           #
+#                                                                           #
+#############################################################################
+
+
+def set_restart_timer(dvs, db, app_name, value):
+    create_entry_tbl(
+        db,
+        swsscommon.CFG_WARM_RESTART_TABLE_NAME, app_name,
+        [
+            (app_name + "_timer", value),
+        ]
+    )
+
+
+# Temporary instruction to activate warm_restart. To be deleted once the equivalent CLI
+# function is pushed to sonic-utilities.
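+# (The eventual CLI equivalent would be "config warm_restart enable <app>",
+# mirroring the commented-out commands used in the testcases below.)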
+def enable_warmrestart(dvs, db, app_name):
+    create_entry_tbl(
+        db,
+        swsscommon.CFG_WARM_RESTART_TABLE_NAME, app_name,
+        [
+            ("enable", "true"),
+        ]
+    )
+
+
+################################################################################
+#
+# Routing warm-restart testcases
+#
+################################################################################
+
+def test_routing_WarmRestart(dvs, testlog):
+
+    appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0)
+    conf_db = swsscommon.DBConnector(swsscommon.CONFIG_DB, dvs.redis_sock, 0)
+    state_db = swsscommon.DBConnector(swsscommon.STATE_DB, dvs.redis_sock, 0)
+
+    # Restart timer to be used throughout the following testcases
+    restart_timer = 10
+
+
+    #############################################################################
+    #
+    # Baseline configuration
+    #
+    #############################################################################
+
+
+    # Define the interfaces and neighbor entries (3 IPv4 and 3 IPv6, one per
+    # interface) to be created in the linux kernel
+    intfs = ["Ethernet0", "Ethernet4", "Ethernet8"]
+
+    # Enable ipv6 on docker
+    dvs.runcmd("sysctl net.ipv6.conf.all.disable_ipv6=0")
+
+    dvs.runcmd("ip -4 addr add 111.0.0.1/24 dev {}".format(intfs[0]))
+    dvs.runcmd("ip -6 addr add 1110::1/64 dev {}".format(intfs[0]))
+    dvs.runcmd("ip link set {} up".format(intfs[0]))
+
+    dvs.runcmd("ip -4 addr add 122.0.0.1/24 dev {}".format(intfs[1]))
+    dvs.runcmd("ip -6 addr add 1220::1/64 dev {}".format(intfs[1]))
+    dvs.runcmd("ip link set {} up".format(intfs[1]))
+
+    dvs.runcmd("ip -4 addr add 133.0.0.1/24 dev {}".format(intfs[2]))
+    dvs.runcmd("ip -6 addr add 1330::1/64 dev {}".format(intfs[2]))
+    dvs.runcmd("ip link set {} up".format(intfs[2]))
+
+    time.sleep(1)
+
+    #
+    # Setting peer's ip-addresses and associated neighbor-entries
+    #
+    ips = ["111.0.0.2", "122.0.0.2", "133.0.0.2"]
+    v6ips = ["1110::2", "1220::2", "1330::2"]
+    macs = ["00:00:00:00:11:02", "00:00:00:00:12:02", "00:00:00:00:13:02"]
+
+    for i in range(len(ips)):
+        dvs.runcmd("ip neigh add {} dev {} lladdr {}".format(ips[i], intfs[i], macs[i]))
+
+    for i in range(len(v6ips)):
+        dvs.runcmd("ip -6 neigh add {} dev {} lladdr {}".format(v6ips[i], intfs[i], macs[i]))
+
+    time.sleep(1)
+
+    #
+    # Defining baseline IPv4 non-ecmp route-entries
+    #
+    dvs.runcmd("ip route add 192.168.1.100/32 nexthop via 111.0.0.2")
+    dvs.runcmd("ip route add 192.168.1.200/32 nexthop via 122.0.0.2")
+    dvs.runcmd("ip route add 192.168.1.30/32 nexthop via 133.0.0.2")
+
+    #
+    # Defining baseline IPv4 ecmp route-entries
+    #
+    dvs.runcmd("ip route add 192.168.1.1/32 nexthop via 111.0.0.2 nexthop via 122.0.0.2 nexthop via 133.0.0.2")
+    dvs.runcmd("ip route add 192.168.1.2/32 nexthop via 111.0.0.2 nexthop via 122.0.0.2 nexthop via 133.0.0.2")
+    dvs.runcmd("ip route add 192.168.1.3/32 nexthop via 111.0.0.2 nexthop via 122.0.0.2")
+
+    #
+    # Defining baseline IPv6 non-ecmp route-entries
+    #
+    dvs.runcmd("ip -6 route add fc00:11:11::1/128 nexthop via 1110::2")
+    dvs.runcmd("ip -6 route add fc00:12:12::1/128 nexthop via 1220::2")
+    dvs.runcmd("ip -6 route add fc00:13:13::1/128 nexthop via 1330::2")
+
+    #
+    # Defining baseline IPv6 ecmp route-entries
+    #
+    dvs.runcmd("ip -6 route add fc00:1:1::1/128 nexthop via 1110::2 nexthop via 1220::2 nexthop via 1330::2")
+    dvs.runcmd("ip -6 route add fc00:2:2::1/128 nexthop via 1110::2 nexthop via 1220::2 nexthop via 1330::2")
+    dvs.runcmd("ip -6 route add fc00:3:3::1/128 nexthop via 1110::2 nexthop via 1220::2")
+
+    time.sleep(5)
+
+    # Enable some extra logging for troubleshooting purposes
+    dvs.runcmd("swssloglevel -l INFO -c fpmsyncd")
+
+    # Subscribe to pubsub channels for routing-state associated to swss and sairedis dbs
+    pubsubAppDB  = dvs.SubscribeAppDbObject("ROUTE_TABLE")
+    pubsubAsicDB = dvs.SubscribeAsicDbObject("SAI_OBJECT_TYPE_ROUTE_ENTRY")
+
+
+    #############################################################################
+    #
+    # Testcase 1. Having routing-warm-reboot disabled, restart zebra and verify
+    #             that the traditional/cold-boot logic is followed.
+    #
+    #############################################################################
+
+    # Restart zebra
+    dvs.stop_zebra()
+    dvs.start_zebra()
+
+    time.sleep(5)
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "")
+
+    # Verify that multiple changes are seen in swss and sairedis logs as there's
+    # no warm-reboot logic in place.
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) != 0
+
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) != 0
+
+
+    #############################################################################
+    #
+    # Testcase 2. Restart zebra and make no control-plane changes.
+    #             For this and all subsequent test-cases the routing-warm-reboot
+    #             feature will be kept enabled.
+    #
+    #############################################################################
+
+
+    # Enabling bgp warmrestart and setting the restart timer.
+    # The following two instructions will be substituted by the commented ones
+    # once the latter are added to the sonic-utilities repo.
+    enable_warmrestart(dvs, conf_db, "bgp")
+    set_restart_timer(dvs, conf_db, "bgp", str(restart_timer))
+    #dvs.runcmd("config warm_restart enable bgp")
+    #dvs.runcmd("config warm_restart bgp_timer {}".format(restart_timer))
+
+    time.sleep(1)
+
+    # Restart zebra
+    dvs.stop_zebra()
+    dvs.start_zebra()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify swss changes -- none are expected this time
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 0 and len(delobjs) == 0
+
+    # Verify sairedis changes -- none are expected this time
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 0 and len(delobjs) == 0
+
+
+    #############################################################################
+    #
+    # Testcase 3. Restart zebra and add one new non-ecmp IPv4 prefix
+    #
+    #############################################################################
+
+    # Stop zebra
+    dvs.stop_zebra()
+
+    # Add new prefix
+    dvs.runcmd("ip route add 192.168.100.0/24 nexthop via 111.0.0.2")
+    time.sleep(1)
+
+    # Start zebra
+    dvs.start_zebra()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    rt_val = json.loads(addobjs[0]['vals'])
+    assert rt_key == "192.168.100.0/24"
+    assert rt_val == {"ifname": "Ethernet0", "nexthop": "111.0.0.2"}
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    assert rt_key['dest'] == "192.168.100.0/24"
+
+
+    #############################################################################
+    #
+    # Testcase 4. Restart zebra and withdraw one non-ecmp IPv4 prefix
+    #
+    #############################################################################
+
+
+    # Stop zebra
+    dvs.stop_zebra()
+
+    # Delete prefix
+    dvs.runcmd("ip route del 192.168.100.0/24 nexthop via 111.0.0.2")
+    time.sleep(1)
+
+    # Start zebra
+    dvs.start_zebra()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 0 and len(delobjs) == 1
+    rt_key = json.loads(delobjs[0]['key'])
+    assert rt_key == "192.168.100.0/24"
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 0 and len(delobjs) == 1
+    rt_key = json.loads(delobjs[0]['key'])
+    assert rt_key['dest'] == "192.168.100.0/24"
+
+
+    #############################################################################
+    #
+    # Testcase 5. Restart zebra and add a new IPv4 ecmp-prefix
+    #
+    #############################################################################
+
+
+    # Stop zebra
+    dvs.stop_zebra()
+
+    # Add prefix
+    dvs.runcmd("ip route add 192.168.200.0/24 nexthop via 111.0.0.2 nexthop via 122.0.0.2 nexthop via 133.0.0.2")
+    time.sleep(1)
+
+    # Start zebra
+    dvs.start_zebra()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    rt_val = json.loads(addobjs[0]['vals'])
+    assert rt_key == "192.168.200.0/24"
+    assert rt_val == {"ifname": "Ethernet0,Ethernet4,Ethernet8", "nexthop": "111.0.0.2,122.0.0.2,133.0.0.2"}
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    assert rt_key['dest'] == "192.168.200.0/24"
+
+
+    #############################################################################
+    #
+    # Testcase 6. Restart zebra and delete one existing IPv4 ecmp-prefix.
+    #
+    #############################################################################
+
+
+    # Stop zebra
+    dvs.stop_zebra()
+
+    # Delete prefix
+    dvs.runcmd("ip route del 192.168.200.0/24 nexthop via 111.0.0.2 nexthop via 122.0.0.2 nexthop via 133.0.0.2")
+    time.sleep(1)
+
+    # Start zebra
+    dvs.start_zebra()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 0 and len(delobjs) == 1
+    rt_key = json.loads(delobjs[0]['key'])
+    assert rt_key == "192.168.200.0/24"
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 0 and len(delobjs) == 1
+    rt_key = json.loads(delobjs[0]['key'])
+    assert rt_key['dest'] == "192.168.200.0/24"
+
+
+    #############################################################################
+    #
+    # Testcase 7. Restart zebra and add one new path to an IPv4 ecmp-prefix
+    #
+    #############################################################################
+
+
+    # Stop zebra
+    dvs.stop_zebra()
+
+    # Add new path
+    dvs.runcmd("ip route del 192.168.1.3/32 nexthop via 111.0.0.2 nexthop via 122.0.0.2")
+    dvs.runcmd("ip route add 192.168.1.3/32 nexthop via 111.0.0.2 nexthop via 122.0.0.2 nexthop via 133.0.0.2")
+    time.sleep(1)
+
+    # Start zebra
+    dvs.start_zebra()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    rt_val = json.loads(addobjs[0]['vals'])
+    assert rt_key == "192.168.1.3"
+    assert rt_val == {"ifname": "Ethernet0,Ethernet4,Ethernet8", "nexthop": "111.0.0.2,122.0.0.2,133.0.0.2"}
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    assert rt_key['dest'] == "192.168.1.3/32"
+
+
+    #############################################################################
+    #
+    # Testcase 8. Restart zebra and delete one ecmp-path from an IPv4 ecmp-prefix.
+    #
+    #############################################################################
+
+
+    # Stop zebra
+    dvs.stop_zebra()
+
+    # Delete ecmp-path
+    dvs.runcmd("ip route del 192.168.1.3/32 nexthop via 111.0.0.2 nexthop via 122.0.0.2 nexthop via 133.0.0.2")
+    dvs.runcmd("ip route add 192.168.1.3/32 nexthop via 111.0.0.2 nexthop via 122.0.0.2")
+    time.sleep(1)
+
+    # Start zebra
+    dvs.start_zebra()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    rt_val = json.loads(addobjs[0]['vals'])
+    assert rt_key == "192.168.1.3"
+    assert rt_val == {"ifname": "Ethernet0,Ethernet4", "nexthop": "111.0.0.2,122.0.0.2"}
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    assert rt_key['dest'] == "192.168.1.3/32"
+
+
+    #############################################################################
+    #
+    # Testcase 9. Restart zebra and add one new non-ecmp IPv6 prefix
+    #
+    #############################################################################
+
+
+    # Stop zebra
+    dvs.stop_zebra()
+
+    # Add prefix
+    dvs.runcmd("ip -6 route add fc00:4:4::1/128 nexthop via 1110::2")
+    time.sleep(1)
+
+    # Start zebra
+    dvs.start_zebra()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    rt_val = json.loads(addobjs[0]['vals'])
+    assert rt_key == "fc00:4:4::1"
+    assert rt_val == {"ifname": "Ethernet0", "nexthop": "1110::2"}
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    assert rt_key['dest'] == "fc00:4:4::1/128"
+
+
+    #############################################################################
+    #
+    # Testcase 10. Restart zebra and withdraw one non-ecmp IPv6 prefix
+    #
+    #############################################################################
+
+    # Stop zebra
+    dvs.stop_zebra()
+
+    # Delete prefix
+    dvs.runcmd("ip -6 route del fc00:4:4::1/128 nexthop via 1110::2")
+    time.sleep(1)
+
+    # Start zebra
+    dvs.start_zebra()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 0 and len(delobjs) == 1
+    rt_key = json.loads(delobjs[0]['key'])
+    assert rt_key == "fc00:4:4::1"
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 0 and len(delobjs) == 1
+    rt_key = json.loads(delobjs[0]['key'])
+    assert rt_key['dest'] == "fc00:4:4::1/128"
+
+
+    #############################################################################
+    #
+    # Testcase 11. Restart fpmsyncd and make no control-plane changes.
+    #
+    #############################################################################
+
+
+    # Stop fpmsyncd
+    dvs.stop_fpmsyncd()
+
+    # Start fpmsyncd
+    dvs.start_fpmsyncd()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify swss changes -- none are expected this time
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 0 and len(delobjs) == 0
+
+    # Verify sairedis changes -- none are expected this time
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 0 and len(delobjs) == 0
+
+
+    #############################################################################
+    #
+    # Testcase 12. Restart fpmsyncd and add one new non-ecmp IPv4 prefix
+    #
+    #############################################################################
+
+
+    # Stop fpmsyncd
+    dvs.stop_fpmsyncd()
+
+    # Add new prefix
+    dvs.runcmd("ip route add 192.168.100.0/24 nexthop via 111.0.0.2")
+    time.sleep(1)
+
+    # Start fpmsyncd
+    dvs.start_fpmsyncd()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    rt_val = json.loads(addobjs[0]['vals'])
+    assert rt_key == "192.168.100.0/24"
+    assert rt_val == {"ifname": "Ethernet0", "nexthop": "111.0.0.2"}
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    assert rt_key['dest'] == "192.168.100.0/24"
+
+
+    #############################################################################
+    #
+    # Testcase 13. Restart fpmsyncd and withdraw one non-ecmp IPv4 prefix
+    #
+    #############################################################################
+
+
+    # Stop fpmsyncd
+    dvs.stop_fpmsyncd()
+
+    # Delete prefix
+    dvs.runcmd("ip route del 192.168.100.0/24 nexthop via 111.0.0.2")
+    time.sleep(1)
+
+    # Start fpmsyncd
+    dvs.start_fpmsyncd()
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 0 and len(delobjs) == 1
+    rt_key = json.loads(delobjs[0]['key'])
+    assert rt_key == "192.168.100.0/24"
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 0 and len(delobjs) == 1
+    rt_key = json.loads(delobjs[0]['key'])
+    assert rt_key['dest'] == "192.168.100.0/24"
+
+
+    #############################################################################
+    #
+    # Testcase 14. Restart zebra and add/remove a new non-ecmp IPv4 prefix. As
+    #              the 'delete' instruction would arrive after the 'add' one, no
+    #              changes should be pushed down to SwSS.
+    #
+    #############################################################################
+
+
+    # Restart zebra
+    dvs.stop_zebra()
+    dvs.start_zebra()
+
+    # Add/delete new prefix
+    dvs.runcmd("ip route add 192.168.100.0/24 nexthop via 111.0.0.2")
+    time.sleep(1)
+    dvs.runcmd("ip route del 192.168.100.0/24 nexthop via 111.0.0.2")
+    time.sleep(1)
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify swss changes -- none are expected this time
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 0 and len(delobjs) == 0
+
+    # Verify sairedis changes -- none are expected this time
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 0 and len(delobjs) == 0
+
+
+    #############################################################################
+    #
+    # Testcase 15. Restart zebra and generate an add/remove/add for new non-ecmp
+    #              IPv4 prefix. Verify that only the second 'add' instruction is
+    #              honored and the corresponding update passed down to SwSS.
+    #
+    #############################################################################
+
+
+    # Restart zebra
+    dvs.stop_zebra()
+    dvs.start_zebra()
+
+    marker1 = dvs.add_log_marker("/var/log/swss/swss.rec")
+    marker2 = dvs.add_log_marker("/var/log/swss/sairedis.rec")
+
+    # Add/delete new prefix
+    dvs.runcmd("ip route add 192.168.100.0/24 nexthop via 111.0.0.2")
+    time.sleep(1)
+    dvs.runcmd("ip route del 192.168.100.0/24 nexthop via 111.0.0.2")
+    time.sleep(1)
+    dvs.runcmd("ip route add 192.168.100.0/24 nexthop via 122.0.0.2")
+    time.sleep(1)
+
+    # Verify FSM
+    swss_app_check_warmstart_state(state_db, "bgp", "restored")
+    time.sleep(restart_timer + 1)
+    swss_app_check_warmstart_state(state_db, "bgp", "reconciled")
+
+    # Verify the changed prefix is seen in swss
+    (addobjs, delobjs) = dvs.GetSubscribedAppDbObjects(pubsubAppDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    rt_val = json.loads(addobjs[0]['vals'])
+    assert rt_key == "192.168.100.0/24"
+    assert rt_val == {"ifname": "Ethernet4", "nexthop": "122.0.0.2"}
+
+    # Verify the changed prefix is seen in sairedis
+    (addobjs, delobjs) = dvs.GetSubscribedAsicDbObjects(pubsubAsicDB)
+    assert len(addobjs) == 1 and len(delobjs) == 0
+    rt_key = json.loads(addobjs[0]['key'])
+    assert rt_key['dest'] == "192.168.100.0/24"
+
+
diff --git a/warmrestart/warmRestartHelper.cpp b/warmrestart/warmRestartHelper.cpp
new file mode 100644
index 0000000000..580e9f98a6
--- /dev/null
+++ b/warmrestart/warmRestartHelper.cpp
@@ -0,0 +1,378 @@
+#include <string>
+#include <algorithm>
+
+#include "warmRestartHelper.h"
+
+
+using namespace swss;
+
+
+WarmStartHelper::WarmStartHelper(RedisPipeline *pipeline,
+                                 ProducerStateTable *syncTable,
+                                 const std::string &syncTableName,
+                                 const std::string &dockerName,
+                                 const std::string &appName) :
+    m_syncTable(syncTable),
+    m_restorationTable(pipeline, syncTableName, false),
+    m_syncTableName(syncTableName),
+    m_dockName(dockerName),
+    m_appName(appName)
+{
+    WarmStart::initialize(appName, dockerName);
+}
+
+
+WarmStartHelper::~WarmStartHelper()
+{
+}
+
+
+void WarmStartHelper::setState(WarmStart::WarmStartState state)
+{
+    WarmStart::setWarmStartState(m_appName, state);
+
+    /* Caching warm-restart FSM state in local member */
+    m_state = state;
+}
+
+
+WarmStart::WarmStartState WarmStartHelper::getState(void) const
+{
+    return m_state;
+}
+
+
+/*
+ * To be called by each application to obtain the active/inactive state of
+ * warm-restart functionality, and proceed to initialize the FSM accordingly.
+ */
+bool WarmStartHelper::checkAndStart(void)
+{
+    bool enabled = WarmStart::checkWarmStart(m_appName, m_dockName);
+
+    /*
+     * If the warm-restart feature is enabled for this application, proceed to
+     * initialize its FSM, and clean any pending state that might still be held
+     * in ProducerState queues.
+     */
+    if (enabled)
+    {
+        SWSS_LOG_NOTICE("Initializing Warm-Restart cycle for %s application.",
+                        m_appName.c_str());
+
+        setState(WarmStart::INITIALIZED);
+        m_syncTable->clear();
+    }
+
+    /* Cleaning state from previous (unsuccessful) warm-restart attempts */
+    m_restorationVector.clear();
+    m_refreshMap.clear();
+
+    /* Keeping track of warm-reboot active/inactive state */
+    m_enabled = enabled;
+
+    return enabled;
+}
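Taken together with runRestoration() and reconcile() below, checkAndStart() drives the warm-restart FSM. The following caller-side sketch is illustrative only and not part of the patch; `helper` stands for any constructed WarmStartHelper instance:

    // Sketch of the FSM flow for an app with pre-restart AppDB state:
    //
    //   checkAndStart()  -> INITIALIZED  (warm restart enabled in configuration)
    //   runRestoration() -> RESTORED     (AppDB records fetched), or
    //                       RECONCILED   (nothing to restore; cycle ends early)
    //   reconcile()      -> RECONCILED   (old vs. refreshed state resolved)
    if (helper.checkAndStart())
    {
        if (helper.runRestoration())
        {
            /* ... app re-learns its state, staging it via insertRefreshMap() ... */
            helper.reconcile();
        }
    }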
+
+
+bool WarmStartHelper::isReconciled(void) const
+{
+    return (m_state == WarmStart::RECONCILED);
+}
+
+
+bool WarmStartHelper::inProgress(void) const
+{
+    return (m_enabled && m_state != WarmStart::RECONCILED);
+}
+
+
+uint32_t WarmStartHelper::getRestartTimer(void) const
+{
+    return WarmStart::getWarmStartTimer(m_appName, m_dockName);
+}
+
+
+/*
+ * Invoked by warmStartHelper clients during initialization. All interested parties
+ * are expected to call this method to upload their associated redisDB state into
+ * a temporary buffer, which will eventually serve to resolve any conflict between
+ * 'old' and 'new' state.
+ */
+bool WarmStartHelper::runRestoration()
+{
+    SWSS_LOG_NOTICE("Warm-Restart: Initiating AppDB restoration process for %s "
+                    "application.", m_appName.c_str());
+
+    m_restorationTable.getContent(m_restorationVector);
+
+    /*
+     * If there's no AppDB state to restore, then alert the caller right away to
+     * avoid iterating through the 'reconciliation' process.
+     */
+    if (m_restorationVector.empty())
+    {
+        SWSS_LOG_NOTICE("Warm-Restart: No records received from AppDB for %s "
+                        "application.", m_appName.c_str());
+
+        setState(WarmStart::RECONCILED);
+
+        return false;
+    }
+
+    SWSS_LOG_NOTICE("Warm-Restart: Received %zu records from AppDB for %s "
+                    "application.",
+                    m_restorationVector.size(),
+                    m_appName.c_str());
+
+    setState(WarmStart::RESTORED);
+
+    SWSS_LOG_NOTICE("Warm-Restart: Completed AppDB restoration process for %s "
+                    "application.", m_appName.c_str());
+
+    return true;
+}
+
+
+void WarmStartHelper::insertRefreshMap(const KeyOpFieldsValuesTuple &kfv)
+{
+    const std::string key = kfvKey(kfv);
+
+    m_refreshMap[key] = kfv;
+}
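Restarting applications stage their re-learned state through insertRefreshMap() while reconciliation is pending. A minimal, hypothetical staging of one refreshed route entry could look as follows (all key/field values are illustrative):

    // Hypothetical example: stage a refreshed route entry for reconciliation.
    std::vector<FieldValueTuple> fvVector;
    fvVector.emplace_back("nexthop", "10.1.1.1,10.1.1.2");
    fvVector.emplace_back("ifname", "Ethernet0,Ethernet4");

    helper.insertRefreshMap(std::make_tuple("192.168.50.0/24", SET_COMMAND, fvVector));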
+
+
+/*
+ * The reconciliation process takes place here. In essence, all we are doing
+ * is comparing the restored elements (old state) with the refreshed/new ones
+ * generated by the application once it completes its restart cycle. If a
+ * state-diff is found between these two, we will be honoring the refreshed
+ * one received from the application, and will proceed to push it down to AppDB.
+ */
+void WarmStartHelper::reconcile(void)
+{
+    SWSS_LOG_NOTICE("Warm-Restart: Initiating reconciliation process for %s "
+                    "application.", m_appName.c_str());
+
+    assert(getState() == WarmStart::RESTORED);
+
+    for (auto &restoredElem : m_restorationVector)
+    {
+        std::string restoredKey = kfvKey(restoredElem);
+        auto restoredFV = kfvFieldsValues(restoredElem);
+
+        auto iter = m_refreshMap.find(restoredKey);
+
+        /*
+         * If the restored element is not found in the refreshMap, we must
+         * push a delete operation for this entry.
+         */
+        if (iter == m_refreshMap.end())
+        {
+            SWSS_LOG_NOTICE("Warm-Restart reconciliation: deleting stale entry %s",
+                            printKFV(restoredKey, restoredFV).c_str());
+
+            m_syncTable->del(restoredKey);
+            continue;
+        }
+
+        /*
+         * If an explicit delete request is sent by the application, process it
+         * right away.
+         */
+        else if (kfvOp(iter->second) == DEL_COMMAND)
+        {
+            SWSS_LOG_NOTICE("Warm-Restart reconciliation: deleting entry %s",
+                            printKFV(restoredKey, restoredFV).c_str());
+
+            m_syncTable->del(restoredKey);
+        }
+
+        /*
+         * If a matching entry is found in refreshMap, proceed to compare it
+         * with its restored counterpart.
+         */
+        else
+        {
+            auto refreshedKey = kfvKey(iter->second);
+            auto refreshedFV = kfvFieldsValues(iter->second);
+
+            if (compareAllFV(restoredFV, refreshedFV))
+            {
+                SWSS_LOG_NOTICE("Warm-Restart reconciliation: updating entry %s",
+                                printKFV(refreshedKey, refreshedFV).c_str());
+
+                m_syncTable->set(refreshedKey, refreshedFV);
+            }
+            else
+            {
+                SWSS_LOG_INFO("Warm-Restart reconciliation: no changes needed for "
+                              "existing entry %s",
+                              printKFV(refreshedKey, refreshedFV).c_str());
+            }
+        }
+
+        /* Deleting the just-processed restored entry from the refreshMap */
+        m_refreshMap.erase(restoredKey);
+    }
+
+    /*
+     * Iterate through all the entries left in the refreshMap, which correspond
+     * to brand-new entries to be pushed down to AppDB.
+     */
+    for (auto &kfv : m_refreshMap)
+    {
+        auto refreshedKey = kfvKey(kfv.second);
+        auto refreshedOp = kfvOp(kfv.second);
+        auto refreshedFV = kfvFieldsValues(kfv.second);
+
+        /*
+         * During warm-reboot, apps could receive an 'add' and a 'delete' for an
+         * entry that does not exist in AppDB. In these cases we must prevent the
+         * 'delete' from being pushed down to AppDB, so we are handling this case
+         * differently than the 'add' one.
+         */
+        if (refreshedOp == DEL_COMMAND)
+        {
+            SWSS_LOG_NOTICE("Warm-Restart reconciliation: discarding non-existing"
+                            " entry %s",
+                            refreshedKey.c_str());
+        }
+        else
+        {
+            SWSS_LOG_NOTICE("Warm-Restart reconciliation: introducing new entry %s",
+                            printKFV(refreshedKey, refreshedFV).c_str());
+
+            m_syncTable->set(refreshedKey, refreshedFV);
+        }
+    }
+
+    /* Clearing pending kfv's from refreshMap */
+    m_refreshMap.clear();
+
+    /* Clearing restoration vector */
+    m_restorationVector.clear();
+
+    setState(WarmStart::RECONCILED);
+
+    SWSS_LOG_NOTICE("Warm-Restart: Concluded reconciliation process for %s "
+                    "application.", m_appName.c_str());
+}
+
+
+/*
+ * Compare all field-value-tuples within two vectors.
+ *
+ * Example: v1 {nexthop: 10.1.1.1, ifname: eth1}
+ *          v2 {nexthop: 10.1.1.2, ifname: eth2}
+ *
+ * Returns:
+ *
+ * 'false' : the content of both 'fields' and 'values' fully matches
+ * 'true'  : a mismatch was found
+ */
+bool WarmStartHelper::compareAllFV(const std::vector<FieldValueTuple> &v1,
+                                   const std::vector<FieldValueTuple> &v2)
+{
+    std::unordered_map<std::string, std::string> v1Map(v1.begin(), v1.end());
+
+    /* Iterate through all v2 tuples to check if their content matches v1's */
+    for (auto &v2fv : v2)
+    {
+        auto v1Iter = v1Map.find(v2fv.first);
+        /*
+         * The sizes of both tuple-vectors should always match within any
+         * given application. In other words, all fields within v1 should be
+         * also present in v2.
+         *
+         * To make this possible, every application should continue relying on a
+         * uniform schema to create/generate information. For example, fpmsyncd
+         * will be always expected to push FieldValueTuples with "nexthop" and
+         * "ifname" fields; neighsyncd is expected to make use of "family" and
+         * "neigh" fields, etc. The existing reconciliation logic will rely on
+         * this assumption.
+         */
+        assert(v1Iter != v1Map.end());
+
+        if (compareOneFV(v1Iter->second, fvValue(v2fv)))
+        {
+            return true;
+        }
+    }
+
+    return false;
+}
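The comparison is deliberately order-insensitive so that, for instance, ecmp next-hop lists match regardless of how the restarted stack happens to order them. The expected behavior of compareOneFV(), defined next, on a few hypothetical inputs:

    // Illustrative inputs ('true' means a mismatch was detected):
    //
    //   compareOneFV("10.1.1.1,10.1.1.2", "10.1.1.2,10.1.1.1")  -> false (same set)
    //   compareOneFV("10.1.1.1,10.1.1.2", "10.1.1.1,10.1.1.3")  -> true
    //   compareOneFV("Ethernet0", "Ethernet0,Ethernet4")        -> true  (size fast-path)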
+
+
+/*
+ * Compare the values of a single field-value within two different KFVs.
+ *
+ * Example: s1 {nexthop: 10.1.1.1, 10.1.1.2}
+ *          s2 {nexthop: 10.1.1.2, 10.1.1.1}
+ *
+ * Example: s1 {Ethernet1, Ethernet2}
+ *          s2 {Ethernet2, Ethernet1}
+ *
+ * Returns:
+ *
+ * 'false' : the content of both strings fully matches (order-insensitive)
+ * 'true'  : a mismatch was found
+ */
+bool WarmStartHelper::compareOneFV(const std::string &s1, const std::string &s2)
+{
+    if (s1.size() != s2.size())
+    {
+        return true;
+    }
+
+    std::vector<std::string> splitValuesS1 = tokenize(s1, ',');
+    std::vector<std::string> splitValuesS2 = tokenize(s2, ',');
+
+    if (splitValuesS1.size() != splitValuesS2.size())
+    {
+        return true;
+    }
+
+    std::sort(splitValuesS1.begin(), splitValuesS1.end());
+    std::sort(splitValuesS2.begin(), splitValuesS2.end());
+
+    for (size_t i = 0; i < splitValuesS1.size(); i++)
+    {
+        if (splitValuesS1[i] != splitValuesS2[i])
+        {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+
+/*
+ * Helper method to print KFVs in a friendly fashion.
+ *
+ * Example:
+ *
+ * 192.168.1.0/30 { nexthop: 10.2.2.1,10.1.2.1 | ifname: Ethernet116,Ethernet112 }
+ */
+const std::string WarmStartHelper::printKFV(const std::string &key,
+                                            const std::vector<FieldValueTuple> &fv)
+{
+    std::string res;
+
+    res = key + " { ";
+
+    for (size_t i = 0; i < fv.size(); ++i)
+    {
+        res += fv[i].first + ": " + fv[i].second;
+
+        if (i != fv.size() - 1)
+        {
+            res += " | ";
+        }
+    }
+
+    res += " } ";
+
+    return res;
+}
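As a worked end-to-end picture of what reconcile() produces (entries here are hypothetical):

    // restored (AppDB)         refreshed (application)      action pushed to AppDB
    // ----------------         -----------------------      ----------------------
    // A {nexthop: 10.0.0.1}    --                           del(A)  stale entry
    // B {nexthop: 10.0.0.2}    B {nexthop: 10.0.0.3}        set(B)  state-diff found
    // --                       C {nexthop: 10.0.0.4}        set(C)  brand-new entry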
diff --git a/warmrestart/warmRestartHelper.h b/warmrestart/warmRestartHelper.h
new file mode 100644
index 0000000000..75af3c4b9d
--- /dev/null
+++ b/warmrestart/warmRestartHelper.h
@@ -0,0 +1,83 @@
+#ifndef __WARMRESTART_HELPER__
+#define __WARMRESTART_HELPER__
+
+
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <cstdint>
+
+#include "dbconnector.h"
+#include "producerstatetable.h"
+#include "netmsg.h"
+#include "table.h"
+#include "tokenize.h"
+#include "warm_restart.h"
+
+
+namespace swss {
+
+
+class WarmStartHelper {
+  public:
+
+    WarmStartHelper(RedisPipeline *pipeline,
+                    ProducerStateTable *syncTable,
+                    const std::string &syncTableName,
+                    const std::string &dockerName,
+                    const std::string &appName);
+
+    ~WarmStartHelper();
+
+    /* kfvVector type to be used to host AppDB restored elements */
+    using kfvVector = std::vector<KeyOpFieldsValuesTuple>;
+
+    /*
+     * kfvMap type to be utilized to store all the new/refreshed state coming
+     * from the restarting applications.
+     */
+    using kfvMap = std::unordered_map<std::string, KeyOpFieldsValuesTuple>;
+
+    void setState(WarmStart::WarmStartState state);
+
+    WarmStart::WarmStartState getState(void) const;
+
+    bool checkAndStart(void);
+
+    bool isReconciled(void) const;
+
+    bool inProgress(void) const;
+
+    uint32_t getRestartTimer(void) const;
+
+    bool runRestoration(void);
+
+    void insertRefreshMap(const KeyOpFieldsValuesTuple &kfv);
+
+    void reconcile(void);
+
+    const std::string printKFV(const std::string &key,
+                               const std::vector<FieldValueTuple> &fv);
+
+  private:
+
+    bool compareAllFV(const std::vector<FieldValueTuple> &v1,
+                      const std::vector<FieldValueTuple> &v2);
+
+    bool compareOneFV(const std::string &s1, const std::string &s2);
+
+    ProducerStateTable *m_syncTable;    // producer-table to sync/push state to
+    Table m_restorationTable;           // redis table to import current-state from
+    kfvVector m_restorationVector;      // buffer struct to hold old state
+    kfvMap m_refreshMap;                // buffer struct to hold new state
+    WarmStart::WarmStartState m_state;  // cached value of warmStart's FSM state
+    bool m_enabled;                     // warm-reboot enabled/disabled status
+    std::string m_syncTableName;        // producer-table-name to sync/push state to
+    std::string m_dockName;             // sonic-docker requesting warmStart services
+    std::string m_appName;              // sonic-app requesting warmStart services
+};
+
+
+}
+
+#endif
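Finally, a minimal sketch of how another application could wire the helper end to end. Everything below is hypothetical (table, docker, and app names are placeholders) and only mirrors the constructor and method signatures declared above:

    #include "dbconnector.h"
    #include "redispipeline.h"
    #include "producerstatetable.h"
    #include "warmRestartHelper.h"

    using namespace swss;

    int main()
    {
        DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
        RedisPipeline pipeline(&db);
        ProducerStateTable table(&pipeline, "EXAMPLE_TABLE", true);

        /* "exampledock"/"exampleapp" are placeholder docker/app names */
        WarmStartHelper helper(&pipeline, &table, "EXAMPLE_TABLE",
                               "exampledock", "exampleapp");

        if (helper.checkAndStart() && helper.runRestoration())
        {
            /* ... re-learn state, calling helper.insertRefreshMap(kfv) ... */
            helper.reconcile();
        }

        return 0;
    }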