From 4bf80c78b4d2d0dd9c835c4e8ef41e926687cd38 Mon Sep 17 00:00:00 2001 From: Amit Kumar Mondal Date: Sun, 16 Jul 2023 12:41:54 +0200 Subject: [PATCH] Added Compression Support in MQTT RPC --- .../com/osgifx/console/agent/rpc/mqtt/MqttRPC.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/com.osgifx.console.agent.api/src/main/java/com/osgifx/console/agent/rpc/mqtt/MqttRPC.java b/com.osgifx.console.agent.api/src/main/java/com/osgifx/console/agent/rpc/mqtt/MqttRPC.java index 2f4d24d7..78f86ac2 100644 --- a/com.osgifx.console.agent.api/src/main/java/com/osgifx/console/agent/rpc/mqtt/MqttRPC.java +++ b/com.osgifx.console.agent.api/src/main/java/com/osgifx/console/agent/rpc/mqtt/MqttRPC.java @@ -129,7 +129,7 @@ public void open() { } private RpcMessage decodeMessage(final ByteBuffer payload) throws Exception { - return new JSONCodec().dec().from(payload.array()).get(RpcMessage.class); + return new JSONCodec().dec().inflate().from(payload.array()).get(RpcMessage.class); } @Override @@ -220,7 +220,7 @@ private int send(final RpcMessage msg) throws Exception { synchronized (publisher) { final ByteArrayOutputStream bout = new ByteArrayOutputStream(); try { - new JSONCodec().enc().to(bout).put(msg); + new JSONCodec().enc().deflate().to(bout).put(msg); final ByteBuffer data = ByteBuffer.wrap(bout.toByteArray()); final Mqtt5Message message = new Mqtt5Message(); @@ -268,14 +268,14 @@ private T waitForResult(final int id, final Type type) throws Exception { return null; } if (result.exception) { - final String msg = new JSONCodec().dec().from(result.value).get(String.class); + final String msg = new JSONCodec().dec().inflate().from(result.value).get(String.class); trace("Exception during agent communication: " + msg); throw new RuntimeException(msg); } if (type == byte[].class) { return (T) result.value; } - return (T) new JSONCodec().dec().from(result.value).get(type); + return (T) new JSONCodec().dec().inflate().from(result.value).get(type); } long elapsedInNanos = System.nanoTime() - startInNanos; long delayInMillis = deadlineInMillis - TimeUnit.NANOSECONDS.toMillis(elapsedInNanos); @@ -318,7 +318,8 @@ private void executeCommand(final String cmd, final int id, final List a if (type == byte[].class) { parameters[i] = args.get(i); } else { - parameters[i] = new JSONCodec().dec().from(args.get(i)).get(m.getGenericParameterTypes()[i]); + parameters[i] = new JSONCodec().dec().inflate().from(args.get(i)) + .get(m.getGenericParameterTypes()[i]); } } try { @@ -355,7 +356,7 @@ private RpcMessage msg(final int msgId, final Method method, final Object[] args argValue = (byte[]) arg; } else { final ByteArrayOutputStream bout = new ByteArrayOutputStream(); - new JSONCodec().enc().to(bout).put(arg); + new JSONCodec().enc().deflate().to(bout).put(arg); argValue = bout.toByteArray(); } final String encodedValue = Base64.getEncoder().encodeToString(argValue);