Skip to content

Commit

Permalink
Spark: Add support for describing/showing views
Browse files Browse the repository at this point in the history
This adds support for:
* `DESCRIBE <viewName>` / `DESCRIBE EXTENDED <viewName>`
* `SHOW VIEWS` / `SHOW VIEWS LIKE <pattern>`
* `SHOW TBLPROPERTIES <viewName>`
* `SHOW CREATE TABLE <viewName>`
  • Loading branch information
nastra committed Jan 29, 2024
1 parent 54756b6 commit aa786db
Show file tree
Hide file tree
Showing 9 changed files with 501 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.CreateView
import org.apache.spark.sql.catalyst.plans.logical.DropView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.ShowViews
import org.apache.spark.sql.catalyst.plans.logical.View
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.catalyst.plans.logical.views.ShowIcebergViews
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogPlugin
Expand Down Expand Up @@ -60,6 +63,13 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
properties = properties,
allowExisting = allowExisting,
replace = replace)

case ShowViews(UnresolvedNamespace(Seq()), pattern, output) if isViewCatalog(catalogManager.currentCatalog) =>
ShowIcebergViews(ResolvedNamespace(catalogManager.currentCatalog, Seq.empty), pattern, output)

case ShowViews(UnresolvedNamespace(CatalogAndNamespace(catalog, ns)), pattern, output)
if isViewCatalog(catalog) =>
ShowIcebergViews(ResolvedNamespace(catalog, ns), pattern, output)
}

