diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java index 9787eaebc7ebc..cb6541623eb7f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java @@ -29,6 +29,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.TenantInfo; /** * Provider of authorization mechanism @@ -46,6 +47,18 @@ default CompletableFuture isSuperUser(String role, ServiceConfiguration return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false); } + /** + * Check if specified role is an admin of the tenant + * @param tenant the tenant to check + * @param role the role to check + * @return a CompletableFuture containing a boolean in which true means the role is an admin user + * and false if it is not + */ + default CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, + AuthenticationDataSource authenticationData) { + return CompletableFuture.completedFuture(role != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(role) ? true : false); + } + /** * Perform initialization for the authorization provider * diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 95ff7643f98cb..381e8cf780e3e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -26,6 +26,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +77,14 @@ public CompletableFuture isSuperUser(String user) { return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured")); } + public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, + AuthenticationDataSource authenticationData) { + if (provider != null) { + return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData); + } + return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured")); + } + /** * * Grant authorization-action permission on a namespace to the given client diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 503b21b430d58..e37f09da31ff5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1797,7 +1797,7 @@ public static CompletableFuture getPartitionedTopicMet checkAuthorization(pulsar, topicName, clientAppId, authenticationData); } catch (RestException e) { try { - validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant()); + validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant(), authenticationData); } catch (RestException authException) { log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString()); throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index ca9ec8567b006..497883de73563 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -52,6 +52,8 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authorization.AuthorizationProvider; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -224,7 +226,7 @@ protected void validateSuperUserAccess() { */ protected void validateAdminAccessForTenant(String tenant) { try { - validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant); + validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData()); } catch (RestException e) { throw e; } catch (Exception e) { @@ -234,7 +236,8 @@ protected void validateAdminAccessForTenant(String tenant) { } protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId, - String originalPrincipal, String tenant) + String originalPrincipal, String tenant, + AuthenticationDataSource authenticationData) throws RestException, Exception { if (log.isDebugEnabled()) { log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant, @@ -259,22 +262,17 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId, originalPrincipal); if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) { - CompletableFuture isProxySuperUserFuture; CompletableFuture isOriginalPrincipalSuperUserFuture; try { - isProxySuperUserFuture = pulsar.getBrokerService() - .getAuthorizationService() - .isSuperUser(clientAppId); + AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService(); + isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId); - isOriginalPrincipalSuperUserFuture = pulsar.getBrokerService() - .getAuthorizationService() - .isSuperUser(originalPrincipal); + isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal); - Set adminRoles = tenantInfo.getAdminRoles(); - boolean proxyAuthorized = isProxySuperUserFuture.get() || adminRoles.contains(clientAppId); - boolean originalPrincipalAuthorized - = isOriginalPrincipalSuperUserFuture.get() || adminRoles.contains(originalPrincipal); + boolean proxyAuthorized = isProxySuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get(); + boolean originalPrincipalAuthorized + = isOriginalPrincipalSuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo, authenticationData).get(); if (!proxyAuthorized || !originalPrincipalAuthorized) { throw new RestException(Status.UNAUTHORIZED, String.format("Proxy not authorized to access resource (proxy:%s,original:%s)", @@ -290,7 +288,7 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String .getAuthorizationService() .isSuperUser(clientAppId) .join()) { - if (!tenantInfo.getAdminRoles().contains(clientAppId)) { + if (!pulsar.getBrokerService().getAuthorizationService().isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) { throw new RestException(Status.UNAUTHORIZED, "Don't have permission to administrate resources on this tenant"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index a607921952667..0f7d100e4d122 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -145,6 +145,7 @@ public void setup() throws Exception { doReturn(false).when(namespaces).isRequestHttps(); doReturn("test").when(namespaces).clientAppId(); doReturn(null).when(namespaces).originalPrincipal(); + doReturn(null).when(namespaces).clientAuthData(); doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters(); doNothing().when(namespaces).validateAdminAccessForTenant(this.testTenant); doNothing().when(namespaces).validateAdminAccessForTenant("non-existing-tenant"); @@ -987,6 +988,7 @@ public void testValidateTopicOwnership() throws Exception { doReturn(false).when(topics).isRequestHttps(); doReturn("test").when(topics).clientAppId(); doReturn(null).when(topics).originalPrincipal(); + doReturn(null).when(topics).clientAuthData(); mockWebUrl(localWebServiceUrl, testNs); doReturn("persistent").when(topics).domain(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 8b7e92e6f51d0..ed7a0ddd9660a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -42,6 +42,8 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -109,13 +111,16 @@ void setup(Method method) throws Exception { Set providers = new HashSet<>(); providers.add(AuthenticationProviderTls.class.getName()); config.setAuthenticationEnabled(true); + config.setAuthorizationEnabled(true); config.setAuthenticationProviders(providers); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsAllowInsecureConnection(true); functionsWorkerService = spy(createPulsarFunctionWorker(config)); AuthenticationService authenticationService = new AuthenticationService(config); + AuthorizationService authorizationService = new AuthorizationService(config, mock(ConfigurationCacheService.class)); when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService); + when(functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService); when(functionsWorkerService.isInitialized()).thenReturn(true); PulsarAdmin admin = mock(PulsarAdmin.class); diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java index 8b3a012120fcb..de3fef1026d92 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java @@ -155,7 +155,7 @@ protected static void checkAuthorization(DiscoveryService service, TopicName top throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s", topicName.getTenant(), e.getMessage())); } - if (!tenantInfo.getAdminRoles().contains(role)) { + if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) { throw new IllegalAccessException("Don't have permission to administrate resources on this property"); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 98cc3a5eb83bc..cab2b07e4022b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1449,10 +1449,10 @@ public boolean isAuthorizedRole(String tenant, String namespace, String clientRo if (clientRole != null) { try { TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant); - if (tenantInfo != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(clientRole)) { + if (tenantInfo != null && worker().getAuthorizationService().isTenantAdmin(tenant, clientRole, tenantInfo, authenticationData).get()) { return true; } - } catch (PulsarAdminException.NotFoundException e) { + } catch (PulsarAdminException.NotFoundException | InterruptedException | ExecutionException e) { } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index 258c123797745..cd55b2aee0396 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -20,6 +20,7 @@ import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -60,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; @@ -230,51 +232,56 @@ public void testMetricsEmpty() { } @Test - public void testIsAuthorizedRole() throws PulsarAdminException { - + public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedException, ExecutionException { + TenantInfo tenantInfo = new TenantInfo(); + AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class); FunctionsImpl functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService)); + AuthorizationService authorizationService = mock(AuthorizationService.class); + doReturn(authorizationService).when(mockedWorkerService).getAuthorizationService(); WorkerConfig workerConfig = new WorkerConfig(); workerConfig.setAuthorizationEnabled(true); workerConfig.setSuperUserRoles(Collections.singleton(superUser)); doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig(); // test super user - AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class); assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", superUser, authenticationDataSource)); // test normal user functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService)); doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any()); Tenants tenants = mock(Tenants.class); - when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo()); + when(tenants.getTenantInfo(any())).thenReturn(tenantInfo); PulsarAdmin admin = mock(PulsarAdmin.class); when(admin.tenants()).thenReturn(tenants); when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin); + when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false)); assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource)); // if user is tenant admin functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService)); doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any()); tenants = mock(Tenants.class); - TenantInfo tenantInfo = new TenantInfo(); tenantInfo.setAdminRoles(Collections.singleton("test-user")); when(tenants.getTenantInfo(any())).thenReturn(tenantInfo); admin = mock(PulsarAdmin.class); when(admin.tenants()).thenReturn(tenants); when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin); + when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true)); assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource)); // test user allow function action functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService)); doReturn(true).when(functionImpl).allowFunctionOps(any(), any(), any()); tenants = mock(Tenants.class); - when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo()); + tenantInfo.setAdminRoles(Collections.emptySet()); + when(tenants.getTenantInfo(any())).thenReturn(tenantInfo); admin = mock(PulsarAdmin.class); when(admin.tenants()).thenReturn(tenants); when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin); + when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true)); assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource)); // test role is null diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java index 28f28cdcfac24..e055872795cc1 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java @@ -153,7 +153,7 @@ protected static void checkAuthorization(ProxyService service, TopicName topicNa throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s", topicName.getTenant(), e.getMessage())); } - if (!tenantInfo.getAdminRoles().contains(role)) { + if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) { throw new IllegalAccessException("Don't have permission to administrate resources on this tenant"); } }