From e64d280ed9305a03c7f01cd45ef7ad2416b32a8a Mon Sep 17 00:00:00 2001 From: Huang Wei Date: Tue, 4 Jul 2023 15:52:08 +0800 Subject: [PATCH 1/5] fix: offline load data & show table status --- docs/zh/deploy/conf.md | 2 +- docs/zh/openmldb_sql/ddl/SET_STATEMENT.md | 4 +- docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md | 32 +++---- .../openmldb_sql/dml/LOAD_DATA_STATEMENT.md | 23 +++-- docs/zh/quickstart/function_boundary.md | 10 +++ .../openmldb/batch/nodes/LoadDataPlan.scala | 86 +++++++++++-------- .../openmldb/batch/utils/HybridseUtil.scala | 52 +++++++---- src/sdk/sql_cluster_router.cc | 28 +++--- 8 files changed, 141 insertions(+), 96 deletions(-) diff --git a/docs/zh/deploy/conf.md b/docs/zh/deploy/conf.md index 57acf72f645..b4edfdd1ed6 100644 --- a/docs/zh/deploy/conf.md +++ b/docs/zh/deploy/conf.md @@ -301,7 +301,7 @@ local模式即Spark任务运行在本地(TaskManager所在主机),该模 "yarn"和"yarn-cluster"是同一个模式,即Spark任务运行在Yarn集群上,该模式下需要配置的参数较多,主要包括: - 在**启动TaskManager前**配置环境变量`HADOOP_CONF_DIR`为Hadoop和Yarn的配置文件所在目录,文件目录中应包含Hadoop的`core-site.xml`、`hdfs-site.xml`、Yarn的`yarn-site.xml`等配置文件,参考[Spark官方文档](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#launching-spark-on-yarn)。 - `spark.yarn.jars`配置Yarn需要读取的Spark运行jar包地址,必须是`hdfs://`地址。可以上传[OpenMLDB Spark 发行版](../../tutorial/openmldbspark_distribution.md)解压后的`jars`目录到HDFS上,并配置为`hdfs:///jars/*`(注意通配符)。[如果不配置该参数,Yarn会将`$SPARK_HOME/jars`打包上传分发,并且每次离线任务都要分发](https://spark.apache.org/docs/3.2.1/running-on-yarn.html#preparations),效率较低,所以推荐配置。 -- `batchjob.jar.path`必须是HDFS路径,上传batchjob jar包到HDFS上,并配置为对应地址,保证Yarn集群上所有Worker可以获得batchjob包。 +- `batchjob.jar.path`必须是HDFS路径(具体到包名),上传batchjob jar包到HDFS上,并配置为对应地址,保证Yarn集群上所有Worker可以获得batchjob包。 - `offline.data.prefix`必须是HDFS路径,保证Yarn集群上所有Worker可读写数据。应使用前面配置的环境变量`HADOOP_CONF_DIR`中的Hadoop集群地址。 ##### yarn-client模式 diff --git a/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md b/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md index cf14540f7b0..077bd77483d 100644 --- a/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md +++ b/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md @@ -148,7 +148,7 @@ CREATE TABLE t1 (col0 STRING, col1 int, std_time TIMESTAMP, INDEX(KEY=col1, TS=s ### 离线命令配置详情 -- 设置离线命令同步执行,同步的超时时间将自动设置: +- 设置离线命令同步执行,同步的超时时间将自动设置为`sync_job_timeout`,默认30min: ```sql > SET @@sync_job = "true"; @@ -160,7 +160,7 @@ CREATE TABLE t1 (col0 STRING, col1 int, std_time TIMESTAMP, INDEX(KEY=col1, TS=s - 配置客户端`--sync_job_timeout`,不可大于`server.channel_keep_alive_time`。SDK暂不支持修改。 ``` -- 设置离线异步命令或离线管理命令的等待时间(单位为毫秒): +- 离线命令异步执行时,同样会有超时时间,可手动配置。设置离线异步命令或离线管理命令的等待时间(单位为毫秒): ```sql > SET @@job_timeout = "600000"; ``` diff --git a/docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md b/docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md index 4d05076ee47..5ff9db9151c 100644 --- a/docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md +++ b/docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md @@ -9,22 +9,22 @@ SHOW TABLE STATUS [LIKE Pattern]; ## 输出信息 -| Column | Description | -| ----------------- |-----------------------------------------------------------| -| Table_id | 表唯一 id | -| Table_name | 表名 | -| Database_name | 数据库名 | -| Storage_type | 存储类型, `memory`,`ssd`,`hdd` | -| Rows | 表的 rows count | -| Memory_data_size | 表内存占用(单位 bytes) | -| Disk_data_size | 表磁盘占用 (单位 bytes) | -| Partition | Partiton 数量 | -| Partition_unalive | Unalive partition 数量 | -| Replica | Replica 数量 | -| Offline_path | 表对应 offline 数据路径,仅对离线表生效。 `NULL` 表示未设置该项。 | -| Offline_format | 表对应 offline 数据格式,仅对离线表生效。 `NULL` 表示未设置该项。 | -| Offline_deep_copy | 表对应 offline 数据是否使用 deep copy,仅对离线表生效。 `NULL` 表示未设置该项。| -| Warnings | 
当前表相关的Warnings信息,包含以下四类:
1) `leader/follower mode inconsistent`: nameserver上记录的leader/follower信息和tablet上面不一致
2) `state is kNotFound/kTableUndefined/kTableLoading`:分片状态为不可用,`kNotFound`代表分片不存在;`kTableUndefined`代表分片未成功加载; `kTableLoading`表明分配正在加载,稍等可用
3) `real replica number xx does not match the configured replicanum xx`:副本数目和配置的`replicanum`不匹配
4) `not connected to leader`:follower和leader未连接,通常和3) 同时出现 |
+| Column                 | Description |
+| ---------------------- | ----------- |
+| Table_id               | 表唯一 id |
+| Table_name             | 表名 |
+| Database_name          | 数据库名 |
+| Storage_type           | 存储类型, `memory`,`ssd`,`hdd` |
+| Rows                   | 表的 rows count |
+| Memory_data_size       | 表内存占用(单位 bytes) |
+| Disk_data_size         | 表磁盘占用 (单位 bytes) |
+| Partition              | Partition 数量 |
+| Partition_unalive      | Unalive partition 数量 |
+| Replica                | Replica 数量 |
+| Offline_path           | 表对应 offline 数据路径,仅对离线表生效。 `NULL` 表示未设置该项。 |
+| Offline_format         | 表对应 offline 数据格式,仅对离线表生效。 `NULL` 表示未设置该项。 |
+| Offline_symbolic_paths | 表对应 offline 软链接路径,`NULL` 表示未设置该项。 |
+| Warnings               | 当前表相关的Warnings信息,包含以下四类:<br />
1) `leader/follower mode inconsistent`: nameserver上记录的leader/follower信息和tablet上面不一致
2) `state is kNotFound/kTableUndefined/kTableLoading`:分片状态为不可用,`kNotFound`代表分片不存在;`kTableUndefined`代表分片未成功加载;`kTableLoading`表明分片正在加载,稍等即可用<br />
3) `real replica number xx does not match the configured replicanum xx`:副本数目和配置的`replicanum`不匹配
4) `not connected to leader`:follower和leader未连接,通常和3) 同时出现 | diff --git a/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md b/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md index f8cdc43690a..0bea7f2f98d 100644 --- a/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md +++ b/docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md @@ -110,27 +110,26 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1; ## 离线导入规则 -表的离线信息可通过desc 查看。在没有离线信息时,进行LOAD DATA离线导入,没有特别限制。 - -但如果当前已有离线信息,再次LOAD DATA,能否成功和表之前的离线信息有关。规则为: -- 原信息为软链接(Deep Copy列为false),OpenMLDB应只读该地址,不应修改**软连接中的数据** - - 可以再次软链接,替换原软链接地址,指向别的数据地址(mode='overwrite', deep_copy=false) - - 可以做硬拷贝(mode='overwrite', deep_copy=true),将丢弃原软链接地址,但不会修改软链接指向的数据 -- 原信息为硬拷贝(Deep Copy列为true),数据地址(Offline path)为OpenMLDB所拥有的,可读可写 - - **不可以**替换为软链接(数据还没有回收恢复机制,直接删除是危险行为,所以暂不支持) - - 可以再次硬拷贝(mode='overwrite'/'append', deep_copy=true) - +表的离线信息可通过`desc
<table>`查看。我们将数据地址分为两类:离线地址是OpenMLDB的内部存储路径,硬拷贝将写入此地址,且仅有一个;软链接地址是软链接导入的地址列表。
+根据模式的不同,对离线信息的修改也不同。
+- overwrite模式,将会覆盖原有的所有字段,包括离线地址、软链接地址、格式、读取选项,仅保留本次overwrite写入的信息。
+  - overwrite 硬拷贝,离线地址中如果存在数据将被覆盖,软链接全部清空,格式更改为内部默认格式parquet,读取选项全部清空。
+  - overwrite 软拷贝,离线地址直接删除(并不删除数据),软链接、格式、读取选项均覆盖为本次输入的值。
+- append模式,append 硬拷贝将数据写入当前离线地址;append 软拷贝需要考虑当前的格式和读取选项,如果不同,将无法append。
+  - append相同的路径将被忽略,但路径判断仅做字符串比较,同一路径的不同写法会被视为两个软链接地址。
+- errorifexists模式,如果当前已有离线信息,将报错。这里的离线信息包括离线地址和软链接地址。比如,当前存在离线地址而无软链接,此时`LOAD DATA`软链接,也将报错。

 ````{tip}
-如果你肯定原有的硬拷贝数据不再被需要,而现在想将离线数据地址修改为软链接,可以手动删除离线地址的数据,并用nameserver http请求清空表的离线信息。
+如果当前离线信息存在问题,无法通过`LOAD DATA`修改,可以手动删除离线地址的数据,并用nameserver http请求清空表的离线信息。

 清空离线信息步骤:
 ```
 curl http://<ns_endpoint>/NameServer/ShowTable -d'{"db":"<db_name>","name":"<table_name>"}'
 # 获得其中的表tid
 curl http://<ns_endpoint>/NameServer/UpdateOfflineTableInfo -d '{"db":"<db_name>","name":"<table_name>","tid":<tid>}'
 ```
-然后,可以进行软链接导入。
 ````

+由于硬拷贝的写入格式固定为parquet、无法修改,所以如果想要硬拷贝和软链接同时存在,需要保证软链接的数据格式也是parquet。
+
 ## CSV源数据格式说明

 导入支持csv和parquet两种数据格式,csv的格式需要特别注意,下面举例说明。

diff --git a/docs/zh/quickstart/function_boundary.md b/docs/zh/quickstart/function_boundary.md
index 9162534982f..f925f5ec7f8 100644
--- a/docs/zh/quickstart/function_boundary.md
+++ b/docs/zh/quickstart/function_boundary.md
@@ -52,6 +52,16 @@ spark.default.conf=spark.port.maxRetries=32;foo=bar

 ## DML 边界

+### 离线信息
+
+表的离线信息中存在两种path,一个是`offline_path`,一个是`symbolic_paths`。`offline_path`是离线数据的实际存储路径,`symbolic_paths`是离线数据的软链接路径。两种path都可以通过`LOAD DATA`来修改,`symbolic_paths`还可以通过`ALTER`语句修改。
+
+`offline_path`和`symbolic_paths`的区别在于:`offline_path`是OpenMLDB集群所拥有的路径,如果实施硬拷贝,数据将写入此路径;而`symbolic_paths`是OpenMLDB集群外的路径,软拷贝会向该列表中增添一个路径。离线查询时,两个路径的数据都会被加载。两个路径使用同样的格式和读取选项,不支持不同配置的路径。
+
+因此,如果目前离线信息中存在`offline_path`,那么`LOAD DATA`只能修改`symbolic_paths`;如果目前离线信息中只存在`symbolic_paths`,那么`LOAD DATA`可以修改`offline_path`和`symbolic_paths`。
+
+`errorifexists`在表存在离线信息时就会报错:存在软链接时硬拷贝,或存在硬拷贝时软拷贝,都会报错。
+
 ### LOAD DATA

 `LOAD DATA` 无论导入到在线或离线,都是离线 job。源数据的格式规则,离线在线没有区别。

diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala
index deeb0ba582c..caf8a1d658f 100644
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConv
 import scala.collection.mutable
 import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMap, mapAsScalaMapConverter}

-
 object LoadDataPlan {
   private val logger = LoggerFactory.getLogger(this.getClass)

@@ -48,7 +47,6 @@ object LoadDataPlan {
     logger.info("table info: {}", info)
     require(info != null && info.getName.nonEmpty, s"table $db.$table info is not existed(no table name): $info")

-
     // we read input file even in soft copy,
     // cause we want to check if "the input file schema == openmldb table schema"
     val df = HybridseUtil.autoLoad(ctx.getOpenmldbSession, inputFile, format, options, info.getColumnDescList)
@@ -58,12 +56,15 @@ object LoadDataPlan {

     if (storage == "online") { // Import online data
       require(deepCopy && mode == "append", "import to online storage, can't do soft copy, and mode must be append")
-      val writeOptions = Map("db" -> db, "table" -> table,
+      val writeOptions = Map(
+        "db" -> db,
+        "table" -> table,
         "zkCluster" -> ctx.getConf.openmldbZkCluster,
-        "zkPath" -> ctx.getConf.openmldbZkRootPath)
+        "zkPath" -> ctx.getConf.openmldbZkRootPath
+      )
      df.write.options(writeOptions).format("openmldb").mode(mode).save()
    } else { // Import offline data
-      // only in some cases, do not need to update info
+      // only in some cases we don't need to update the info, set false in those cases
       var needUpdateInfo = true
       val newInfoBuilder = info.toBuilder
@@ -71,14 +72,15 @@ object LoadDataPlan {

       if (!deepCopy) {
         // Get error if exists
-        if (mode.equals("errorifexists") && info.hasOfflineTableInfo){
+        if (mode.equals("errorifexists") && info.hasOfflineTableInfo) {
           throw new IllegalArgumentException("offline info exists")
         }
         val oldOfflineTableInfo = if (info.hasOfflineTableInfo) { // Have offline table info
           info.getOfflineTableInfo
-        } else { // No offline table info
-          OfflineTableInfo.newBuilder()
+        } else { // No offline table info, use new format
+          OfflineTableInfo
+            .newBuilder()
             .setPath("")
             .setFormat(format)
             .build()
@@ -86,7 +88,6 @@ object LoadDataPlan {
         val newOfflineInfoBuilder = OfflineTableInfo.newBuilder(oldOfflineTableInfo)

-        // If mode=="append"
         if (mode.equals("append") || mode.equals("errorifexists")) {
           // Check if new path is already existed or not
           val symbolicPathsSize = newOfflineInfoBuilder.getSymbolicPathsCount()
@@ -103,11 +104,20 @@ object LoadDataPlan {
             logger.info(s"Add the path of $inputFile to offline table info symbolic paths")
             newOfflineInfoBuilder.addSymbolicPaths(inputFile)
           }
+          // if there was no offline info before, the format has already been set to the new format
+          require(
+            oldOfflineTableInfo.getFormat.equals(format) &&
+              oldOfflineTableInfo.getOptionsMap.asScala.toMap.equals(options),
+            s"format and options must be the same with existed, but old is ${oldOfflineTableInfo.getFormat}, " +
+              s"${oldOfflineTableInfo.getOptionsMap.asScala.toMap}, new is $format, $options"
+          )
         } else if (mode.equals("overwrite")) {
+          // overwrite mode, we need to clean all paths (hard + symbolic)
          // TODO(tobe): May remove data files from copy import
          newOfflineInfoBuilder.setPath("")
          newOfflineInfoBuilder.clearSymbolicPaths()
          newOfflineInfoBuilder.addSymbolicPaths(inputFile)
+          newOfflineInfoBuilder.setFormat(format)
         }

         if (!format.equals("hive")) {
@@ -117,7 +127,7 @@ object LoadDataPlan {

         // update to ns later
         newInfoBuilder.setOfflineTableInfo(newOfflineInfoBuilder.build())
-      } else { // deep copy
+      } else { // deep copy, only works on OfflineTableInfo.path
         // Generate new offline address by db name, table name and config of prefix
         val offlineDataPrefix = if (ctx.getConf.offlineDataPrefix.endsWith("/")) {
           ctx.getConf.offlineDataPrefix.dropRight(1)
@@ -129,47 +139,51 @@ object LoadDataPlan {
         // write default settings: no option and parquet format
         var (writePath, writeFormat) = (offlineDataPath, "parquet")
         var writeOptions: mutable.Map[String, String] = mutable.Map()
+
+        var offlineBuilder = OfflineTableInfo
+          .newBuilder()
+          .setPath(writePath)
+          .setFormat(writeFormat)
+          .putAllOptions(writeOptions.asJava)

         if (info.hasOfflineTableInfo) {
-          if (mode.equals("errorifexists")) {
-            throw new IllegalArgumentException("offline info exists")
-          } else if (mode.equals("append")) {
-            throw new IllegalArgumentException("Deep copy with append mode is not supported yet")
-          }
+          require(!mode.equals("errorifexists"), "has offline info (even no deep path), can't do errorifexists")

           val old = info.getOfflineTableInfo
-          if (!old.getDeepCopy) {
-            require(mode.equals("overwrite"), "Only overwrite mode works. Old offline data is soft-coped, only can " +
-              "overwrite the offline info, leave the soft-coped data as it is.")
-            // if old offline data is soft-coped, we need to reject the old info, use the 'offlineDataPath' and
-            // normal settings
-            needUpdateInfo = true
-          } else {
-            // if old offline data is deep-copied and mode is append/overwrite,
-            // we need to use the old info and don't need to update info to ns
-            //writeFormat = old.getFormat
-            //writeOptions = old.getOptionsMap.asScala
-            // Generated the path to deep copy
-            //writePath = s"$offlineDataPrefix/$db/$table"
-            needUpdateInfo = true
+          // only symbolic paths need to be copied over; on overwrite, all fields are overwritten with the default write settings
+          if (!mode.equals("overwrite")) {
+            old.getSymbolicPathsList.forEach(path => {
+              offlineBuilder.addSymbolicPaths(path)
+            })
+            // if there are symbolic paths, the format and options may not be proper for deep copy
+            require(
+              old.getFormat == writeFormat && old.getOptionsMap.asScala.toMap.equals(writeOptions.toMap),
+              s"can't do deep copy in existed paths' format or options in mode $mode, " +
+                s"old is ${old.getFormat}, ${old.getOptionsMap.asScala.toMap}, new is $writeFormat, $writeOptions"
+            )
+            if (old.getPath.equals(writePath)) {
+              needUpdateInfo = false
+            }
+            // the write path may be changed by the offline path setting, so don't require the paths to be equal
          }
        }

        // do deep copy
-        require(!inputFile.equals(writePath), "read and write paths shouldn't be the same, it may clean data in " +
-          "the path")
+        require(
+          !inputFile.equals(writePath),
+          "read and write paths shouldn't be the same, it may clean data in the path"
+        )
        df.write.mode(mode).format(writeFormat).options(writeOptions.toMap).save(writePath)

-        val offlineBuilder = OfflineTableInfo.newBuilder().setPath(writePath).setFormat(writeFormat)
-          .setDeepCopy(true).clearSymbolicPaths().putAllOptions(writeOptions.asJava)
        newInfoBuilder.setOfflineTableInfo(offlineBuilder)
      }

      if (needUpdateInfo) {
        val newInfo = newInfoBuilder.build()
        logger.info(s"new info:\n$newInfo")
-        require(ctx.getOpenmldbSession.openmldbCatalogService.updateOfflineTableInfo(newInfo), s"update new info " +
-          s"failed: $newInfo")
+        require(
+          ctx.getOpenmldbSession.openmldbCatalogService.updateOfflineTableInfo(newInfo),
+          s"update new info failed: $newInfo"
+        )
      }
    }

diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala
index 0b5016f3120..3a5ca64732d 100644
--- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala
+++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala
@@ -218,19 +218,21 @@ object HybridseUtil {
   }

   // load data: read options, select into: write options
+  // parquet/hive format doesn't support any option now, consistent with the write options (empty) when deep copying
   val options: mutable.Map[String, String] = mutable.Map()
-  // default values: https://spark.apache.org/docs/3.2.1/sql-data-sources-csv.html
-  // delimiter -> sep: ,(the same with spark3 default sep)
-  // header: true(different with spark)
-  // null_value -> nullValue: null(different with spark)
-  // quote: `"`(the same with spark3 default quote)
-  options += ("header" -> "true")
-  options += ("nullValue" -> "null")
-  updateOptionsMap(options, getOptionFromNode(node, "delimiter"), "sep", getStr)
-  updateOptionsMap(options, getOptionFromNode(node, "header"), "header", getBool)
-  updateOptionsMap(options, getOptionFromNode(node, "null_value"), "nullValue", getStr)
getOptionFromNode(node, "null_value"), "nullValue", getStr) - updateOptionsMap(options, getOptionFromNode(node, "quote"), "quote", getStr) - + if (format.equals("csv")){ + // default values: https://spark.apache.org/docs/3.2.1/sql-data-sources-csv.html + // delimiter -> sep: ,(the same with spark3 default sep) + // header: true(different with spark) + // null_value -> nullValue: null(different with spark) + // quote: `"`(the same with spark3 default quote) + options += ("header" -> "true") + options += ("nullValue" -> "null") + updateOptionsMap(options, getOptionFromNode(node, "delimiter"), "sep", getStr) + updateOptionsMap(options, getOptionFromNode(node, "header"), "header", getBool) + updateOptionsMap(options, getOptionFromNode(node, "null_value"), "nullValue", getStr) + updateOptionsMap(options, getOptionFromNode(node, "quote"), "quote", getStr) + } // load data: write mode(load data may write to offline storage or online storage, needs mode too) // select into: write mode val modeStr = parseOption(getOptionFromNode(node, "mode"), "error_if_exists", getStringOrDefault).toLowerCase @@ -314,7 +316,9 @@ object HybridseUtil { autoLoad(openmldbSession, file, List.empty[String], format, options, columns) } - // Decide which load method to use by arg `format`, DO NOT pass `hive://a.b` with format `csv`. + // Load df from file **and** symbol paths, they should in the same format and options. + // Decide which load method to use by arg `format`, DO NOT pass `hive://a.b` with format `csv`, + // the format should be `hive`. // Use `parseOptions` in LoadData/SelectInto to get the right format(filePath & option `format`). // valid pattern: // 1. hive path, format must be hive, discard other options @@ -325,10 +329,26 @@ object HybridseUtil { options: Map[String, String], columns: util.List[Common.ColumnDesc]): DataFrame = { val fmt = format.toLowerCase if (fmt.equals("hive")) { - logger.info("load data from hive table {}", file) - HybridseUtil.hiveLoad(openmldbSession, file, columns); + logger.info(s"load data from hive table $file & $symbolPaths") + if (file.isEmpty) { + var outputDf: DataFrame = null + symbolPaths.zipWithIndex.foreach { case (path, index) => + if (index == 0) { + outputDf = HybridseUtil.hiveLoad(openmldbSession, path, columns); + } else { + outputDf = outputDf.union(HybridseUtil.hiveLoad(openmldbSession, path, columns)) + } + } + outputDf + } else { + var outputDf = HybridseUtil.hiveLoad(openmldbSession, file, columns) + for (path: String <- symbolPaths) { + outputDf = outputDf.union(HybridseUtil.hiveLoad(openmldbSession, path, columns)) + } + outputDf + } } else { - logger.info("load data from file {} reader[format {}, options {}]", file, fmt, options) + logger.info("load data from file {} & {} reader[format {}, options {}]", file, symbolPaths, fmt, options) if (file.isEmpty) { var outputDf: DataFrame = null diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 68fd1026cba..b2116f1e7c7 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -1970,10 +1970,9 @@ base::Status SQLClusterRouter::HandleSQLCreateTable(hybridse::node::CreatePlanNo std::map config; ::openmldb::taskmanager::JobInfo job_info; - int job_timeout = GetJobTimeout(); std::string output; - - ::openmldb::base::Status status = ExecuteOfflineQueryGetOutput(sql, config, db, job_timeout, &output); + // create table like hive is a long op, use large timeout + ::openmldb::base::Status status = ExecuteOfflineQueryGetOutput(sql, config, db, FLAGS_sync_job_timeout, 
&output); if (!status.OK()) { LOG(ERROR) << "Fail to create table, error message: " + status.msg; @@ -2677,7 +2676,7 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( auto current_symbolic_paths = offline_table_info.symbolic_paths(); - std::vector actions = plan->actions_; + std::vector actions = plan->actions_; // Handle multiple add and delete actions for (const auto& action : actions) { @@ -4068,7 +4067,7 @@ static const std::initializer_list GetTableStatusSchema() { static const std::initializer_list schema = { "Table_id", "Table_name", "Database_name", "Storage_type", "Rows", "Memory_data_size", "Disk_data_size", "Partition", "Partition_unalive", "Replica", - "Offline_path", "Offline_format", "Offline_deep_copy", "Warnings"}; + "Offline_path", "Offline_format", "Offline_symbolic_paths", "Warnings"}; return schema; } @@ -4089,7 +4088,7 @@ static const std::initializer_list GetTableStatusSchema() { // - Replica: replica number // - Offline_path: data path for offline data // - Offline_format: format for offline data -// - Offline_deep_copy: deep copy option for offline data +// - Offline_symbolic_paths: symbolic paths for offline data // - Warnings: any warnings raised during the checking // // if db is empty: @@ -4164,18 +4163,21 @@ std::shared_ptr SQLClusterRouter::ExecuteShowTableStat } CheckTableStatus(db, table_name, tid, partition_info, replica_num, table_statuses, &error_msg); } - - std::string offline_path = "NULL", offline_format = "NULL", offline_deep_copy = "NULL"; + // TODO(hw): miss options, it can be got from desc table + std::string offline_path = "NULL", offline_format = "NULL", symbolic_paths = "NULL"; if (tinfo.has_offline_table_info()) { offline_path = tinfo.offline_table_info().path(); offline_format = tinfo.offline_table_info().format(); - offline_deep_copy = std::to_string(tinfo.offline_table_info().deep_copy()); + symbolic_paths = ""; + for (const auto& ele : tinfo.offline_table_info().symbolic_paths()) { + symbolic_paths += ele + ","; + } } data.push_back({std::to_string(tid), table_name, db, storage_type, std::to_string(rows), std::to_string(mem_bytes), std::to_string(disk_bytes), std::to_string(partition_num), std::to_string(partition_unalive), std::to_string(replica_num), offline_path, offline_format, - offline_deep_copy, error_msg}); + symbolic_paths, error_msg}); } // TODO(#1456): rich schema result set, and pretty-print numberic values (e.g timestamp) in cli @@ -4288,7 +4290,7 @@ void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file_path, std::ma } std::shared_ptr SQLClusterRouter::GetJobResultSet(int job_id, - ::hybridse::sdk::Status* status) { + ::hybridse::sdk::Status* status) { std::string db = openmldb::nameserver::INTERNAL_DB; std::string sql = "SELECT * FROM JOB_INFO WHERE id = " + std::to_string(job_id); @@ -4323,7 +4325,7 @@ std::shared_ptr SQLClusterRouter::GetJobResultSet(::hy return rs; } std::shared_ptr SQLClusterRouter::GetTaskManagerJobResult(const std::string& like_pattern, - ::hybridse::sdk::Status* status) { + ::hybridse::sdk::Status* status) { bool like_match = JobTableHelper::NeedLikeMatch(like_pattern); if (!like_pattern.empty() && !like_match) { int job_id; @@ -4347,7 +4349,7 @@ std::shared_ptr SQLClusterRouter::GetTaskManagerJobRes } std::shared_ptr SQLClusterRouter::GetNameServerJobResult(const std::string& like_pattern, - ::hybridse::sdk::Status* status) { + ::hybridse::sdk::Status* status) { bool like_match = JobTableHelper::NeedLikeMatch(like_pattern); base::Status ret; nameserver::ShowOPStatusResponse response; From 
c451d632161f3db25397f8ab66b0b5d398686100 Mon Sep 17 00:00:00 2001 From: HuangWei Date: Mon, 10 Jul 2023 09:48:07 +0800 Subject: [PATCH 2/5] fix --- docs/zh/openmldb_sql/ddl/SET_STATEMENT.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md b/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md index 077bd77483d..94bdbd172f0 100644 --- a/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md +++ b/docs/zh/openmldb_sql/ddl/SET_STATEMENT.md @@ -148,7 +148,7 @@ CREATE TABLE t1 (col0 STRING, col1 int, std_time TIMESTAMP, INDEX(KEY=col1, TS=s ### 离线命令配置详情 -- 设置离线命令同步执行,同步的超时时间将自动设置为`sync_job_timeout`,默认30min: +- 设置离线命令同步执行,同步的超时时间将自动设置为gflag `sync_job_timeout`,默认30min: ```sql > SET @@sync_job = "true"; From 5f99f168f290def37d866d10a22bca0e5bd676ce Mon Sep 17 00:00:00 2001 From: Huang Wei Date: Mon, 10 Jul 2023 15:11:21 +0800 Subject: [PATCH 3/5] fix ut --- .../com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala index caf8a1d658f..6e962675e53 100644 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala @@ -78,11 +78,12 @@ object LoadDataPlan { val oldOfflineTableInfo = if (info.hasOfflineTableInfo) { // Have offline table info info.getOfflineTableInfo - } else { // No offline table info, use new format + } else { // No offline table info, use new format and options OfflineTableInfo .newBuilder() .setPath("") .setFormat(format) + .putAllOptions(options.asJava) .build() } From 8bd754e20877aa38ae45ae6eefa376c45681fd50 Mon Sep 17 00:00:00 2001 From: Huang Wei Date: Mon, 10 Jul 2023 18:00:37 +0800 Subject: [PATCH 4/5] fix --- .../en/reference/sql/ddl/SHOW_TABLE_STATUS.md | 32 +++++++++---------- src/cmd/sql_cmd_test.cc | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/en/reference/sql/ddl/SHOW_TABLE_STATUS.md b/docs/en/reference/sql/ddl/SHOW_TABLE_STATUS.md index 2ea04a9b7ce..558af5ab526 100644 --- a/docs/en/reference/sql/ddl/SHOW_TABLE_STATUS.md +++ b/docs/en/reference/sql/ddl/SHOW_TABLE_STATUS.md @@ -13,22 +13,22 @@ For example, `'%'` means all databases, including the hidden ones. ## Output Information -| Column | Note | -| ----------------- |----------------------------------------------------------------------------------------------------------------------------------------| -| Table_id | It shows the unique id of the table. | -| Table_name | It shows the name of the table. | -| Database_name | It shows the name of the database, which the table belongs to. | -| Storage_type | It shows the storage type of the table. There are three types of value: `memory`,`ssd` and `hdd`. | -| Rows | It shows the number of rows in this table. | -| Memory_data_size | It shows the memory usage of the table in bytes. | -| Disk_data_size | It shows the disk usage of the table in bytes. | -| Partition | It shows the number of partitons of the table. | -| Partition_unalive | It shows the number of the unalive partitions of the table. | -| Replica | It shows the number of replicas of the table. | -| Offline_path | It shows the path of the offline data for this table and is valid only for offline tables. The `NULL` value means the path is not set. 
| -| Offline_format | It shows the offline data format of the table and is valid only for offline tables. The `NULL` value means it is not set. | -| Offline_deep_copy | It indicates whether deep copy is used on the table and is valid only for offline tables. The `NULL` value means it is not set. | -| Warnings | Warnings related to the table, including the following four types:
1) `leader/follower mode inconsistent`: the leader/follower information from nameserver is not consistent with those in tablet
2) `state is kNotFound/kTableUndefined/kTableLoading`:the partition is unavailable, `kNotFound` means the partition does not exist; `kTableUndefined` means the partition is not loaded successfully; `kTableLoading` means the partition is being loaded
3) `real replica number xx does not match the configured replicanum xx`:the number of replicas != `replicanum`
4) `not connected to leader`:follower is not connected to the leader, which usually occurs together with 3) |
+| Column                 | Note |
+| ---------------------- | ---- |
+| Table_id               | It shows the unique id of the table. |
+| Table_name             | It shows the name of the table. |
+| Database_name          | It shows the name of the database which the table belongs to. |
+| Storage_type           | It shows the storage type of the table. There are three types of value: `memory`, `ssd` and `hdd`. |
+| Rows                   | It shows the number of rows in this table. |
+| Memory_data_size       | It shows the memory usage of the table in bytes. |
+| Disk_data_size         | It shows the disk usage of the table in bytes. |
+| Partition              | It shows the number of partitions of the table. |
+| Partition_unalive      | It shows the number of the unalive partitions of the table. |
+| Replica                | It shows the number of replicas of the table. |
+| Offline_path           | It shows the path of the offline data for this table and is valid only for offline tables. The `NULL` value means the path is not set. |
+| Offline_format         | It shows the offline data format of the table and is valid only for offline tables. The `NULL` value means it is not set. |
+| Offline_symbolic_paths | It shows the symbolic paths of the offline data, i.e. the paths loaded with deep_copy==false. The `NULL` value means it is not set. |
+| Warnings               | Warnings related to the table, including the following four types:<br />
1) `leader/follower mode inconsistent`: the leader/follower information recorded on the nameserver is inconsistent with that on the tablet<br />
2) `state is kNotFound/kTableUndefined/kTableLoading`: the partition is unavailable, `kNotFound` means the partition does not exist; `kTableUndefined` means the partition is not loaded successfully; `kTableLoading` means the partition is being loaded<br />
3) `real replica number xx does not match the configured replicanum xx`: the number of replicas != `replicanum`<br />
4) `not connected to leader`: the follower is not connected to the leader, which usually occurs together with 3) |

 ## Example

 ```sql
diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc
index 65e96028d0d..36ec76abd1c 100644
--- a/src/cmd/sql_cmd_test.cc
+++ b/src/cmd/sql_cmd_test.cc
@@ -3083,7 +3083,7 @@ void ExpectShowTableStatusResult(const std::vector<std::vector<std::string>>& expect,
     std::vector<std::vector<std::string>> merged_expect = {
         {"Table_id", "Table_name", "Database_name", "Storage_type", "Rows", "Memory_data_size", "Disk_data_size",
-         "Partition", "Partition_unalive", "Replica", "Offline_path", "Offline_format", "Offline_deep_copy",
+         "Partition", "Partition_unalive", "Replica", "Offline_path", "Offline_format", "Offline_symbolic_paths",
          "Warnings"}};
     merged_expect.insert(merged_expect.end(), expect.begin(), expect.end());
     if (all_db) {

From c0491adc24ae42f3cd1d07bd68f9f37eb651b0b0 Mon Sep 17 00:00:00 2001
From: Huang Wei
Date: Tue, 11 Jul 2023 18:26:08 +0800
Subject: [PATCH 5/5] fix
---
 .../en/reference/sql/ddl/SHOW_TABLE_STATUS.md |  4 ++++
 docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md |  4 +++-
 src/sdk/sql_cluster_router.cc                 | 21 ++++++++++++++-----
 3 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/docs/en/reference/sql/ddl/SHOW_TABLE_STATUS.md b/docs/en/reference/sql/ddl/SHOW_TABLE_STATUS.md
index 558af5ab526..7c76595697d 100644
--- a/docs/en/reference/sql/ddl/SHOW_TABLE_STATUS.md
+++ b/docs/en/reference/sql/ddl/SHOW_TABLE_STATUS.md
@@ -30,6 +30,10 @@ For example, `'%'` means all databases, including the hidden ones.
 | Offline_symbolic_paths | It shows the symbolic paths of the offline data, i.e. the paths loaded with deep_copy==false. The `NULL` value means it is not set. |
 | Warnings               | Warnings related to the table, including the following four types:<br />
1) `leader/follower mode inconsistent`: the leader/follower information recorded on the nameserver is inconsistent with that on the tablet<br />
2) `state is kNotFound/kTableUndefined/kTableLoading`: the partition is unavailable, `kNotFound` means the partition does not exist; `kTableUndefined` means the partition is not loaded successfully; `kTableLoading` means the partition is being loaded<br />
3) `real replica number xx does not match the configured replicanum xx`: the number of replicas != `replicanum`<br />
4) `not connected to leader`: the follower is not connected to the leader, which usually occurs together with 3) |

+```{note}
+When the version is <= 0.8.1, `Offline_symbolic_paths` does not exist; its column position is occupied by `Offline_deep_copy` instead.
+```
+
 ## Example

 ```sql
diff --git a/docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md b/docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md
index 5ff9db9151c..4da1badeb98 100644
--- a/docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md
+++ b/docs/zh/openmldb_sql/ddl/SHOW_TABLE_STATUS.md
@@ -26,7 +26,9 @@ SHOW TABLE STATUS [LIKE Pattern];
 | Offline_symbolic_paths | 表对应 offline 软链接路径,`NULL` 表示未设置该项。 |
 | Warnings               | 当前表相关的Warnings信息,包含以下四类:<br />
1) `leader/follower mode inconsistent`: nameserver上记录的leader/follower信息和tablet上面不一致
2) `state is kNotFound/kTableUndefined/kTableLoading`:分片状态为不可用,`kNotFound`代表分片不存在;`kTableUndefined`代表分片未成功加载;`kTableLoading`表明分片正在加载,稍等即可用<br />
3) `real replica number xx does not match the configured replicanum xx`:副本数目和配置的`replicanum`不匹配
4) `not connected to leader`:follower和leader未连接,通常和3) 同时出现 |
-
+```{note}
+版本<=0.8.1时,`Offline_symbolic_paths`列不存在,该列位置为`Offline_deep_copy`列。
+```

 ## Example

diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc
index b2116f1e7c7..537a6aeca66 100644
--- a/src/sdk/sql_cluster_router.cc
+++ b/src/sdk/sql_cluster_router.cc
@@ -1972,7 +1972,8 @@ base::Status SQLClusterRouter::HandleSQLCreateTable(hybridse::node::CreatePlanNo
     ::openmldb::taskmanager::JobInfo job_info;
     std::string output;
     // create table like hive is a long op, use large timeout
-    ::openmldb::base::Status status = ExecuteOfflineQueryGetOutput(sql, config, db, FLAGS_sync_job_timeout, &output);
+    ::openmldb::base::Status status =
+        ExecuteOfflineQueryGetOutput(sql, config, db, FLAGS_sync_job_timeout, &output);

     if (!status.OK()) {
         LOG(ERROR) << "Fail to create table, error message: " + status.msg;
@@ -4064,10 +4065,20 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteShowApiServer
 }

 static const std::initializer_list<std::string> GetTableStatusSchema() {
-    static const std::initializer_list<std::string> schema = {
-        "Table_id", "Table_name", "Database_name", "Storage_type", "Rows",
-        "Memory_data_size", "Disk_data_size", "Partition", "Partition_unalive", "Replica",
-        "Offline_path", "Offline_format", "Offline_symbolic_paths", "Warnings"};
+    static const std::initializer_list<std::string> schema = {"Table_id",
+                                                              "Table_name",
+                                                              "Database_name",
+                                                              "Storage_type",
+                                                              "Rows",
+                                                              "Memory_data_size",
+                                                              "Disk_data_size",
+                                                              "Partition",
+                                                              "Partition_unalive",
+                                                              "Replica",
+                                                              "Offline_path",
+                                                              "Offline_format",
+                                                              "Offline_symbolic_paths",
+                                                              "Warnings"};
     return schema;
 }
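As a closing illustration of the import rules this series documents, here is a minimal sketch; the database `db1`, the table `t1`, and the source paths below are hypothetical:

```sql
-- Hard copy (deep_copy=true): data is written into the table's own offline
-- address (Offline_path); the internal write format is always parquet.
LOAD DATA INFILE 'file:///tmp/t1_data.csv' INTO TABLE db1.t1
    OPTIONS (format='csv', deep_copy=true, mode='overwrite');

-- Soft copy (deep_copy=false): the path is appended to Offline_symbolic_paths.
-- Because a hard copy already exists, the soft link must also be parquet with
-- the same (empty) read options, otherwise the append is rejected.
LOAD DATA INFILE 'hdfs:///warehouse/t1_extra' INTO TABLE db1.t1
    OPTIONS (format='parquet', deep_copy=false, mode='append');

-- Inspect the result: Offline_path holds the hard-copy address and
-- Offline_symbolic_paths lists the soft-link addresses, comma-separated.
SHOW TABLE STATUS;
```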