private def isTempView(nameParts: Seq[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.spark.sql.catalyst.plans.logical.views

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.ShowViews
import org.apache.spark.sql.catalyst.plans.logical.UnaryCommand

case class ShowIcebergViews(
namespace: LogicalPlan,
pattern: Option[String],
override val output: Seq[Attribute] = ShowViews.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = namespace

override protected def withNewChildInternal(newChild: LogicalPlan): ShowIcebergViews =
copy(namespace = newChild)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.escapeSingleQuotedString
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.execution.LeafExecNode
import scala.collection.JavaConverters._

case class DescribeV2ViewExec(
output: Seq[Attribute],
view: View,
isExtended: Boolean) extends V2CommandExec with LeafExecNode {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override protected def run(): Seq[InternalRow] = {
if (isExtended) {
(describeSchema :+ emptyRow) ++ describeExtended
} else {
describeSchema
}
}

private def describeSchema: Seq[InternalRow] =
view.schema().map { column =>
toCatalystRow(
column.name,
column.dataType.simpleString,
column.getComment().getOrElse(""))
}

private def emptyRow: InternalRow = toCatalystRow("", "", "")

private def describeExtended: Seq[InternalRow] = {
val outputColumns = view.queryColumnNames.mkString("[", ", ", "]")
val properties: Map[String, String] = view.properties.asScala.toMap -- ViewCatalog.RESERVED_PROPERTIES.asScala
val viewCatalogAndNamespace: Seq[String] = view.currentCatalog +: view.currentNamespace.toSeq
val viewProperties = properties.toSeq.sortBy(_._1).map {
case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}.mkString("[", ", ", "]")


toCatalystRow("# Detailed View Information", "", "") ::
toCatalystRow("Comment", view.properties.getOrDefault(ViewCatalog.PROP_COMMENT, ""), "") ::
toCatalystRow("View Text", view.query, "") ::
toCatalystRow("View Catalog and Namespace", viewCatalogAndNamespace.quoted, "") ::
toCatalystRow("View Query Output Columns", outputColumns, "") ::
toCatalystRow("View Properties", viewProperties, "") ::
toCatalystRow("Created By", view.properties.getOrDefault(ViewCatalog.PROP_CREATE_ENGINE_VERSION, ""), "") ::
Nil
}

override def simpleString(maxFields: Int): String = {
s"DescribeV2ViewExec"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.Call
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation
import org.apache.spark.sql.catalyst.plans.logical.DropBranch
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
Expand All @@ -44,9 +46,12 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
import org.apache.spark.sql.catalyst.plans.logical.ShowTableProperties
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.catalyst.plans.logical.views.ShowIcebergViews
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.catalog.ViewCatalog
Expand Down Expand Up @@ -123,6 +128,18 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
allowExisting = allowExisting,
replace = replace) :: Nil

case d@DescribeRelation(ResolvedV2View(catalog, ident), _, isExtended, _) =>
DescribeV2ViewExec(d.output, catalog.loadView(ident), isExtended) :: Nil

case show@ShowTableProperties(ResolvedV2View(catalog, ident), propertyKey, _) =>
ShowV2ViewPropertiesExec(show.output, catalog.loadView(ident), propertyKey) :: Nil

case show@ShowIcebergViews(ResolvedNamespace(catalog: ViewCatalog, namespace), pattern, _) =>
ShowV2ViewsExec(show.output, catalog, namespace, pattern) :: Nil

case show@ShowCreateTable(ResolvedV2View(catalog, ident), _, _) =>
ShowCreateV2ViewExec(show.output, catalog.loadView(ident)) :: Nil

case _ => Nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.escapeSingleQuotedString
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.execution.LeafExecNode
import scala.collection.JavaConverters._

case class ShowCreateV2ViewExec(output: Seq[Attribute], view: View)
extends V2CommandExec with LeafExecNode {

override protected def run(): Seq[InternalRow] = {
val builder = new StringBuilder
builder ++= s"CREATE VIEW ${view.name} "
showColumns(view, builder)
showComment(view, builder)
showProperties(view, builder)
builder ++= s"AS\n${view.query}\n"

Seq(toCatalystRow(builder.toString))
}

private def showColumns(view: View, builder: StringBuilder): Unit = {
val columns = view.schema().fields
.map(x => s"${x.name}${x.getComment().map(c => s" COMMENT '$c'").getOrElse("")}")
.mkString("(", ", ", ")")
builder ++= columns
}

private def showComment(view: View, builder: StringBuilder): Unit = {
Option(view.properties.get(ViewCatalog.PROP_COMMENT))
.map(" COMMENT '" + escapeSingleQuotedString(_) + "'\n")
.foreach(builder.append)
}

private def showProperties(
view: View,
builder: StringBuilder): Unit = {
val showProps = view.properties.asScala.toMap -- ViewCatalog.RESERVED_PROPERTIES.asScala
if (showProps.nonEmpty) {
val props = conf.redactOptions(showProps).toSeq.sortBy(_._1).map {
case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}

builder ++= " TBLPROPERTIES "
builder ++= concatByMultiLines(props)
}
}

private def concatByMultiLines(iter: Iterable[String]): String = {
iter.mkString("(\n ", ",\n ", ")\n")
}

override def simpleString(maxFields: Int): String = {
s"ShowCreateV2ViewExec"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.execution.LeafExecNode
import scala.collection.JavaConverters._

case class ShowV2ViewPropertiesExec(
output: Seq[Attribute],
view: View,
propertyKey: Option[String]) extends V2CommandExec with LeafExecNode {

override protected def run(): Seq[InternalRow] = {
propertyKey match {
case Some(p) =>
val propValue = properties.getOrElse(p,
s"View ${view.name()} does not have property: $p")
Seq(toCatalystRow(p, propValue))
case None =>
properties.map {
case (k, v) => toCatalystRow(k, v)
}.toSeq
}
}


private def properties = {
view.properties.asScala.toMap -- ViewCatalog.RESERVED_PROPERTIES.asScala
}

override def simpleString(maxFields: Int): String = {
s"ShowV2ViewPropertiesExec"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.execution.LeafExecNode
import scala.collection.mutable.ArrayBuffer

case class ShowV2ViewsExec(
output: Seq[Attribute],
catalog: ViewCatalog,
namespace: Seq[String],
pattern: Option[String]) extends V2CommandExec with LeafExecNode {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override protected def run(): Seq[InternalRow] = {
val rows = new ArrayBuffer[InternalRow]()

// handle GLOBAL VIEWS
val globalTemp = session.sessionState.catalog.globalTempViewManager.database
if (namespace.nonEmpty && globalTemp == namespace.head) {
pattern.map(p => session.sessionState.catalog.globalTempViewManager.listViewNames(p))
.getOrElse(session.sessionState.catalog.globalTempViewManager.listViewNames("*"))
.map(name => rows += toCatalystRow(globalTemp, name, true))
} else {
val views = catalog.listViews(namespace: _*)
views.map { view =>
if (pattern.map(StringUtils.filterPattern(Seq(view.name()), _).nonEmpty).getOrElse(true)) {
rows += toCatalystRow(view.namespace().quoted, view.name(), false)
}
}
}

// include TEMP VIEWS
pattern.map(p => session.sessionState.catalog.listLocalTempViews(p))
.getOrElse(session.sessionState.catalog.listLocalTempViews("*"))
.map(v => rows += toCatalystRow(v.database.toArray.quoted, v.table, true))

rows.toSeq
}

override def simpleString(maxFields: Int): String = {
s"ShowV2ViewsExec"
}
}
Loading

0 comments on commit aa786db

Please sign in to comment.