Skip to content

Commit

Permalink
make KryoAtomicCoder thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
nevillelyh committed Jun 23, 2016
1 parent b21eddc commit 6974ea6
Showing 1 changed file with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ private[scio] class KryoAtomicCoder[T] extends AtomicCoder[T] {
}
if (context.isWholeStream) {
val output = new Output(outStream)
kryo.writeClassAndObject(output, value)
kryo.synchronized {
kryo.writeClassAndObject(output, value)
}
output.flush()
} else {
val s = new ByteArrayOutputStream()
val output = new Output(s)
kryo.writeClassAndObject(output, value)
kryo.synchronized {
kryo.writeClassAndObject(output, value)
}
output.flush()
s.close()

Expand All @@ -72,7 +76,9 @@ private[scio] class KryoAtomicCoder[T] extends AtomicCoder[T] {

override def decode(inStream: InputStream, context: Context): T = {
val o = if (context.isWholeStream) {
kryo.readClassAndObject(new Input(inStream))
kryo.synchronized {
kryo.readClassAndObject(new Input(inStream))
}
} else {
val length = VarInt.decodeInt(inStream)
if (length < 0) {
Expand All @@ -81,7 +87,9 @@ private[scio] class KryoAtomicCoder[T] extends AtomicCoder[T] {

val value = Array.ofDim[Byte](length)
ByteStreams.readFully(inStream, value)
kryo.readClassAndObject(new Input(value))
kryo.synchronized {
kryo.readClassAndObject(new Input(value))
}
}
o.asInstanceOf[T]
}
Expand Down

0 comments on commit 6974ea6

Please sign in to comment.