-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Voyager support in Scio #4996
Voyager support in Scio #4996
Conversation
Codecov Report
@@ Coverage Diff @@
## main #4996 +/- ##
==========================================
+ Coverage 63.12% 63.15% +0.03%
==========================================
Files 283 286 +3
Lines 10659 10739 +80
Branches 765 767 +2
==========================================
+ Hits 6728 6782 +54
- Misses 3931 3957 +26
|
Thanks @patrickwmcgee . I just converted the PR to draft until this gets ready. Will have a look at it ASAP |
* the `index.hnsw` and `names.json` are. | ||
*/ | ||
trait VoyagerUri extends Serializable { | ||
val logger = LoggerFactory.getLogger(this.getClass) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer logger declaration in the companion object, with @transient lazy val
to avoid serialization
*/ | ||
trait VoyagerUri extends Serializable { | ||
val logger = LoggerFactory.getLogger(this.getClass) | ||
val path: String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer def
in traits
} | ||
|
||
def files: Seq[String] = Seq("index.hnsw", "names.json") | ||
implicit val voyagerUriCoder: Coder[VoyagerUri] = Coder.kryo[VoyagerUri] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could probably do better than kryo
if VoyagerUri
is a sealed trait
and implementations case classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious if you could follow up on this with more of an example?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure! basically, Scio can leverage Magnolia to automatically derive compile-time Coders for Scala ADTs: case classes, and sealed traits (sealed
is important so that Magnolia knows in advance every possibility it can expect when it decodes an instance of the trait). These types of coders are much more efficient than Kryo, which basically traverses the class fields at runtime.
So, you could have a setup like this:
sealed trait VoyagerUri {
def path: String
...
}
case class LocalVoyagerUri(path: String) extends VoyagerUri {
override def....
}
case class RemoteVoyagerUri(path: String, private rfu: RemoteFileUtil) extends VoyagerUri {
override def ...
}
object RemoteVoyagerUri {
def apply(path: String, options: PipelineOptions): RemoteVoyagerUri =
RemoteVoyagerUri(path, RemoteFileUtil.create(options))
}
Then you can remove the implicit Kryo coder, and it should just work ™️ .
We even have a specialized type of unit test assertion for checking that your Coder doesn't fall back to kryo:
import com.spotify.scio.testing.CoderAssertions._
class VoyagerTest extends PipelineSpec {
"VoyagerUri" should "not use Kryo" in {
val localUri: VoyagerUri = LocalVoyagerUri("blah")
localUri coderShould notFallback()
val remoteUri: VoyagerUri = RemoteVoyagerUri("blah", PipelineOptionsFactory.create())
remoteUri coderShould notFallback()
}
}
(more examples in CoderTest).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored this so we do not have ADT. VoyagerUri
is a simple value-class around a normal URI. the RemoteFileUtil
can be passed as implicit paramer to exist
The other functions should not de defined into the model. Moved them to the side-input
or the dedicated asVoyager
SCOllection operation
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/VoyagerUri.scala
Outdated
Show resolved
Hide resolved
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/VoyagerUri.scala
Outdated
Show resolved
Hide resolved
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/package.scala
Outdated
Show resolved
Hide resolved
You need to run |
aeb4660
to
e118a6b
Compare
b0ab4ae
to
95b6614
Compare
} | ||
|
||
override private[voyager] def saveAndClose(w: VoyagerWriter): Unit = { | ||
val tempPath: Path = Files.createTempDirectory("") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val tempPath: Path = Files.createTempDirectory("") | |
val tempPath: Path = Files.createTempDirectory("voyager-") |
|
||
override private[voyager] def saveAndClose(w: VoyagerWriter): Unit = { | ||
val tempPath: Path = Files.createTempDirectory("") | ||
logger.info(s"temp path: $tempPath") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.info(s"temp path: $tempPath") |
I would move this to line 106, so we can log it as "Uploaded Voyager file from $tempPath to $path/$f" 👍
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/package.scala
Outdated
Show resolved
Hide resolved
): SCollection[VoyagerUri] = { | ||
val uuid: UUID = UUID.randomUUID() | ||
val tempLocation: String = self.context.options.getTempLocation | ||
require(tempLocation != null, s"--tempLocation arg is required") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
require(tempLocation != null, s"--tempLocation arg is required") | |
require(tempLocation != null, s"Voyager writes require --tempLocation to be set.") |
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/package.scala
Outdated
Show resolved
Hide resolved
) extends SideInput[VoyagerReader] { | ||
override def get[I, O](context: DoFn[I, O]#ProcessContext): VoyagerReader = { | ||
val uri = context.sideInput(view) | ||
VOYAGER_URI_MAP.synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain the synchronized map use case a bit? I guess it would be if a user tried to load two Voyager SideInputs w/ the same URI at different nodes of the job graph... is loading the reader a really expensive operation itself, like does it load everything into memory at construction time, or is it just a pointer to a URI?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes the loading is really expensive and having multiple nodes / threads try to create a reader concurrently will cause the job to OOM. This sort of alleviates that and lets it be a singleton of uri -> reader at least for constructing and loading it into memory.
…ge.scala Co-authored-by: Claire McGinty <claire.d.mcginty@gmail.com>
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/VoyagerUri.scala
Outdated
Show resolved
Hide resolved
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/package.scala
Outdated
Show resolved
Hide resolved
scio-core/src/main/java/com/spotify/scio/util/RemoteFileUtil.java
Outdated
Show resolved
Hide resolved
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/syntax/AllSyntax.scala
Show resolved
Hide resolved
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/syntax/SCollectionSyntax.scala
Show resolved
Hide resolved
scio-extra/src/main/scala/com/spotify/scio/extra/voyager/syntax/ScioContextSyntax.scala
Show resolved
Hide resolved
Took the freedom to refactor the PR to avoid making the same design mistakes which were done in the Annoy module |
This adds in support for Voyager to Scio, the api is modeled after the existing
Annoy
implementation since they're both Nearest Neighbor lookups.The integration tests are still a work in progress as I couldn't get them to run locally and will iterate on them here (since CI should be able to properly run them with permissions) but the rest of the code should have basic unit tests and I've done integration style tests in dataflow to