Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance Authorization by adding TenantAdmin interface #6487

Merged
merged 2 commits into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;

/**
* Provider of authorization mechanism
Expand All @@ -46,6 +47,18 @@ default CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false);
}

/**
* Check if specified role is an admin of the tenant
* @param tenant the tenant to check
* @param role the role to check
* @return a CompletableFuture containing a boolean in which true means the role is an admin user
* and false if it is not
*/
default CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(role != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(role) ? true : false);
}

/**
* Perform initialization for the authorization provider
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,6 +77,14 @@ public CompletableFuture<Boolean> isSuperUser(String user) {
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}

public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
AuthenticationDataSource authenticationData) {
if (provider != null) {
return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData);
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}

/**
*
* Grant authorization-action permission on a namespace to the given client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1797,7 +1797,7 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant());
validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -224,7 +226,7 @@ protected void validateSuperUserAccess() {
*/
protected void validateAdminAccessForTenant(String tenant) {
try {
validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant);
validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData());
} catch (RestException e) {
throw e;
} catch (Exception e) {
Expand All @@ -234,7 +236,8 @@ protected void validateAdminAccessForTenant(String tenant) {
}

protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
String originalPrincipal, String tenant)
String originalPrincipal, String tenant,
AuthenticationDataSource authenticationData)
throws RestException, Exception {
if (log.isDebugEnabled()) {
log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant,
Expand All @@ -259,22 +262,17 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId, originalPrincipal);

if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {

CompletableFuture<Boolean> isProxySuperUserFuture;
CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
try {
isProxySuperUserFuture = pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(clientAppId);
AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService();
isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId);

isOriginalPrincipalSuperUserFuture = pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(originalPrincipal);
isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal);

Set<String> adminRoles = tenantInfo.getAdminRoles();
boolean proxyAuthorized = isProxySuperUserFuture.get() || adminRoles.contains(clientAppId);
boolean originalPrincipalAuthorized
= isOriginalPrincipalSuperUserFuture.get() || adminRoles.contains(originalPrincipal);
boolean proxyAuthorized = isProxySuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get();
boolean originalPrincipalAuthorized
= isOriginalPrincipalSuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo, authenticationData).get();
if (!proxyAuthorized || !originalPrincipalAuthorized) {
throw new RestException(Status.UNAUTHORIZED,
String.format("Proxy not authorized to access resource (proxy:%s,original:%s)",
Expand All @@ -290,7 +288,7 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
.getAuthorizationService()
.isSuperUser(clientAppId)
.join()) {
if (!tenantInfo.getAdminRoles().contains(clientAppId)) {
if (!pulsar.getBrokerService().getAuthorizationService().isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) {
throw new RestException(Status.UNAUTHORIZED,
"Don't have permission to administrate resources on this tenant");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void setup() throws Exception {
doReturn(false).when(namespaces).isRequestHttps();
doReturn("test").when(namespaces).clientAppId();
doReturn(null).when(namespaces).originalPrincipal();
doReturn(null).when(namespaces).clientAuthData();
doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
doNothing().when(namespaces).validateAdminAccessForTenant(this.testTenant);
doNothing().when(namespaces).validateAdminAccessForTenant("non-existing-tenant");
Expand Down Expand Up @@ -987,6 +988,7 @@ public void testValidateTopicOwnership() throws Exception {
doReturn(false).when(topics).isRequestHttps();
doReturn("test").when(topics).clientAppId();
doReturn(null).when(topics).originalPrincipal();
doReturn(null).when(topics).clientAuthData();
mockWebUrl(localWebServiceUrl, testNs);
doReturn("persistent").when(topics).domain();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -109,13 +111,16 @@ void setup(Method method) throws Exception {
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
config.setAuthorizationEnabled(true);
config.setAuthenticationProviders(providers);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
functionsWorkerService = spy(createPulsarFunctionWorker(config));
AuthenticationService authenticationService = new AuthenticationService(config);
AuthorizationService authorizationService = new AuthorizationService(config, mock(ConfigurationCacheService.class));
when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
when(functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService);
when(functionsWorkerService.isInitialized()).thenReturn(true);

PulsarAdmin admin = mock(PulsarAdmin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected static void checkAuthorization(DiscoveryService service, TopicName top
throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
topicName.getTenant(), e.getMessage()));
}
if (!tenantInfo.getAdminRoles().contains(role)) {
if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
throw new IllegalAccessException("Don't have permission to administrate resources on this property");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1449,10 +1449,10 @@ public boolean isAuthorizedRole(String tenant, String namespace, String clientRo
if (clientRole != null) {
try {
TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
if (tenantInfo != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(clientRole)) {
if (tenantInfo != null && worker().getAuthorizationService().isTenantAdmin(tenant, clientRole, tenantInfo, authenticationData).get()) {
return true;
}
} catch (PulsarAdminException.NotFoundException e) {
} catch (PulsarAdminException.NotFoundException | InterruptedException | ExecutionException e) {

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -60,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
Expand Down Expand Up @@ -230,51 +232,56 @@ public void testMetricsEmpty() {
}

@Test
public void testIsAuthorizedRole() throws PulsarAdminException {

public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedException, ExecutionException {

TenantInfo tenantInfo = new TenantInfo();
AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
FunctionsImpl functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
AuthorizationService authorizationService = mock(AuthorizationService.class);
doReturn(authorizationService).when(mockedWorkerService).getAuthorizationService();
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setAuthorizationEnabled(true);
workerConfig.setSuperUserRoles(Collections.singleton(superUser));
doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();

// test super user
AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", superUser, authenticationDataSource));

// test normal user
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any());
Tenants tenants = mock(Tenants.class);
when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo());
when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
PulsarAdmin admin = mock(PulsarAdmin.class);
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));

// if user is tenant admin
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any());
tenants = mock(Tenants.class);
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAdminRoles(Collections.singleton("test-user"));
when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);

admin = mock(PulsarAdmin.class);
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));

// test user allow function action
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
doReturn(true).when(functionImpl).allowFunctionOps(any(), any(), any());
tenants = mock(Tenants.class);
when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo());
tenantInfo.setAdminRoles(Collections.emptySet());
when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);

admin = mock(PulsarAdmin.class);
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));

// test role is null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ protected static void checkAuthorization(ProxyService service, TopicName topicNa
throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
topicName.getTenant(), e.getMessage()));
}
if (!tenantInfo.getAdminRoles().contains(role)) {
if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
throw new IllegalAccessException("Don't have permission to administrate resources on this tenant");
}
}
Expand Down