Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use NonEmptyString to access UDFMgr. #14844

Merged
merged 2 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/bendpy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ databend-common-expression = { path = "../query/expression" }
databend-common-license = { path = "../common/license" }
databend-common-meta-app = { path = "../meta/app" }
databend-common-meta-embedded = { path = "../meta/embedded" }
databend-common-meta-types = { path = "../meta/types" }
databend-common-users = { path = "../query/users" }
databend-query = { path = "../query/service", features = [
"simd",
Expand Down
5 changes: 4 additions & 1 deletion src/bendpy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_exception::Result;
use databend_common_meta_app::principal::GrantObject;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserPrivilegeSet;
use databend_common_meta_types::NonEmptyString;
use databend_common_users::UserApiProvider;
use databend_query::sessions::QueryContext;
use databend_query::sessions::Session;
Expand Down Expand Up @@ -55,12 +56,14 @@ impl PySessionContext {
uuid::Uuid::new_v4().to_string()
};

let tenant = NonEmptyString::new(tenant).unwrap();

let config = GlobalConfig::instance();
UserApiProvider::try_create_simple(config.meta.to_meta_grpc_client_conf(), &tenant)
.await
.unwrap();

session.set_current_tenant(tenant.to_owned());
session.set_current_tenant(tenant.to_string());

let mut user = UserInfo::new_no_auth("root", "%");
user.grants.grant_privileges(
Expand Down
4 changes: 3 additions & 1 deletion src/meta/kvapi/src/kvapi/test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,9 @@ impl kvapi::TestSuite {
assert_eq!(v2.seq, 3);
assert_eq!(v2.data, b("v2"));
let v2_meta = v2.meta.unwrap();
assert_eq!(v2_meta.get_expire_at_ms().unwrap() / 1000, now_sec + 10);
let expire_at_sec = v2_meta.get_expire_at_ms().unwrap() / 1000;
let want = now_sec + 10;
assert!((want..want + 2).contains(&expire_at_sec));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl GlobalServices {
UserApiProvider::init(
config.meta.to_meta_grpc_client_conf(),
config.query.idm.clone(),
config.query.tenant_id.as_str(),
&config.query.tenant_id,
config.query.tenant_quota.clone(),
)
.await?;
Expand Down
11 changes: 6 additions & 5 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_meta_app::principal::StageType;
use databend_common_meta_app::principal::UserGrantSet;
use databend_common_meta_app::principal::UserPrivilegeSet;
use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_types::NonEmptyString;
use databend_common_sql::optimizer::get_udf_names;
use databend_common_sql::plans::InsertInputSource;
use databend_common_sql::plans::PresignAction;
Expand Down Expand Up @@ -507,7 +508,7 @@ impl AccessChecker for PrivilegeAccess {
ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) }
ObjectId::Database(db_id) => { (db_id, None) }
};
let has_priv = has_priv(tenant.as_str(), database, None, db_id, table_id, grant_set).await?;
let has_priv = has_priv(&tenant, database, None, db_id, table_id, grant_set).await?;
return if has_priv {
Ok(())
} else {
Expand All @@ -526,7 +527,7 @@ impl AccessChecker for PrivilegeAccess {
ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) }
ObjectId::Database(db_id) => { (db_id, None) }
};
let has_priv = has_priv(tenant.as_str(), database, None, db_id, table_id, grant_set).await?;
let has_priv = has_priv(&tenant, database, None, db_id, table_id, grant_set).await?;
return if has_priv {
Ok(())
} else {
Expand All @@ -545,7 +546,7 @@ impl AccessChecker for PrivilegeAccess {
ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) }
ObjectId::Database(db_id) => { (db_id, None) }
};
let has_priv = has_priv(tenant.as_str(), database, Some(table), db_id, table_id, grant_set).await?;
let has_priv = has_priv(&tenant, database, Some(table), db_id, table_id, grant_set).await?;
return if has_priv {
Ok(())
} else {
Expand Down Expand Up @@ -628,7 +629,7 @@ impl AccessChecker for PrivilegeAccess {
ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) }
ObjectId::Database(db_id) => { (db_id, None) }
};
let has_priv = has_priv(tenant.as_str(), &plan.database, None, db_id, None, grant_set).await?;
let has_priv = has_priv(&tenant, &plan.database, None, db_id, None, grant_set).await?;

