Skip to content

Commit

Permalink
Fix issues with voyager (#5034)
Browse files Browse the repository at this point in the history
* Fix issues with voyager

* Use side inputs
  • Loading branch information
RustedBones authored Oct 19, 2023
1 parent e9d6135 commit ee2b08d
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ final case class VoyagerUri(value: URI) extends AnyVal {
}

object VoyagerUri {
def apply(value: String): VoyagerUri = new VoyagerUri(new URI(value))
def apply(value: String): VoyagerUri =
new VoyagerUri(URI.create(value.stripSuffix("/") + "/"))

private[voyager] val IndexFile = "index.hnsw"
private[voyager] val NamesFile = "names.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ class VoyagerPairSCollectionOps(
require(!uri.exists, s"Voyager URI ${uri.value} already exists")

self.transform { in =>
in.reifyAsIterableInGlobalWindow
.map { xs =>
val vectors = in.asIterableSideInput
self.context
.parallelize(Seq((): Unit))
.withSideInputs(vectors)
.map { case (_, ctx) =>
val indexUri = uri.value.resolve(VoyagerUri.IndexFile)
val namesUri = uri.value.resolve(VoyagerUri.NamesFile)
val isLocal = ScioUtil.isLocalUri(uri.value)
Expand All @@ -115,6 +118,7 @@ class VoyagerPairSCollectionOps(
(tmpIndex, tmpNames)
}

val xs = ctx(vectors)
val writer =
new VoyagerWriter(localIndex, localNames, spaceType, storageDataType, dim, ef, m)
writer.write(xs)
Expand All @@ -126,6 +130,7 @@ class VoyagerPairSCollectionOps(

uri
}
.toSCollection
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ class VoyagerTest extends PipelineSpec {
} should have message s"requirement failed: Voyager URI ${uri.value} already exists"
}

"VoyagerUri" should "not use Kryo" in {
val uri = VoyagerUri("gs://this-that")
"VoyagerUri" should "normalize uri as directory" in {
VoyagerUri("gs://this-that").value.toString shouldBe "gs://this-that/"
}

it should "not use Kryo" in {
val uri = VoyagerUri("gs://this-that/")
uri coderShould notFallback()
}
}

0 comments on commit ee2b08d

Please sign in to comment.