Skip to content

Commit

Permalink
Added Compression Support in MQTT RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
amitjoy committed Jul 21, 2023
1 parent 9319c6a commit 4bf80c7
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void open() {
}

private RpcMessage decodeMessage(final ByteBuffer payload) throws Exception {
return new JSONCodec().dec().from(payload.array()).get(RpcMessage.class);
return new JSONCodec().dec().inflate().from(payload.array()).get(RpcMessage.class);
}

@Override
Expand Down Expand Up @@ -220,7 +220,7 @@ private int send(final RpcMessage msg) throws Exception {
synchronized (publisher) {
final ByteArrayOutputStream bout = new ByteArrayOutputStream();
try {
new JSONCodec().enc().to(bout).put(msg);
new JSONCodec().enc().deflate().to(bout).put(msg);

final ByteBuffer data = ByteBuffer.wrap(bout.toByteArray());
final Mqtt5Message message = new Mqtt5Message();
Expand Down Expand Up @@ -268,14 +268,14 @@ private <T> T waitForResult(final int id, final Type type) throws Exception {
return null;
}
if (result.exception) {
final String msg = new JSONCodec().dec().from(result.value).get(String.class);
final String msg = new JSONCodec().dec().inflate().from(result.value).get(String.class);
trace("Exception during agent communication: " + msg);
throw new RuntimeException(msg);
}
if (type == byte[].class) {
return (T) result.value;
}
return (T) new JSONCodec().dec().from(result.value).get(type);
return (T) new JSONCodec().dec().inflate().from(result.value).get(type);
}
long elapsedInNanos = System.nanoTime() - startInNanos;
long delayInMillis = deadlineInMillis - TimeUnit.NANOSECONDS.toMillis(elapsedInNanos);
Expand Down Expand Up @@ -318,7 +318,8 @@ private void executeCommand(final String cmd, final int id, final List<byte[]> a
if (type == byte[].class) {
parameters[i] = args.get(i);
} else {
parameters[i] = new JSONCodec().dec().from(args.get(i)).get(m.getGenericParameterTypes()[i]);
parameters[i] = new JSONCodec().dec().inflate().from(args.get(i))
.get(m.getGenericParameterTypes()[i]);
}
}
try {
Expand Down Expand Up @@ -355,7 +356,7 @@ private RpcMessage msg(final int msgId, final Method method, final Object[] args
argValue = (byte[]) arg;
} else {
final ByteArrayOutputStream bout = new ByteArrayOutputStream();
new JSONCodec().enc().to(bout).put(arg);
new JSONCodec().enc().deflate().to(bout).put(arg);
argValue = bout.toByteArray();
}
final String encodedValue = Base64.getEncoder().encodeToString(argValue);
Expand Down

0 comments on commit 4bf80c7

Please sign in to comment.