Skip to content

Commit

Permalink
Use side inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Oct 19, 2023
1 parent e543d90 commit 388f1fa
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,11 @@ class VoyagerPairSCollectionOps(
require(!uri.exists, s"Voyager URI ${uri.value} already exists")

self.transform { in =>
// Do not use reifyAsIterableInGlobalWindow
// as iterableCoder may materialize the full collection
val vectors = in.asIterableSideInput
self.context
.parallelize(Seq((): Unit))
.reifySideInputAsValues(in.asIterableSideInput)
.map { case (_, xs) =>
.withSideInputs(vectors)
.map { case (_, ctx) =>
val indexUri = uri.value.resolve(VoyagerUri.IndexFile)
val namesUri = uri.value.resolve(VoyagerUri.NamesFile)
val isLocal = ScioUtil.isLocalUri(uri.value)
Expand All @@ -119,6 +118,7 @@ class VoyagerPairSCollectionOps(
(tmpIndex, tmpNames)
}

val xs = ctx(vectors)
val writer =
new VoyagerWriter(localIndex, localNames, spaceType, storageDataType, dim, ef, m)
writer.write(xs)
Expand All @@ -130,6 +130,7 @@ class VoyagerPairSCollectionOps(

uri
}
.toSCollection
}
}

Expand Down

0 comments on commit 388f1fa

Please sign in to comment.