From 6974ea61d1b7174dc1f06bc09027e2712bddff2d Mon Sep 17 00:00:00 2001 From: Neville Li Date: Thu, 23 Jun 2016 14:12:53 -0400 Subject: [PATCH] make KryoAtomicCoder thread safe --- .../spotify/scio/coders/KryoAtomicCoder.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/coders/KryoAtomicCoder.scala b/scio-core/src/main/scala/com/spotify/scio/coders/KryoAtomicCoder.scala index e026107d87..314266aad2 100644 --- a/scio-core/src/main/scala/com/spotify/scio/coders/KryoAtomicCoder.scala +++ b/scio-core/src/main/scala/com/spotify/scio/coders/KryoAtomicCoder.scala @@ -56,12 +56,16 @@ private[scio] class KryoAtomicCoder[T] extends AtomicCoder[T] { } if (context.isWholeStream) { val output = new Output(outStream) - kryo.writeClassAndObject(output, value) + kryo.synchronized { + kryo.writeClassAndObject(output, value) + } output.flush() } else { val s = new ByteArrayOutputStream() val output = new Output(s) - kryo.writeClassAndObject(output, value) + kryo.synchronized { + kryo.writeClassAndObject(output, value) + } output.flush() s.close() @@ -72,7 +76,9 @@ private[scio] class KryoAtomicCoder[T] extends AtomicCoder[T] { override def decode(inStream: InputStream, context: Context): T = { val o = if (context.isWholeStream) { - kryo.readClassAndObject(new Input(inStream)) + kryo.synchronized { + kryo.readClassAndObject(new Input(inStream)) + } } else { val length = VarInt.decodeInt(inStream) if (length < 0) { @@ -81,7 +87,9 @@ private[scio] class KryoAtomicCoder[T] extends AtomicCoder[T] { val value = Array.ofDim[Byte](length) ByteStreams.readFully(inStream, value) - kryo.readClassAndObject(new Input(value)) + kryo.synchronized { + kryo.readClassAndObject(new Input(value)) + } } o.asInstanceOf[T] }