Skip to content

Commit

Permalink
fix: recoverdata support load disk table (#3888)
Browse files Browse the repository at this point in the history
  • Loading branch information
vagetablechicken authored May 9, 2024
1 parent 7269141 commit c5ecca7
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 54 deletions.
2 changes: 1 addition & 1 deletion docs/en/maintain/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ $ ./openmldb --endpoint=172.27.2.52:9520 --role=client

### loadtable

1. Load an existing table
Load an existing table, only support memory table

Command format: `loadtable table_name tid pid ttl segment_cnt`

Expand Down
2 changes: 1 addition & 1 deletion docs/zh/maintain/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ $ ./openmldb --endpoint=172.27.2.52:9520 --role=client

### loadtable

1、加载已有表
加载已有表,只支持内存表

命令格式: loadtable table\_name tid pid ttl segment\_cnt

Expand Down
1 change: 0 additions & 1 deletion src/cmd/openmldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3260,7 +3260,6 @@ void HandleClientLoadTable(const std::vector<std::string> parts, ::openmldb::cli
return;
}
}
// TODO(): get status msg
auto st = client->LoadTable(parts[1], boost::lexical_cast<uint32_t>(parts[2]),
boost::lexical_cast<uint32_t>(parts[3]), ttl, is_leader, seg_cnt);
if (st.OK()) {
Expand Down
1 change: 1 addition & 0 deletions src/tablet/tablet_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3039,6 +3039,7 @@ void TabletImpl::LoadTable(RpcController* controller, const ::openmldb::api::Loa
break;
}
std::string root_path;
// we can't know table is memory or disk, so set the right storage_mode in request message
bool ok = ChooseDBRootPath(tid, pid, table_meta.storage_mode(), root_path);
if (!ok) {
response->set_code(::openmldb::base::ReturnCode::kFailToGetDbRootPath);
Expand Down
61 changes: 35 additions & 26 deletions tools/openmldb_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,41 +97,43 @@ def CheckTable(executor, db, table_name):
return Status(-1, "role is not match")
return Status()

def RecoverPartition(executor, db, partitions, endpoint_status):
def RecoverPartition(executor, db, replicas, endpoint_status, storage):
"""recover all replicas of one partition"""
leader_pos = -1
max_offset = 0
table_name = partitions[0].GetName()
pid = partitions[0].GetPid()
for pos in range(len(partitions)):
partition = partitions[pos]
if partition.IsLeader() and partition.GetOffset() >= max_offset:
table_name = replicas[0].GetName()
pid = replicas[0].GetPid()
tid = replicas[0].GetTid()
for pos in range(len(replicas)):
replica = replicas[pos]
if replica.IsLeader() and replica.GetOffset() >= max_offset:
leader_pos = pos
if leader_pos < 0:
log.error("cannot find leader partition. db {db} name {table_name} partition {pid}".format(
db=db, table_name=table_name, pid=pid))
return Status(-1, "recover partition failed")
tid = partitions[0].GetTid()
leader_endpoint = partitions[leader_pos].GetEndpoint()
msg = "cannot find leader replica. db {db} name {table_name} partition {pid}".format(
db=db, table_name=table_name, pid=pid)
log.error(msg)
return Status(-1, "recover partition failed: {msg}".format(msg=msg))
leader_endpoint = replicas[leader_pos].GetEndpoint()
# recover leader
if "{tid}_{pid}".format(tid=tid, pid=pid) not in endpoint_status[leader_endpoint]:
log.info("leader partition is not in tablet, db {db} name {table_name} pid {pid} endpoint {leader_endpoint}. start loading data...".format(
log.info("leader replica is not in tablet, db {db} name {table_name} pid {pid} endpoint {leader_endpoint}. start loading data...".format(
db=db, table_name=table_name, pid=pid, leader_endpoint=leader_endpoint))
status = executor.LoadTable(leader_endpoint, table_name, tid, pid)
status = executor.LoadTableHTTP(leader_endpoint, table_name, tid, pid, storage)
if not status.OK():
log.error("load table failed. db {db} name {table_name} tid {tid} pid {pid} endpoint {leader_endpoint} msg {status}".format(
db=db, table_name=table_name, tid=tid, pid=pid, leader_endpoint=leader_endpoint, status=status.GetMsg()))
return Status(-1, "recover partition failed")
if not partitions[leader_pos].IsAlive():
return status
if not replicas[leader_pos].IsAlive():
status = executor.UpdateTableAlive(db, table_name, pid, leader_endpoint, "yes")
if not status.OK():
log.error("update leader alive failed. db {db} name {table_name} pid {pid} endpoint {leader_endpoint}".format(
db=db, table_name=table_name, pid=pid, leader_endpoint=leader_endpoint))
return Status(-1, "recover partition failed")
# recover follower
for pos in range(len(partitions)):
for pos in range(len(replicas)):
if pos == leader_pos:
continue
partition = partitions[pos]
partition = replicas[pos]
endpoint = partition.GetEndpoint()
if partition.IsAlive():
status = executor.UpdateTableAlive(db, table_name, pid, endpoint, "no")
Expand All @@ -149,24 +151,31 @@ def RecoverTable(executor, db, table_name):
log.info("{table_name} in {db} is healthy".format(table_name=table_name, db=db))
return Status()
log.info("recover {table_name} in {db}".format(table_name=table_name, db=db))
status, table_info = executor.GetTableInfo(db, table_name)
status, table_info = executor.GetTableInfoHTTP(db, table_name)
if not status.OK():
log.warning("get table info failed. msg is {msg}".format(msg=status.GetMsg()))
return Status(-1, "get table info failed. msg is {msg}".format(msg=status.GetMsg()))
partition_dict = executor.ParseTableInfo(table_info)
log.warning("get table info failed. msg is {msg}".format(msg=status))
return Status(-1, "get table info failed. msg is {msg}".format(msg=status))
if len(table_info) != 1:
log.warning("table info should be 1, {table_info}".format(table_info=table_info))
return Status(-1, "table info should be 1")
table_info = table_info[0]
partition_dict = executor.ParseTableInfoJson(table_info)
storage = "kMemory" if "storage_mode" not in table_info else table_info["storage_mode"]
endpoints = set()
for record in table_info:
endpoints.add(record[3])
for _, reps in partition_dict.items():
# list of replicas
for rep in reps:
endpoints.add(rep.GetEndpoint())
endpoint_status = {}
for endpoint in endpoints:
status, result = executor.GetTableStatus(endpoint)
if not status.OK():
log.warning("get table status failed. msg is {msg}".format(msg=status.GetMsg()))
return Status(-1, "get table status failed. msg is {msg}".format(msg=status.GetMsg()))
endpoint_status[endpoint] = result
max_pid = int(table_info[-1][2])
for pid in range(max_pid + 1):
RecoverPartition(executor, db, partition_dict[str(pid)], endpoint_status)

for _, part in partition_dict.items():
RecoverPartition(executor, db, part, endpoint_status, storage)
# wait op
time.sleep(1)
while True:
Expand Down
98 changes: 73 additions & 25 deletions tools/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
import subprocess
import sys
import time
# http lib for python2 or 3
import json
try:
import httplib
import urllib
except ImportError:
import http.client as httplib
import urllib.parse as urllib

# for Python 2, don't use f-string
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format = '%(levelname)s: %(message)s')
Expand All @@ -35,6 +44,9 @@ def GetMsg(self):
def GetCode(self):
return self.code

def __str__(self):
return "code: {code}, msg: {msg}".format(code = self.code, msg = self.msg)

class Partition:
def __init__(self, name, tid, pid, endpoint, is_leader, is_alive, offset):
self.name = name
Expand Down Expand Up @@ -202,17 +214,48 @@ def GetTableInfo(self, database, table_name = ''):
continue
result.append(record)
return Status(), result
def GetTableInfoHTTP(self, database, table_name = ''):
"""http post ShowTable to ns leader, return one or all table info"""
ns = self.endpoint_map[self.ns_leader]
conn = httplib.HTTPConnection(ns)
param = {"db": database, "name": table_name}
headers = {"Content-type": "application/json"}
conn.request("POST", "/NameServer/ShowTable", json.dumps(param), headers)
response = conn.getresponse()
if response.status != 200:
return Status(response.status, response.reason), None
result = json.loads(response.read())
conn.close()
# check resp
if result["code"] != 0:
return Status(result["code"], "get table info failed: {msg}".format(msg=result["msg"]))
return Status(), result["table_info"]

def ParseTableInfo(self, table_info):
result = {}
for record in table_info:
is_leader = True if record[4] == "leader" else False
is_alive = True if record[5] == "yes" else False
partition = Partition(record[0], record[1], record[2], record[3], is_leader, is_alive, record[6]);
partition = Partition(record[0], record[1], record[2], record[3], is_leader, is_alive, record[6])
result.setdefault(record[2], [])
result[record[2]].append(partition)
return result

def ParseTableInfoJson(self, table_info):
"""parse one table's partition info from json"""
result = {}
parts = table_info["table_partition"]
for partition in parts:
# one partition(one leader and others)
for replica in partition["partition_meta"]:
is_leader = replica["is_leader"]
is_alive = True if "is_alive" not in replica else replica["is_alive"]
# the classname should be replica, but use partition for compatible
pinfo = Partition(table_info["name"], table_info["tid"], partition["pid"], replica["endpoint"], is_leader, is_alive, replica["offset"])
result.setdefault(partition["pid"], [])
result[partition["pid"]].append(pinfo)
return result

def GetTablePartition(self, database, table_name):
status, result = self.GetTableInfo(database, table_name)
if not status.OK:
Expand Down Expand Up @@ -274,30 +317,35 @@ def ShowTableStatus(self, pattern = '%'):

return Status(), output_processed

def LoadTable(self, endpoint, name, tid, pid, sync = True):
cmd = list(self.tablet_base_cmd)
cmd.append("--endpoint=" + self.endpoint_map[endpoint])
cmd.append("--cmd=loadtable {} {} {} 0 8".format(name, tid, pid))
log.info("run {cmd}".format(cmd = cmd))
status, output = self.RunWithRetuncode(cmd)
time.sleep(1)
if status.OK() and output.find("LoadTable ok") != -1:
if not sync:
return Status()
while True:
status, result = self.GetTableStatus(endpoint, tid, pid)
key = "{}_{}".format(tid, pid)
if status.OK() and key in result:
table_stat = result[key][4]
if table_stat == "kTableNormal":
return Status()
elif table_stat == "kTableLoading" or table_stat == "kTableUndefined":
log.info("table is loading... tid {tid} pid {pid}".format(tid = tid, pid = pid))
else:
return Status(-1, "table stat is {table_stat}".format(table_stat = table_stat))
time.sleep(2)

return Status(-1, "execute load table failed, status {msg}, output {output}".format(msg = status.GetMsg(), output = output))
def LoadTableHTTP(self, endpoint, name, tid, pid, storage):
"""http post LoadTable to tablet, support all storage mode"""
conn = httplib.HTTPConnection(endpoint)
# ttl won't effect, set to 0, and seg cnt is always 8
# and no matter if leader
param = {"table_meta": {"name": name, "tid": tid, "pid": pid, "ttl":0, "seg_cnt":8, "storage_mode": storage}}
headers = {"Content-type": "application/json"}
conn.request("POST", "/TabletServer/LoadTable", json.dumps(param), headers)
response = conn.getresponse()
if response.status != 200:
return Status(response.status, response.reason)
result = response.read()
conn.close()
resp = json.loads(result)
if resp["code"] != 0:
return Status(resp["code"], resp["msg"])
# wait for success TODO(hw): refactor
while True:
status, result = self.GetTableStatus(endpoint, str(tid), str(pid))
key = "{}_{}".format(tid, pid)
if status.OK() and key in result:
table_stat = result[key][4]
if table_stat == "kTableNormal":
return Status()
elif table_stat == "kTableLoading" or table_stat == "kTableUndefined":
log.info("table is loading... tid {tid} pid {pid}".format(tid = tid, pid = pid))
else:
return Status(-1, "table stat is {table_stat}".format(table_stat = table_stat))
time.sleep(2)

def GetLeaderFollowerOffset(self, endpoint, tid, pid):
cmd = list(self.tablet_base_cmd)
Expand Down

0 comments on commit c5ecca7

Please sign in to comment.