return if has_priv {
Ok(())
Expand Down Expand Up @@ -1002,7 +1003,7 @@ impl AccessChecker for PrivilegeAccess {

// TODO(liyz): replace it with verify_access
async fn has_priv(
tenant: &str,
tenant: &NonEmptyString,
db_name: &str,
table_name: Option<&str>,
db_id: u64,
Expand Down
5 changes: 1 addition & 4 deletions src/query/service/src/interpreters/common/grant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ pub async fn validate_grant_object_exists(
}
}
GrantObject::UDF(udf) => {
if !UserApiProvider::instance()
.exists_udf(tenant.as_str(), udf)
.await?
{
if !UserApiProvider::instance().exists_udf(&tenant, udf).await? {
return Err(databend_common_exception::ErrorCode::UnknownStage(format!(
"udf {udf} not exists"
)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Interpreter for CreateDatabaseInterpreter {
let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?;
let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data;
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let databases = catalog.list_databases(&tenant).await?;
let databases = catalog.list_databases(tenant.as_str()).await?;
if quota.max_databases != 0 && databases.len() >= quota.max_databases as usize {
return Err(ErrorCode::TenantQuotaExceeded(format!(
"Max databases quota exceeded {}",
Expand All @@ -121,7 +121,7 @@ impl Interpreter for CreateDatabaseInterpreter {
};
// if create from other tenant, check from share endpoint
if let Some(ref share_name) = self.plan.meta.from_share {
self.check_create_database_from_share(&tenant, share_name)
self.check_create_database_from_share(&tenant.to_string(), share_name)
.await?;
}

Expand All @@ -130,7 +130,7 @@ impl Interpreter for CreateDatabaseInterpreter {

// Grant ownership as the current role. The above create_db_req.meta.owner could be removed in
// the future.
let role_api = UserApiProvider::instance().get_role_api_client(&tenant)?;
let role_api = UserApiProvider::instance().role_api(&tenant);
if let Some(current_role) = self.ctx.get_current_role() {
role_api
.grant_ownership(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ impl Interpreter for DropDatabaseInterpreter {
.get_database(tenant.as_str(), &self.plan.database)
.await;
if let Ok(db) = db {
let role_api = UserApiProvider::instance().get_role_api_client(tenant.as_str())?;
let role_api = UserApiProvider::instance().role_api(&tenant);
let owner_object = OwnershipObject::Database {
catalog_name: self.plan.catalog.clone(),
db_id: db.get_db_info().ident.db_id,
};

role_api.revoke_ownership(&owner_object).await?;
RoleCacheManager::instance().invalidate_cache(tenant.as_str());
RoleCacheManager::instance().invalidate_cache(&tenant);
}

// actual drop database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_meta_app::principal::OwnershipObject;
use databend_common_meta_app::principal::PrincipalIdentity;
use databend_common_meta_app::principal::UserPrivilegeSet;
use databend_common_meta_app::principal::UserPrivilegeType::Ownership;
use databend_common_meta_types::NonEmptyString;
use databend_common_sql::plans::GrantPrivilegePlan;
use databend_common_users::RoleCacheManager;
use databend_common_users::UserApiProvider;
Expand Down Expand Up @@ -111,7 +112,7 @@ impl GrantPrivilegeInterpreter {
async fn grant_ownership(
&self,
ctx: &Arc<QueryContext>,
tenant: &str,
tenant: &NonEmptyString,
owner_object: &OwnershipObject,
new_role: &str,
) -> Result<()> {
Expand Down Expand Up @@ -199,7 +200,7 @@ impl Interpreter for GrantPrivilegeInterpreter {
.convert_to_ownerobject(tenant.as_str(), &plan.on, plan.on.catalog())
.await?;
if self.ctx.get_current_role().is_some() {
self.grant_ownership(&self.ctx, tenant.as_str(), &owner_object, &role)
self.grant_ownership(&self.ctx, &tenant, &owner_object, &role)
.await?;
} else {
return Err(databend_common_exception::ErrorCode::UnknownRole(
Expand All @@ -208,9 +209,9 @@ impl Interpreter for GrantPrivilegeInterpreter {
}
} else {
user_mgr
.grant_privileges_to_role(tenant.as_str(), &role, plan.on, plan.priv_types)
.grant_privileges_to_role(&tenant, &role, plan.on, plan.priv_types)
.await?;
RoleCacheManager::instance().invalidate_cache(tenant.as_str());
RoleCacheManager::instance().invalidate_cache(&tenant);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,7 @@ impl Interpreter for RevokePrivilegeInterpreter {
}
for object in plan.on {
user_mgr
.revoke_privileges_from_role(
tenant.as_str(),
&role,
object,
plan.priv_types,
)
.revoke_privileges_from_role(&tenant, &role, object, plan.priv_types)
.await?;
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/query/service/src/interpreters/interpreter_role_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,9 @@ impl Interpreter for CreateRoleInterpreter {
let tenant = self.ctx.get_tenant();
let user_mgr = UserApiProvider::instance();
user_mgr
.add_role(
tenant.as_str(),
RoleInfo::new(&role_name),
plan.if_not_exists,
)
.await?;
RoleCacheManager::instance()
.force_reload(tenant.as_str())
.add_role(&tenant, RoleInfo::new(&role_name), plan.if_not_exists)
.await?;
RoleCacheManager::instance().force_reload(&tenant).await?;
Ok(PipelineBuildResult::create())
}
}
6 changes: 2 additions & 4 deletions src/query/service/src/interpreters/interpreter_role_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl Interpreter for DropRoleInterpreter {
}
let tenant = self.ctx.get_tenant();
UserApiProvider::instance()
.drop_role(tenant.as_str(), plan.role_name, plan.if_exists)
.drop_role(&tenant, plan.role_name, plan.if_exists)
.await?;

let session = self.ctx.get_current_session();
Expand All @@ -77,9 +77,7 @@ impl Interpreter for DropRoleInterpreter {
}
}

RoleCacheManager::instance()
.force_reload(tenant.as_str())
.await?;
RoleCacheManager::instance().force_reload(&tenant).await?;
Ok(PipelineBuildResult::create())
}
}
10 changes: 3 additions & 7 deletions src/query/service/src/interpreters/interpreter_role_grant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ impl Interpreter for GrantRoleInterpreter {
// TODO: check privileges

// Check if the grant role exists.
user_mgr
.get_role(tenant.as_str(), plan.role.clone())
.await?;
user_mgr.get_role(&tenant, plan.role.clone()).await?;
match plan.principal {
PrincipalIdentity::User(user) => {
user_mgr
Expand All @@ -71,14 +69,12 @@ impl Interpreter for GrantRoleInterpreter {
}
PrincipalIdentity::Role(role) => {
user_mgr
.grant_role_to_role(tenant.as_str(), &role, plan.role)
.grant_role_to_role(&tenant, &role, plan.role)
.await?;
}
}

RoleCacheManager::instance()
.force_reload(tenant.as_str())
.await?;
RoleCacheManager::instance().force_reload(&tenant).await?;
Ok(PipelineBuildResult::create())
}
}
6 changes: 2 additions & 4 deletions src/query/service/src/interpreters/interpreter_role_revoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,12 @@ impl Interpreter for RevokeRoleInterpreter {
}
PrincipalIdentity::Role(role) => {
UserApiProvider::instance()
.revoke_role_from_role(tenant.as_str(), &role, &plan.role)
.revoke_role_from_role(&tenant, &role, &plan.role)
.await?;
}
}

RoleCacheManager::instance()
.force_reload(tenant.as_str())
.await?;
RoleCacheManager::instance().force_reload(&tenant).await?;
Ok(PipelineBuildResult::create())
}
}
5 changes: 4 additions & 1 deletion src/query/service/src/interpreters/interpreter_setting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use chrono_tz::Tz;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_types::NonEmptyString;
use databend_common_sql::plans::SettingPlan;
use databend_common_sql::plans::VarValue;
use databend_common_users::UserApiProvider;
Expand Down Expand Up @@ -91,7 +92,9 @@ impl Interpreter for SettingInterpreter {
if config.query.internal_enable_sandbox_tenant && !tenant.is_empty() {
UserApiProvider::try_create_simple(
config.meta.to_meta_grpc_client_conf(),
&tenant,
&NonEmptyString::new(tenant).map_err(|_e| {
ErrorCode::TenantIsEmpty("when SettingInterpreter")
})?,
)
.await?;
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/interpreter_show_grants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ impl Interpreter for ShowGrantsInterpreter {
}
PrincipalIdentity::Role(role) => {
let role = UserApiProvider::instance()
.get_role(tenant.as_str(), role.clone())
.get_role(&tenant, role.clone())
.await?;
(format!("ROLE `{}`", role.identity()), role.grants)
}
},
};
// TODO: display roles list instead of the inherited roles
let grant_entries = RoleCacheManager::instance()
.find_related_roles(tenant.as_str(), &grant_set.roles())
.find_related_roles(&tenant, &grant_set.roles())
.await?
.into_iter()
.map(|role| role.grants)
Expand Down
12 changes: 8 additions & 4 deletions src/query/service/src/interpreters/interpreter_table_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::schema::TableNameIdent;
use databend_common_meta_app::schema::TableStatistics;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::NonEmptyString;
use databend_common_sql::field_default_value;
use databend_common_sql::plans::CreateTablePlan;
use databend_common_sql::BloomIndexColumns;
Expand Down Expand Up @@ -100,6 +101,9 @@ impl Interpreter for CreateTableInterpreter {
#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
let tenant = self.plan.tenant.clone();
let tenant = NonEmptyString::new(tenant).map_err(|_e| {
ErrorCode::TenantIsEmpty("tenant is empty when CreateTableInterpreter")
})?;
let has_computed_column = self
.plan
.schema
Expand Down Expand Up @@ -186,7 +190,7 @@ impl CreateTableInterpreter {
.await?;
let db_id = db.get_db_info().ident.db_id;

let role_api = UserApiProvider::instance().get_role_api_client(tenant.as_str())?;
let role_api = UserApiProvider::instance().role_api(&tenant);
role_api
.grant_ownership(
&OwnershipObject::Table {
Expand All @@ -197,7 +201,7 @@ impl CreateTableInterpreter {
&current_role.name,
)
.await?;
RoleCacheManager::instance().invalidate_cache(tenant.as_str());
RoleCacheManager::instance().invalidate_cache(&tenant);
}

// If the table creation query contains column definitions, like 'CREATE TABLE t1(a int) AS SELECT * from t2',
Expand Down Expand Up @@ -278,7 +282,7 @@ impl CreateTableInterpreter {
.await?;
let db_id = db.get_db_info().ident.db_id;

let role_api = UserApiProvider::instance().get_role_api_client(tenant.as_str())?;
let role_api = UserApiProvider::instance().role_api(&tenant);
role_api
.grant_ownership(
&OwnershipObject::Table {
Expand All @@ -289,7 +293,7 @@ impl CreateTableInterpreter {
&current_role.name,
)
.await?;
RoleCacheManager::instance().invalidate_cache(tenant.as_str());
RoleCacheManager::instance().invalidate_cache(&tenant);
}

// update share spec if needed
Expand Down
Loading
Loading