diff --git a/cwms-data-api/build.gradle b/cwms-data-api/build.gradle index d0f2a5335..4cea0e1de 100644 --- a/cwms-data-api/build.gradle +++ b/cwms-data-api/build.gradle @@ -13,7 +13,7 @@ configurations { } configurations.implementation { - exclude group: 'com.oracle.database.jdbc' + exclude group: 'com.oracle.database.jdbc', module: 'ojdbc' } dependencies { @@ -114,6 +114,21 @@ dependencies { implementation(libs.bundles.jackson) + implementation(libs.aqapi) + implementation(libs.jmscommon) + //For some reason the caffeine transitive dependency makes gradle angry + implementation(libs.activemq.artemis.server) { + exclude group: "com.github.ben-manes.caffeine", module: "caffeine" + } + implementation(libs.activemq.artemis.client) { + exclude group: "com.github.ben-manes.caffeine", module: "caffeine" + } + implementation(libs.activemq.artemis.stomp) { + exclude group: "com.github.ben-manes.caffeine", module: "caffeine" + } + implementation(libs.camel.core) + implementation(libs.camel.jms) + testImplementation(libs.bundles.junit) testRuntimeOnly(libs.junit.jupiter.engine) testImplementation(libs.mockito.core) @@ -228,6 +243,7 @@ task run(type: JavaExec) { mainClass = "fixtures.TomcatServer" systemProperties += project.properties.findAll { k, v -> k.startsWith("RADAR") } systemProperties += project.properties.findAll { k, v -> k.startsWith("CDA") } + systemProperties += project.properties.findAll { k, v -> k.startsWith("cwms") } def context = project.findProperty("cda.war.context") ?: "spk-data" diff --git a/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java b/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java index bbc2aa3c4..70ec5ed8a 100644 --- a/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java +++ b/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java @@ -25,12 +25,6 @@ package cwms.cda; import static cwms.cda.api.Controllers.NAME; -import cwms.cda.api.DownstreamLocationsGetController; -import cwms.cda.api.LookupTypeController; -import cwms.cda.api.StreamController; -import cwms.cda.api.StreamLocationController; -import cwms.cda.api.StreamReachController; -import cwms.cda.api.UpstreamLocationsGetController; import static io.javalin.apibuilder.ApiBuilder.crud; import static io.javalin.apibuilder.ApiBuilder.delete; import static io.javalin.apibuilder.ApiBuilder.get; @@ -54,6 +48,7 @@ import cwms.cda.api.ClobController; import cwms.cda.api.Controllers; import cwms.cda.api.CountyController; +import cwms.cda.api.DownstreamLocationsGetController; import cwms.cda.api.EmbankmentController; import cwms.cda.api.ForecastFileController; import cwms.cda.api.ForecastInstanceController; @@ -63,6 +58,7 @@ import cwms.cda.api.LocationCategoryController; import cwms.cda.api.LocationController; import cwms.cda.api.LocationGroupController; +import cwms.cda.api.LookupTypeController; import cwms.cda.api.OfficeController; import cwms.cda.api.ParametersController; import cwms.cda.api.PoolController; @@ -75,6 +71,9 @@ import cwms.cda.api.SpecifiedLevelController; import cwms.cda.api.StandardTextController; import cwms.cda.api.StateController; +import cwms.cda.api.StreamController; +import cwms.cda.api.StreamLocationController; +import cwms.cda.api.StreamReachController; import cwms.cda.api.TextTimeSeriesController; import cwms.cda.api.TextTimeSeriesValueController; import cwms.cda.api.TimeSeriesCategoryController; @@ -88,6 +87,7 @@ import cwms.cda.api.TurbineChangesPostController; import cwms.cda.api.TurbineController; import cwms.cda.api.UnitsController; +import cwms.cda.api.UpstreamLocationsGetController; import cwms.cda.api.auth.ApiKeyController; import cwms.cda.api.enums.UnitSystem; import cwms.cda.api.errors.AlreadyExists; @@ -98,6 +98,7 @@ import cwms.cda.api.errors.JsonFieldsException; import cwms.cda.api.errors.NotFoundException; import cwms.cda.api.errors.RequiredQueryParameterException; +import cwms.cda.api.messaging.CdaTopicHandler; import cwms.cda.data.dao.JooqDao; import cwms.cda.formatters.Formats; import cwms.cda.formatters.FormattingException; @@ -148,6 +149,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.sql.DataSource; + import org.apache.http.entity.ContentType; import org.jetbrains.annotations.NotNull; import org.owasp.html.HtmlPolicyBuilder; @@ -186,7 +188,7 @@ "/projects/*", "/properties/*", "/lookup-types/*", - "/embankments/*" + "/cda-topics" }) public class ApiServlet extends HttpServlet { @@ -218,12 +220,13 @@ public class ApiServlet extends HttpServlet { @Resource(name = "jdbc/CWMS3") DataSource cwms; - + private CdaTopicHandler cdaTopicHandler; @Override public void destroy() { javalin.destroy(); + cdaTopicHandler.shutdown(); } @Override @@ -237,7 +240,7 @@ public void init(ServletConfig config) throws ServletException { @SuppressWarnings({"java:S125","java:S2095"}) // closed in destroy handler @Override - public void init() { + public void init() throws ServletException { JavalinValidation.register(UnitSystem.class, UnitSystem::systemFor); JavalinValidation.register(JooqDao.DeleteMethod.class, Controllers::getDeleteMethod); @@ -513,6 +516,12 @@ protected void configureRoutes() { new PropertyController(metrics), requiredRoles,1, TimeUnit.DAYS); cdaCrudCache(format("/lookup-types/{%s}", Controllers.NAME), new LookupTypeController(metrics), requiredRoles,1, TimeUnit.DAYS); + if(Boolean.getBoolean("cwms.data.api.messaging.enabled")) { + //TODO: setup separate data source for persistent connections to Oracle AQ + cdaTopicHandler = new CdaTopicHandler(cwms, metrics); + get("/cda-topics", cdaTopicHandler); + addCacheControl("/cda-topics", 1, TimeUnit.DAYS); + } } /** diff --git a/cwms-data-api/src/main/java/cwms/cda/api/messaging/ArtemisSecurityManager.java b/cwms-data-api/src/main/java/cwms/cda/api/messaging/ArtemisSecurityManager.java new file mode 100644 index 000000000..cb762a35e --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/api/messaging/ArtemisSecurityManager.java @@ -0,0 +1,83 @@ +/* + * MIT License + * + * Copyright (c) 2024 Hydrologic Engineering Center + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package cwms.cda.api.messaging; + +import static cwms.cda.ApiServlet.CWMS_USERS_ROLE; + +import com.google.common.flogger.FluentLogger; +import cwms.cda.data.dao.AuthDao; +import cwms.cda.security.CwmsAuthException; +import cwms.cda.security.DataApiPrincipal; +import java.util.Set; +import javax.sql.DataSource; +import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; + +final class ArtemisSecurityManager implements ActiveMQSecurityManager { + private static final FluentLogger LOGGER = FluentLogger.forEnclosingClass(); + private final DataSource dataSource; + private final String cdaUser; + + ArtemisSecurityManager(DataSource dataSource) { + this.dataSource = dataSource; + cdaUser = DSL.using(dataSource, SQLDialect.ORACLE18C) + .connectionResult(c -> c.getMetaData().getUserName()); + } + + @Override + public boolean validateUser(String user, String password) { + return validate(user, password); + } + + @Override + public boolean validateUserAndRole(String user, String password, Set roles, CheckType checkType) { + //CDA User is allowed to send and manage messages for the invm acceptor. + //Other users are not allowed to send messages. + if (!cdaUser.equalsIgnoreCase(user) && (checkType == CheckType.SEND || checkType == CheckType.MANAGE)) { + LOGGER.atWarning().log("User: " + user + + " attempting to access Artemis Server with check type: " + checkType + + " Only message consumption is supported."); + return false; + } + return validate(user, password); + } + + private boolean validate(String user, String password) { + AuthDao instance = AuthDao.getInstance(DSL.using(dataSource, SQLDialect.ORACLE18C)); + boolean retval = false; + try { + DataApiPrincipal principal = instance.getByApiKey(password); + retval = principal.getName().equalsIgnoreCase(user) + && principal.getRoles().contains(new cwms.cda.security.Role(CWMS_USERS_ROLE)); + } catch (CwmsAuthException ex) { + LOGGER.atWarning().withCause(ex).log("Unauthenticated user: " + user + + " attempting to access Artemis Server"); + } + return retval; + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/api/messaging/CamelRouter.java b/cwms-data-api/src/main/java/cwms/cda/api/messaging/CamelRouter.java new file mode 100644 index 000000000..03e3cc69c --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/api/messaging/CamelRouter.java @@ -0,0 +1,242 @@ +/* + * MIT License + * + * Copyright (c) 2024 Hydrologic Engineering Center + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package cwms.cda.api.messaging; + +import static java.lang.String.format; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.table; + +import com.google.common.flogger.FluentLogger; +import cwms.cda.ApiServlet; +import cwms.cda.data.dao.AuthDao; +import cwms.cda.data.dto.auth.ApiKey; +import cwms.cda.security.DataApiPrincipal; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.jms.ConnectionFactory; +import javax.jms.TopicConnectionFactory; +import javax.sql.DataSource; +import oracle.jms.AQjmsFactory; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.component.jms.JmsComponent; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.model.RouteDefinition; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Record1; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; + +final class CamelRouter implements ActiveMQServerPlugin { + private static final FluentLogger LOGGER = FluentLogger.forEnclosingClass(); + private static final String ORACLE_QUEUE_SOURCE = "oracleAQ"; + private static final String ARTEMIS_QUEUE_SOURCE = "artemis"; + private final CamelContext camelContext; + private final Map routeDefinitions; + private final String oracleAqClientId; + + CamelRouter(DataSource cwms) throws Exception { + oracleAqClientId = getClientId(); + camelContext = initCamel(cwms); + routeDefinitions = buildRouteDefinitions(cwms); + camelContext.addRouteDefinitions(routeDefinitions.values()); + } + + private CamelContext initCamel(DataSource cwms) { + try { + //wrapped DelegatingDataSource is used because internally AQJMS casts the returned connection + //as an OracleConnection, but the JNDI pool is returning us a proxy, so unwrap it + DefaultCamelContext camel = new DefaultCamelContext(); + DataSourceWrapper dataSource = new DataSourceWrapper(cwms); + TopicConnectionFactory connectionFactory = AQjmsFactory.getTopicConnectionFactory(dataSource, true); + camel.addComponent(ORACLE_QUEUE_SOURCE, JmsComponent.jmsComponent(connectionFactory)); + + DSLContext context = DSL.using(cwms, SQLDialect.ORACLE18C); + String cdaUser = context + .connectionResult(c -> c.getMetaData().getUserName()); + String apiKey = createApiKey(context, cdaUser); + ConnectionFactory artemisConnectionFactory = new ActiveMQJMSConnectionFactory("vm://0", cdaUser, apiKey); + camel.addComponent(ARTEMIS_QUEUE_SOURCE, JmsComponent.jmsComponent(artemisConnectionFactory)); + camel.start(); + return camel; + } catch (Exception e) { + throw new IllegalStateException("Unable to setup Queues", e); + } + } + + private Map buildRouteDefinitions(DataSource cwms) { + DSLContext create = DSL.using(cwms, SQLDialect.ORACLE18C); + Field field = field(name("OWNER")).concat(".").concat(field(name("NAME"))).as("queue"); + return create.select(field) + .from(table(name("DBA_QUEUES"))) + .where(field(name("OWNER")).eq("CWMS_20")) + .and(field(name("QUEUE_TYPE")).eq("NORMAL_QUEUE")) + .fetch() + .stream() + .map(Record1::component1) + .distinct() + .map(OracleQueue::new) + .collect(toMap(q -> q, this::queueToRoute)); + } + + private RouteDefinition queueToRoute(OracleQueue queue) { + RouteDefinition routeDefinition = new RouteDefinition(); + String durableSub = (ApiServlet.APPLICATION_TITLE + "_" + queue.getOracleQueueName()) + .replace(" ", "_") + .replace(".", "_"); + String fromOracleRoute = format("%s:topic:%s?durableSubscriptionName=%s&clientId=%s", ORACLE_QUEUE_SOURCE, + queue.getOracleQueueName(), durableSub, oracleAqClientId); + String[] topics = queue.getTopicIds() + .stream() + .map(CamelRouter::createArtemisLabel) + .toArray(String[]::new); + routeDefinition.id(queue.getOracleQueueName()); + routeDefinition.from(fromOracleRoute) + .log("Received message from ActiveMQ.Queue : ${body}") + .process(new MapMessageToJsonProcessor(camelContext)) + .to(topics) + .autoStartup(false); + return routeDefinition; + } + + private static String getClientId() { + try { + String host = InetAddress.getLocalHost().getCanonicalHostName().replace("/", "_"); + return "CDA_" + host.replace(".", "_").replace(":", "_"); + } catch (UnknownHostException e) { + throw new IllegalStateException("Cannot obtain local host name for durable subscription queue setup", e); + } + } + + @Override + public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException { + String routeId = consumer.getQueueAddress().toString(); + String label = createArtemisLabel(routeId); + List routeDefinition = routeDefinitions.values() + .stream() + .filter(r -> r.getOutputs().stream().anyMatch(o -> o.getLabel().equals(label))) + .collect(toList()); + if (routeDefinition.isEmpty()) { + throw new ActiveMQException(ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST, + "Route for id: " + routeId + " does not exit"); + } + try { + for (RouteDefinition route : routeDefinition) { + //Camel handles synchronization internally + //Calling startRoute on an already started route is innocuous + camelContext.startRoute(route.getId()); + } + } catch (Exception e) { + throw new ActiveMQException("Could not start route: " + routeId, e, + ActiveMQExceptionType.GENERIC_EXCEPTION); + } + } + + Collection getTopics(String office) { + return routeDefinitions.keySet().stream() + .filter(q -> office == null || q.office.equalsIgnoreCase(office)) + .map(OracleQueue::getTopicIds) + .flatMap(Collection::stream) + .collect(toSet()); + } + + private static String createArtemisLabel(String routeId) { + return format("%s:topic:%s", ARTEMIS_QUEUE_SOURCE, routeId); + } + + void stop() throws Exception { + camelContext.stop(); + } + + private String createApiKey(DSLContext context, String user) { + AuthDao instance = AuthDao.getInstance(context); + UUID uuid = UUID.randomUUID(); + DataApiPrincipal principal = new DataApiPrincipal(user, new HashSet<>()); + ZonedDateTime now = ZonedDateTime.now(); + //TODO: Expiration should be handled more gracefully. + // This assumes no new queues are accessed after three months of uptime + //TODO: cda_camel_invm needs to be unique per instance of CDA. Not sure how to handle that at the moment. + // for now using current epoch millis. This unfortunately leaves old keys between restarts. + String keyName = "cda_camel_invm_" + Instant.now().toEpochMilli(); + ApiKey apiKey = new ApiKey(user, keyName, uuid.toString(), now, now.plusMonths(3)); + return instance.createApiKey(principal, apiKey).getApiKey(); + } + + private static final class OracleQueue { + private static final Pattern ORACLE_QUEUE_PATTERN = + Pattern.compile("CWMS_20\\.(?[A-Z]+)_(?.*)"); + private final String oracleQueueName; + private final String office; + private final String queueGroup; + + private OracleQueue(String oracleQueueName) { + this.oracleQueueName = oracleQueueName; + Matcher matcher = ORACLE_QUEUE_PATTERN.matcher(oracleQueueName); + if (matcher.matches()) { + this.office = matcher.group("office"); + this.queueGroup = matcher.group("queueGroup"); + } else { + LOGGER.atInfo().log("Oracle queue:" + oracleQueueName + " did not match standard pattern: " + + ORACLE_QUEUE_PATTERN.pattern() + " Artemis topic will use the Oracle queue name as-is."); + this.office = null; + this.queueGroup = null; + } + } + + private String getOracleQueueName() { + return this.oracleQueueName; + } + + private Set getTopicIds() { + Set retval = new HashSet<>(); + if (this.office != null && queueGroup != null) { + retval.add("CDA." + this.office + ".ALL"); + retval.add("CDA." + this.office + "." + this.queueGroup); + } else { + retval.add(this.oracleQueueName); + } + return retval; + } + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/api/messaging/CdaTopicHandler.java b/cwms-data-api/src/main/java/cwms/cda/api/messaging/CdaTopicHandler.java new file mode 100644 index 000000000..f664ac748 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/api/messaging/CdaTopicHandler.java @@ -0,0 +1,173 @@ +/* + * MIT License + * + * Copyright (c) 2024 Hydrologic Engineering Center + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package cwms.cda.api.messaging; + +import static com.codahale.metrics.MetricRegistry.name; +import static cwms.cda.api.Controllers.GET_ALL; +import static cwms.cda.api.Controllers.OFFICE; +import static cwms.cda.api.Controllers.RESULTS; +import static cwms.cda.api.Controllers.SIZE; +import static cwms.cda.api.Controllers.STATUS_200; +import static java.util.stream.Collectors.toList; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.google.common.flogger.FluentLogger; +import cwms.cda.api.Controllers; +import cwms.cda.data.dto.messaging.CdaTopics; +import cwms.cda.formatters.ContentType; +import cwms.cda.formatters.Formats; +import io.javalin.core.util.Header; +import io.javalin.http.Context; +import io.javalin.http.Handler; +import io.javalin.plugin.openapi.annotations.HttpMethod; +import io.javalin.plugin.openapi.annotations.OpenApi; +import io.javalin.plugin.openapi.annotations.OpenApiContent; +import io.javalin.plugin.openapi.annotations.OpenApiParam; +import io.javalin.plugin.openapi.annotations.OpenApiResponse; +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.sql.DataSource; +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.impl.FileConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.jetbrains.annotations.NotNull; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +public final class CdaTopicHandler implements Handler { + private static final FluentLogger LOGGER = FluentLogger.forEnclosingClass(); + private static final String TAG = "Messaging"; + private final MetricRegistry metrics; + private final Histogram requestResultSize; + private ActiveMQServer artemis; + private CamelRouter router; + + public CdaTopicHandler(DataSource cwms, MetricRegistry metrics) { + this.metrics = metrics; + this.requestResultSize = this.metrics.histogram((name(CdaTopicHandler.class.getName(), RESULTS, SIZE))); + String brokerFile = System.getProperty("cwms.data.api.messaging.artemis.broker.file", + System.getenv("CDA_ARTEMIS_BROKER_FILE")); + if (brokerFile == null) { + return; + } + File brokerXmlFile = new File(brokerFile).getAbsoluteFile(); + if (!brokerXmlFile.exists()) { + return; + } + try { + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + dbFactory.setExpandEntityReferences(false); + dbFactory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true); + dbFactory.setNamespaceAware(true); + DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); + Document doc = dBuilder.parse(brokerXmlFile); + doc.getDocumentElement().normalize(); + Element rootElement = doc.getDocumentElement(); + FileConfiguration configuration = new FileConfiguration(); + configuration.parse(rootElement, brokerXmlFile.toURI().toURL()); + artemis = ActiveMQServers.newActiveMQServer(configuration); + router = new CamelRouter(cwms); + artemis.registerBrokerPlugin(router); + artemis.setSecurityManager(new ArtemisSecurityManager(cwms)); + artemis.start(); + } catch (Exception e) { + throw new IllegalStateException("Unable to setup Queues", e); + } + } + + private Timer.Context markAndTime(String subject) { + return Controllers.markAndTime(metrics, getClass().getName(), subject); + } + + @OpenApi( + description = "Request the list of supported CDA topics in alphabetical order. " + + "Additional information for the host address of the messaging server is also provided.", + queryParams = { + @OpenApiParam(name = OFFICE, + description = "Specifies the owning office. If this field is not " + + "specified, matching information from all offices shall be " + + "returned."), + }, + responses = {@OpenApiResponse(status = STATUS_200, + description = "A list of supported CDA topics.", + content = { + @OpenApiContent(type = Formats.JSONV1, from = CdaTopics.class), + @OpenApiContent(type = Formats.JSON, from = CdaTopics.class) + }) + }, + method = HttpMethod.GET, + tags = {TAG} + ) + @Override + public void handle(@NotNull Context ctx) throws Exception { + try (final Timer.Context ignored = markAndTime(GET_ALL)) { + String office = ctx.queryParam(OFFICE); + String formatHeader = ctx.header(Header.ACCEPT); + ContentType contentType = Formats.parseHeader(formatHeader, CdaTopics.class); + Collection topics = router.getTopics(office); + List> configurations = new ArrayList<>(); + if (artemis.isStarted()) { + configurations = artemis.getConfiguration().getAcceptorConfigurations().stream() + .map(TransportConfiguration::getParams) + //Need to filter out the In-VM acceptor + .filter(s -> s.containsKey("host")) + .collect(toList()); + } + Set protocols = artemis.getRemotingService().getProtocolFactoryMap().keySet(); + CdaTopics cdaTopics = new CdaTopics(configurations, protocols, topics); + String result = Formats.format(contentType, cdaTopics); + ctx.result(result); + ctx.contentType(contentType.toString()); + requestResultSize.update(result.length()); + } + } + + public void shutdown() { + if (artemis != null) { + try { + artemis.stop(); + } catch (Exception e) { + LOGGER.atWarning().withCause(e).log("Unable to stop Artemis server during servlet shutdown"); + } + } + if (router != null) { + try { + router.stop(); + } catch (Exception e) { + LOGGER.atWarning().withCause(e).log("Unable to stop Camel Route Handler during servlet shutdown"); + } + } + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/api/messaging/DataSourceWrapper.java b/cwms-data-api/src/main/java/cwms/cda/api/messaging/DataSourceWrapper.java new file mode 100644 index 000000000..58ff95cb7 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/api/messaging/DataSourceWrapper.java @@ -0,0 +1,115 @@ +/* + * MIT License + * + * Copyright (c) 2024 Hydrologic Engineering Center + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package cwms.cda.api.messaging; + +import oracle.jdbc.driver.OracleConnection; + +import javax.sql.DataSource; +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.logging.Logger; + + +/** + * This class is a wrapper around a DataSource that delegates all calls to the + * wrapped DataSource. It is intended to be extended by classes that need to + * override DataSource methods. + */ +public class DataSourceWrapper implements DataSource { + + + private DataSource delegate; + + /** + * Create a new DelegatingDataSource. + * @param delegate the target DataSource + */ + public DataSourceWrapper(DataSource delegate) { + //wrapped DelegatingDataSource is used because internally AQJMS casts the returned connection + //as an OracleConnection, but the JNDI pool is returning us a proxy, so unwrap it + this.delegate = delegate; + } + + /** + * Return the target DataSource that this DataSource should delegate to. + */ + + public DataSource getDelegate() { + return this.delegate; + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return getDelegate().getLogWriter(); + } + + @Override + public void setLogWriter(PrintWriter out) throws SQLException { + getDelegate().setLogWriter(out); + } + + @Override + public int getLoginTimeout() throws SQLException { + return getDelegate().getLoginTimeout(); + } + + @Override + public void setLoginTimeout(int seconds) throws SQLException { + getDelegate().setLoginTimeout(seconds); + } + + + + @Override + @SuppressWarnings("unchecked") + public T unwrap(Class iface) throws SQLException { + if (iface.isInstance(this)) { + return (T) this; + } + return getDelegate().unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return (iface.isInstance(this) || getDelegate().isWrapperFor(iface)); + } + + + @Override + public Logger getParentLogger() { + return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); + } + + @Override + public Connection getConnection() throws SQLException { + return getDelegate().getConnection().unwrap(OracleConnection.class); + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + return getDelegate().getConnection(username, password).unwrap(OracleConnection.class); + } +} \ No newline at end of file diff --git a/cwms-data-api/src/main/java/cwms/cda/api/messaging/MapMessageToJsonProcessor.java b/cwms-data-api/src/main/java/cwms/cda/api/messaging/MapMessageToJsonProcessor.java new file mode 100644 index 000000000..cf3e0698a --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/api/messaging/MapMessageToJsonProcessor.java @@ -0,0 +1,61 @@ +/* + * MIT License + * + * Copyright (c) 2024 Hydrologic Engineering Center + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package cwms.cda.api.messaging; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.jms.JmsMessage; + +import javax.jms.MapMessage; +import java.util.Map; + +final class MapMessageToJsonProcessor implements Processor { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final CamelContext context; + + MapMessageToJsonProcessor(CamelContext context) { + this.context = context; + } + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + Message inMessage = exchange.getIn(); + //If we use types other than MapMessage or TextMessage, we'd need to handle here + if (((JmsMessage) inMessage).getJmsMessage() instanceof MapMessage) { + Map map = inMessage.getBody(Map.class); + String payload = null; + + if (map != null) { + payload = OBJECT_MAPPER.writeValueAsString(map); + } + inMessage.setBody(payload); + inMessage.setHeader(Exchange.CONTENT_TYPE, "application/json"); + } + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/messaging/CdaTopics.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/messaging/CdaTopics.java new file mode 100644 index 000000000..420cf556f --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/messaging/CdaTopics.java @@ -0,0 +1,83 @@ +/* + * MIT License + * + * Copyright (c) 2024 Hydrologic Engineering Center + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package cwms.cda.data.dto.messaging; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import cwms.cda.data.dto.CwmsDTOBase; +import cwms.cda.formatters.Formats; +import cwms.cda.formatters.annotations.FormattableWith; +import cwms.cda.formatters.json.JsonV1; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + +@FormattableWith(contentType = Formats.JSONV1, formatter = JsonV1.class, aliases = {Formats.JSON, Formats.DEFAULT}) +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class) +public final class CdaTopics extends CwmsDTOBase { + private List> serverConfigurations = new ArrayList<>(); + private final NavigableSet supportedProtocols = new TreeSet<>(); + private final NavigableSet topics = new TreeSet<>(); + + public CdaTopics() { + } + + public CdaTopics(List> serverConfigurations, Collection supportedProtocols, Collection topics) { + this.serverConfigurations.addAll(serverConfigurations); + this.supportedProtocols.addAll(supportedProtocols); + this.topics.addAll(topics); + } + + public List> getServerConfigurations() { + return serverConfigurations; + } + + public NavigableSet getSupportedProtocols() { + return this.supportedProtocols; + } + + public NavigableSet getTopics() { + return this.topics; + } + + public void setServerConfigurations(List> serverConfigurations) { + this.serverConfigurations = serverConfigurations; + } + + public void setSupportedProtocols(Set supportedProtocols) { + + this.supportedProtocols.addAll(supportedProtocols); + } + + public void setTopics(Set topics) { + this.topics.addAll(topics); + } +} diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/messaging/cda_topics.json b/cwms-data-api/src/test/resources/cwms/cda/data/dto/messaging/cda_topics.json new file mode 100644 index 000000000..d8736e85d --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/messaging/cda_topics.json @@ -0,0 +1,275 @@ +{ + "server-configurations": [ + { + "scheme": "tcp", + "port": "61616", + "host": "localhost" + } + ], + "supported-protocols": [ + "CORE", + "STOMP" + ], + "topics": [ + "CDA.CERL.ALL", + "CDA.CERL.REALTIME_OPS", + "CDA.CERL.STATUS", + "CDA.CERL.TS_STORED", + "CDA.CHL.ALL", + "CDA.CHL.REALTIME_OPS", + "CDA.CHL.STATUS", + "CDA.CHL.TS_STORED", + "CDA.CPC.ALL", + "CDA.CPC.REALTIME_OPS", + "CDA.CPC.STATUS", + "CDA.CPC.TS_STORED", + "CDA.CRREL.ALL", + "CDA.CRREL.REALTIME_OPS", + "CDA.CRREL.STATUS", + "CDA.CRREL.TS_STORED", + "CDA.EL.ALL", + "CDA.EL.REALTIME_OPS", + "CDA.EL.STATUS", + "CDA.EL.TS_STORED", + "CDA.ERD.ALL", + "CDA.ERD.REALTIME_OPS", + "CDA.ERD.STATUS", + "CDA.ERD.TS_STORED", + "CDA.GSL.ALL", + "CDA.GSL.REALTIME_OPS", + "CDA.GSL.STATUS", + "CDA.GSL.TS_STORED", + "CDA.HEC.ALL", + "CDA.HEC.REALTIME_OPS", + "CDA.HEC.STATUS", + "CDA.HEC.TS_STORED", + "CDA.ITL.ALL", + "CDA.ITL.REALTIME_OPS", + "CDA.ITL.STATUS", + "CDA.ITL.TS_STORED", + "CDA.IWR.ALL", + "CDA.IWR.REALTIME_OPS", + "CDA.IWR.STATUS", + "CDA.IWR.TS_STORED", + "CDA.LCRA.ALL", + "CDA.LCRA.REALTIME_OPS", + "CDA.LCRA.STATUS", + "CDA.LCRA.TS_STORED", + "CDA.LRB.ALL", + "CDA.LRB.REALTIME_OPS", + "CDA.LRB.STATUS", + "CDA.LRB.TS_STORED", + "CDA.LRC.ALL", + "CDA.LRC.REALTIME_OPS", + "CDA.LRC.STATUS", + "CDA.LRC.TS_STORED", + "CDA.LRD.ALL", + "CDA.LRD.REALTIME_OPS", + "CDA.LRD.STATUS", + "CDA.LRD.TS_STORED", + "CDA.LRDG.ALL", + "CDA.LRDG.REALTIME_OPS", + "CDA.LRDG.STATUS", + "CDA.LRDG.TS_STORED", + "CDA.LRDO.ALL", + "CDA.LRDO.REALTIME_OPS", + "CDA.LRDO.STATUS", + "CDA.LRDO.TS_STORED", + "CDA.LRE.ALL", + "CDA.LRE.REALTIME_OPS", + "CDA.LRE.STATUS", + "CDA.LRE.TS_STORED", + "CDA.LRH.ALL", + "CDA.LRH.REALTIME_OPS", + "CDA.LRH.STATUS", + "CDA.LRH.TS_STORED", + "CDA.LRL.ALL", + "CDA.LRL.REALTIME_OPS", + "CDA.LRL.STATUS", + "CDA.LRL.TS_STORED", + "CDA.LRN.ALL", + "CDA.LRN.REALTIME_OPS", + "CDA.LRN.STATUS", + "CDA.LRN.TS_STORED", + "CDA.LRP.ALL", + "CDA.LRP.REALTIME_OPS", + "CDA.LRP.STATUS", + "CDA.LRP.TS_STORED", + "CDA.MVD.ALL", + "CDA.MVD.REALTIME_OPS", + "CDA.MVD.STATUS", + "CDA.MVD.TS_STORED", + "CDA.MVK.ALL", + "CDA.MVK.REALTIME_OPS", + "CDA.MVK.STATUS", + "CDA.MVK.TS_STORED", + "CDA.MVM.ALL", + "CDA.MVM.REALTIME_OPS", + "CDA.MVM.STATUS", + "CDA.MVM.TS_STORED", + "CDA.MVN.ALL", + "CDA.MVN.REALTIME_OPS", + "CDA.MVN.STATUS", + "CDA.MVN.TS_STORED", + "CDA.MVP.ALL", + "CDA.MVP.REALTIME_OPS", + "CDA.MVP.STATUS", + "CDA.MVP.TS_STORED", + "CDA.MVR.ALL", + "CDA.MVR.REALTIME_OPS", + "CDA.MVR.STATUS", + "CDA.MVR.TS_STORED", + "CDA.MVS.ALL", + "CDA.MVS.REALTIME_OPS", + "CDA.MVS.STATUS", + "CDA.MVS.TS_STORED", + "CDA.NAB.ALL", + "CDA.NAB.REALTIME_OPS", + "CDA.NAB.STATUS", + "CDA.NAB.TS_STORED", + "CDA.NAD.ALL", + "CDA.NAD.REALTIME_OPS", + "CDA.NAD.STATUS", + "CDA.NAD.TS_STORED", + "CDA.NAE.ALL", + "CDA.NAE.REALTIME_OPS", + "CDA.NAE.STATUS", + "CDA.NAE.TS_STORED", + "CDA.NAN.ALL", + "CDA.NAN.REALTIME_OPS", + "CDA.NAN.STATUS", + "CDA.NAN.TS_STORED", + "CDA.NAO.ALL", + "CDA.NAO.REALTIME_OPS", + "CDA.NAO.STATUS", + "CDA.NAO.TS_STORED", + "CDA.NAP.ALL", + "CDA.NAP.REALTIME_OPS", + "CDA.NAP.STATUS", + "CDA.NAP.TS_STORED", + "CDA.NDC.ALL", + "CDA.NDC.REALTIME_OPS", + "CDA.NDC.STATUS", + "CDA.NDC.TS_STORED", + "CDA.NWD.ALL", + "CDA.NWD.REALTIME_OPS", + "CDA.NWD.STATUS", + "CDA.NWD.TS_STORED", + "CDA.NWDM.ALL", + "CDA.NWDM.REALTIME_OPS", + "CDA.NWDM.STATUS", + "CDA.NWDM.TS_STORED", + "CDA.NWDP.ALL", + "CDA.NWDP.REALTIME_OPS", + "CDA.NWDP.STATUS", + "CDA.NWDP.TS_STORED", + "CDA.NWK.ALL", + "CDA.NWK.REALTIME_OPS", + "CDA.NWK.STATUS", + "CDA.NWK.TS_STORED", + "CDA.NWO.ALL", + "CDA.NWO.REALTIME_OPS", + "CDA.NWO.STATUS", + "CDA.NWO.TS_STORED", + "CDA.NWP.ALL", + "CDA.NWP.REALTIME_OPS", + "CDA.NWP.STATUS", + "CDA.NWP.TS_STORED", + "CDA.NWS.ALL", + "CDA.NWS.REALTIME_OPS", + "CDA.NWS.STATUS", + "CDA.NWS.TS_STORED", + "CDA.NWW.ALL", + "CDA.NWW.REALTIME_OPS", + "CDA.NWW.STATUS", + "CDA.NWW.TS_STORED", + "CDA.POA.ALL", + "CDA.POA.REALTIME_OPS", + "CDA.POA.STATUS", + "CDA.POA.TS_STORED", + "CDA.POD.ALL", + "CDA.POD.REALTIME_OPS", + "CDA.POD.STATUS", + "CDA.POD.TS_STORED", + "CDA.POH.ALL", + "CDA.POH.REALTIME_OPS", + "CDA.POH.STATUS", + "CDA.POH.TS_STORED", + "CDA.SAC.ALL", + "CDA.SAC.REALTIME_OPS", + "CDA.SAC.STATUS", + "CDA.SAC.TS_STORED", + "CDA.SAD.ALL", + "CDA.SAD.REALTIME_OPS", + "CDA.SAD.STATUS", + "CDA.SAD.TS_STORED", + "CDA.SAJ.ALL", + "CDA.SAJ.REALTIME_OPS", + "CDA.SAJ.STATUS", + "CDA.SAJ.TS_STORED", + "CDA.SAM.ALL", + "CDA.SAM.REALTIME_OPS", + "CDA.SAM.STATUS", + "CDA.SAM.TS_STORED", + "CDA.SAS.ALL", + "CDA.SAS.REALTIME_OPS", + "CDA.SAS.STATUS", + "CDA.SAS.TS_STORED", + "CDA.SAW.ALL", + "CDA.SAW.REALTIME_OPS", + "CDA.SAW.STATUS", + "CDA.SAW.TS_STORED", + "CDA.SPA.ALL", + "CDA.SPA.REALTIME_OPS", + "CDA.SPA.STATUS", + "CDA.SPA.TS_STORED", + "CDA.SPD.ALL", + "CDA.SPD.REALTIME_OPS", + "CDA.SPD.STATUS", + "CDA.SPD.TS_STORED", + "CDA.SPK.ALL", + "CDA.SPK.REALTIME_OPS", + "CDA.SPK.STATUS", + "CDA.SPK.TS_STORED", + "CDA.SPL.ALL", + "CDA.SPL.REALTIME_OPS", + "CDA.SPL.STATUS", + "CDA.SPL.TS_STORED", + "CDA.SPN.ALL", + "CDA.SPN.REALTIME_OPS", + "CDA.SPN.STATUS", + "CDA.SPN.TS_STORED", + "CDA.SWD.ALL", + "CDA.SWD.REALTIME_OPS", + "CDA.SWD.STATUS", + "CDA.SWD.TS_STORED", + "CDA.SWF.ALL", + "CDA.SWF.REALTIME_OPS", + "CDA.SWF.STATUS", + "CDA.SWF.TS_STORED", + "CDA.SWG.ALL", + "CDA.SWG.REALTIME_OPS", + "CDA.SWG.STATUS", + "CDA.SWG.TS_STORED", + "CDA.SWL.ALL", + "CDA.SWL.REALTIME_OPS", + "CDA.SWL.STATUS", + "CDA.SWL.TS_STORED", + "CDA.SWT.ALL", + "CDA.SWT.REALTIME_OPS", + "CDA.SWT.STATUS", + "CDA.SWT.TS_STORED", + "CDA.TEC.ALL", + "CDA.TEC.REALTIME_OPS", + "CDA.TEC.STATUS", + "CDA.TEC.TS_STORED", + "CDA.WCSC.ALL", + "CDA.WCSC.REALTIME_OPS", + "CDA.WCSC.STATUS", + "CDA.WCSC.TS_STORED", + "CDA.WPC.ALL", + "CDA.WPC.REALTIME_OPS", + "CDA.WPC.STATUS", + "CDA.WPC.TS_STORED" + ] +} \ No newline at end of file diff --git a/cwms-data-api/src/test/resources/tomcat/conf/broker.xml b/cwms-data-api/src/test/resources/tomcat/conf/broker.xml new file mode 100644 index 000000000..884eab602 --- /dev/null +++ b/cwms-data-api/src/test/resources/tomcat/conf/broker.xml @@ -0,0 +1,38 @@ + + + + + + + ActiveMQServer + false + + tcp://localhost:61616?httpEnabled=true + vm://0 + + + \ No newline at end of file diff --git a/docs/uml/queues/artemis_architecture.jpg b/docs/uml/queues/artemis_architecture.jpg new file mode 100644 index 000000000..d2b9de476 Binary files /dev/null and b/docs/uml/queues/artemis_architecture.jpg differ diff --git a/docs/uml/queues/artemis_camel_oracle_queue.png b/docs/uml/queues/artemis_camel_oracle_queue.png new file mode 100644 index 000000000..28d68b25b Binary files /dev/null and b/docs/uml/queues/artemis_camel_oracle_queue.png differ diff --git a/docs/uml/queues/artemis_camel_oracle_queue.puml b/docs/uml/queues/artemis_camel_oracle_queue.puml new file mode 100644 index 000000000..6dfef1b99 --- /dev/null +++ b/docs/uml/queues/artemis_camel_oracle_queue.puml @@ -0,0 +1,18 @@ +@startuml +participant "Oracle Queue" as OQ +participant "Apache Camel" as AC +participant "Artemis Server" as AS +participant "Artemis Client API" as ACA + +OQ -> AC: Produce Message (AQAPI) +activate OQ +activate AC +AC -> AS: Consume/Transform/Route Message +deactivate OQ +activate AS +AS -> ACA: Deliver Message (JMS) +deactivate AC +activate ACA +ACA --> AS: Acknowledge Receipt (JMS) +deactivate ACA +@enduml \ No newline at end of file diff --git a/gradle.properties.example b/gradle.properties.example index f289fb6b3..60a30065a 100644 --- a/gradle.properties.example +++ b/gradle.properties.example @@ -26,4 +26,9 @@ cda.war.context=cwms-data #testcontainer.cwms.bypass.office.id=HQ ## eroc must be lower case. #testcontainer.cwms.bypass.office.eroc=l2 -#testcontainer.cwms.bypass.network=database_net \ No newline at end of file +#testcontainer.cwms.bypass.network=database_net + +## Turns on messaging via Oracle AQ/ArtemisMQ/Apache Camel +## - make sure to increase number of connections in pool to support queues +#cwms.data.api.messaging.enabled=true +#cwms.data.api.messaging.artemis.broker.file=src/test/resources/tomcat/conf/broker.xml \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4ee28ff94..fda528b5d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -74,6 +74,14 @@ jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-da jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } jackson-dataformat-xml = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-xml", version.ref = "jackson" } +aqapi = { module = "com.oracle.database.jdbc:aqapi", version = "19.3.0.0" } +jmscommon = { module = "com.oracle.database.jdbc:jmscommon", version = "19.3.0.0" } +activemq-artemis-server = { module = "org.apache.activemq:artemis-server", version = "2.19.1" } +activemq-artemis-client = { module = "org.apache.activemq:artemis-jms-client-all", version = "2.19.1" } +activemq-artemis-stomp = { module = "org.apache.activemq:artemis-stomp-protocol", version = "2.19.1" } +#apache camel 2.x is the last to support JDK 8 +camel-core = {module = 'org.apache.camel:camel-core', version ='2.25.4' } +camel-jms = {module = 'org.apache.camel:camel-jms', version ='2.25.4' } #compile compileOnly diff --git a/websocket-testing/package.json b/websocket-testing/package.json new file mode 100644 index 000000000..4e649c23b --- /dev/null +++ b/websocket-testing/package.json @@ -0,0 +1,17 @@ +{ + "name": "websocket-testing", + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "test": "node ./src/main/javascript/stomp-ws-testing.js" + }, + "type": "module", + "keywords": [], + "author": "", + "license": "ISC", + "dependencies": { + "@stomp/stompjs": "^7.0.0", + "ws": "^8.17.0" + } +} diff --git a/websocket-testing/src/main/javascript/stomp-ws-testing.js b/websocket-testing/src/main/javascript/stomp-ws-testing.js new file mode 100644 index 000000000..6dcdf2446 --- /dev/null +++ b/websocket-testing/src/main/javascript/stomp-ws-testing.js @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2024 Hydrologic Engineering Center + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import {Client} from '@stomp/stompjs'; + +import {WebSocket} from 'ws'; + +Object.assign(global, {WebSocket}); +const client = new Client({ + logRawCommunication: true, + connectHeaders: { + login: 'M5HECTEST', + passcode: 'testkey', + }, + brokerURL: 'ws://localhost:61616/topic', connectionTimeout: 1000, onConnect: () => { + console.log("Connected") + client.subscribe('CDA.SWT.ALL', message => { + console.log(`Received: ${message.body} from CDA.SWT.ALL`); + message.ack(); + }, {ack: 'client'}); + client.subscribe('CDA.SWT.TS_STORED', message => { + console.log(`Received: ${message.body} from CDA.SWT.TS_STORED`); + message.ack(); + }, {ack: 'client'}); + }, onStompError: (frame) => { + console.log('Broker reported error: ' + frame.headers['message']); + console.log('Additional details: ' + frame.body); + } +}); + +client.activate(); \ No newline at end of file