From 0931d1b27591dcf869af27e2d3cd6c167ae0a9dc Mon Sep 17 00:00:00 2001 From: magodo Date: Thu, 7 Sep 2023 20:45:36 +0800 Subject: [PATCH 01/12] Support shared worker --- internal/connect_event.go | 46 +++++ {worker => internal}/message_event.go | 6 +- {worker => internal}/message_port.go | 21 +-- internal/nonblock.go | 12 ++ internal/shared_worker_global_scope.go | 59 +++++++ sharedworker/self.go | 55 ++++++ sharedworker/self_test.go | 37 ++++ sharedworker/shared_worker.go | 100 +++++++++++ sharedworker/shared_worker_test.go | 228 +++++++++++++++++++++++++ worker/self.go | 7 +- worker/worker.go | 11 +- 11 files changed, 559 insertions(+), 23 deletions(-) create mode 100644 internal/connect_event.go rename {worker => internal}/message_event.go (90%) rename {worker => internal}/message_port.go (81%) create mode 100644 internal/nonblock.go create mode 100644 internal/shared_worker_global_scope.go create mode 100644 sharedworker/self.go create mode 100644 sharedworker/self_test.go create mode 100644 sharedworker/shared_worker.go create mode 100644 sharedworker/shared_worker_test.go diff --git a/internal/connect_event.go b/internal/connect_event.go new file mode 100644 index 0000000..a8988ba --- /dev/null +++ b/internal/connect_event.go @@ -0,0 +1,46 @@ +//go:build js && wasm + +package internal + +import ( + "github.com/hack-pad/safejs" + "github.com/pkg/errors" +) + +// ConnectEvent is received from the channel returned by sharedworker.GlobalSelf's Listen(). +// Represents a JS MessageEvent for the connect event. +type ConnectEvent struct { + err error + ports []*MessagePort +} + +// Ports returns this event's ports or a parse error +func (e ConnectEvent) Ports() ([]*MessagePort, error) { + return e.ports, errors.Wrapf(e.err, "failed to parse ConnectEvent") +} + +func parseConnectEvent(v safejs.Value) ConnectEvent { + ports, err := v.Get("ports") + if err != nil { + return ConnectEvent{err: err} + } + portsLen, err := ports.Length() + if err != nil { + return ConnectEvent{err: err} + } + var msgports []*MessagePort + for i := 0; i < portsLen; i++ { + port, err := ports.Index(i) + if err != nil { + return ConnectEvent{err: err} + } + msgport, err := WrapMessagePort(port) + if err != nil { + return ConnectEvent{err: err} + } + msgports = append(msgports, msgport) + } + return ConnectEvent{ + ports: msgports, + } +} diff --git a/worker/message_event.go b/internal/message_event.go similarity index 90% rename from worker/message_event.go rename to internal/message_event.go index 44adf70..571e4f6 100644 --- a/worker/message_event.go +++ b/internal/message_event.go @@ -1,6 +1,6 @@ //go:build js && wasm -package worker +package internal import ( "github.com/hack-pad/safejs" @@ -12,7 +12,7 @@ import ( type MessageEvent struct { data safejs.Value err error - target *messagePort + target *MessagePort } // Data returns this event's data or a parse error @@ -25,7 +25,7 @@ func parseMessageEvent(v safejs.Value) MessageEvent { if err != nil { return MessageEvent{err: err} } - target, err := wrapMessagePort(value) + target, err := WrapMessagePort(value) if err != nil { return MessageEvent{err: err} } diff --git a/worker/message_port.go b/internal/message_port.go similarity index 81% rename from worker/message_port.go rename to internal/message_port.go index 8fb0e56..4a985e4 100644 --- a/worker/message_port.go +++ b/internal/message_port.go @@ -1,19 +1,18 @@ //go:build js && wasm -package worker +package internal import ( "context" "fmt" - "github.com/hack-pad/safejs" ) -type messagePort struct { +type MessagePort struct { jsMessagePort safejs.Value } -func wrapMessagePort(v safejs.Value) (*messagePort, error) { +func WrapMessagePort(v safejs.Value) (*MessagePort, error) { someMethod, err := v.Get("postMessage") if err != nil { return nil, err @@ -21,10 +20,10 @@ func wrapMessagePort(v safejs.Value) (*messagePort, error) { if truthy, err := someMethod.Truthy(); err != nil || !truthy { return nil, fmt.Errorf("invalid MessagePort value: postMessage is not a function") } - return &messagePort{v}, nil + return &MessagePort{v}, nil } -func (p *messagePort) PostMessage(data safejs.Value, transfers []safejs.Value) error { +func (p *MessagePort) PostMessage(data safejs.Value, transfers []safejs.Value) error { args := append([]any{data}, toJSSlice(transfers)) _, err := p.jsMessagePort.Call("postMessage", args...) return err @@ -38,7 +37,7 @@ func toJSSlice[Type any](slice []Type) []any { return newSlice } -func (p *messagePort) Listen(ctx context.Context) (_ <-chan MessageEvent, err error) { +func (p *MessagePort) Listen(ctx context.Context) (_ <-chan MessageEvent, err error) { ctx, cancel := context.WithCancel(ctx) defer func() { if err != nil { @@ -90,9 +89,7 @@ func (p *messagePort) Listen(ctx context.Context) (_ <-chan MessageEvent, err er return events, nil } -func nonBlocking(fn func(args []safejs.Value)) (safejs.Func, error) { - return safejs.FuncOf(func(_ safejs.Value, args []safejs.Value) any { - go fn(args) - return nil - }) +func (p *MessagePort) Close() error { + _, err := p.jsMessagePort.Call("close") + return err } diff --git a/internal/nonblock.go b/internal/nonblock.go new file mode 100644 index 0000000..94d063f --- /dev/null +++ b/internal/nonblock.go @@ -0,0 +1,12 @@ +//go:build js && wasm + +package internal + +import "github.com/hack-pad/safejs" + +func nonBlocking(fn func(args []safejs.Value)) (safejs.Func, error) { + return safejs.FuncOf(func(_ safejs.Value, args []safejs.Value) any { + go fn(args) + return nil + }) +} diff --git a/internal/shared_worker_global_scope.go b/internal/shared_worker_global_scope.go new file mode 100644 index 0000000..dcfa9d3 --- /dev/null +++ b/internal/shared_worker_global_scope.go @@ -0,0 +1,59 @@ +//go:build js && wasm + +package internal + +import ( + "context" + "fmt" + "github.com/hack-pad/safejs" +) + +type SharedWorkerGlobalScope struct { + self safejs.Value +} + +func WrapSharedWorkerGlobalScope(v safejs.Value) (*SharedWorkerGlobalScope, error) { + someMethod, err := v.Get("onconnect") + if err != nil { + return nil, err + } + if truthy, err := someMethod.Truthy(); err != nil || !truthy { + return nil, fmt.Errorf("invalid SharedWorkerGlobalScope value: onconnect is not a function") + } + return &SharedWorkerGlobalScope{v}, nil +} + +func (p *SharedWorkerGlobalScope) Listen(ctx context.Context) (_ <-chan ConnectEvent, err error) { + ctx, cancel := context.WithCancel(ctx) + defer func() { + if err != nil { + cancel() + } + }() + + events := make(chan ConnectEvent) + connectHandler, err := nonBlocking(func(args []safejs.Value) { + events <- parseConnectEvent(args[0]) + }) + if err != nil { + return nil, err + } + go func() { + <-ctx.Done() + _, err := p.self.Call("removeEventListener", "connect", connectHandler) + if err == nil { + connectHandler.Release() + } + close(events) + }() + _, err = p.self.Call("addEventListener", "connect", connectHandler) + if err != nil { + return nil, err + } + return events, nil +} + +func (p *SharedWorkerGlobalScope) Close() error { + _, err := p.self.Call("close") + return err +} diff --git a/sharedworker/self.go b/sharedworker/self.go new file mode 100644 index 0000000..9b698cd --- /dev/null +++ b/sharedworker/self.go @@ -0,0 +1,55 @@ +//go:build js && wasm + +package sharedworker + +import ( + "context" + "github.com/hack-pad/go-webworkers/internal" + + "github.com/hack-pad/safejs" +) + +// GlobalSelf represents the global scope, named "self", in the context of using SharedWorkers. +// Supports receiving connection via Listen(), where each of the ConnectEvent has Ports() whose +// first element represents the MessagePort connected with the channel with its parent, +// which in turns support receiving message via its Listen() and PostMessage(). +type GlobalSelf struct { + self safejs.Value + scope *internal.SharedWorkerGlobalScope +} + +// Self returns the global "self" +func Self() (*GlobalSelf, error) { + self, err := safejs.Global().Get("self") + if err != nil { + return nil, err + } + scope, err := internal.WrapSharedWorkerGlobalScope(self) + if err != nil { + return nil, err + } + return &GlobalSelf{ + self: self, + scope: scope, + }, nil +} + +// Listen sends connect events on a channel for events fired by connection calls to this worker from within the parent scope. +// Stops the listener and closes the channel when ctx is canceled. +func (s *GlobalSelf) Listen(ctx context.Context) (<-chan internal.ConnectEvent, error) { + return s.scope.Listen(ctx) +} + +// Close discards any tasks queued in the global scope's event loop, effectively closing this particular scope. +func (s *GlobalSelf) Close() error { + return s.scope.Close() +} + +// Name returns the name that the Worker was (optionally) given when it was created. +func (s *GlobalSelf) Name() (string, error) { + name, err := s.self.Get("name") + if err != nil { + return "", err + } + return name.String() +} diff --git a/sharedworker/self_test.go b/sharedworker/self_test.go new file mode 100644 index 0000000..1665611 --- /dev/null +++ b/sharedworker/self_test.go @@ -0,0 +1,37 @@ +//go:build js && wasm + +package sharedworker + +import ( + "testing" + + "github.com/hack-pad/safejs" +) + +func TestSelf(t *testing.T) { + t.Skip("This test case only runs inside a worker") + t.Parallel() + self, err := Self() + if err != nil { + t.Fatal(err) + } + if !self.self.Equal(safejs.MustGetGlobal("self")) { + t.Error("self is not equal to the global self") + } +} + +func TestSelfName(t *testing.T) { + t.Skip("This test case only runs inside a worker") + t.Parallel() + self, err := Self() + if err != nil { + t.Fatal(err) + } + name, err := self.Name() + if err != nil { + t.Fatal(err) + } + if name != "" { + t.Errorf("Expected %q, got %q", "", name) + } +} diff --git a/sharedworker/shared_worker.go b/sharedworker/shared_worker.go new file mode 100644 index 0000000..4b30e0e --- /dev/null +++ b/sharedworker/shared_worker.go @@ -0,0 +1,100 @@ +//go:build js && wasm + +// Package sharedworker provides a Shared Web Workers driver for Go code compiled to WebAssembly. +package sharedworker + +import ( + "context" + "github.com/hack-pad/go-webworkers/internal" + + "github.com/hack-pad/safejs" +) + +var ( + jsWorker = safejs.MustGetGlobal("SharedWorker") + jsURL = safejs.MustGetGlobal("URL") + jsBlob = safejs.MustGetGlobal("Blob") +) + +// SharedWorker is a Shared Web Worker, which represents a background task created via a script. +// Use Listen() and PostMessage() to communicate with the worker. +type SharedWorker struct { + url string + name string + worker safejs.Value + msgport *internal.MessagePort +} + +// New starts a worker with the given script's URL and name +func New(url, name string) (*SharedWorker, error) { + worker, err := jsWorker.New(url, name) + if err != nil { + return nil, err + } + port, err := worker.Get("port") + if err != nil { + return nil, err + } + msgport, err := internal.WrapMessagePort(port) + if err != nil { + return nil, err + } + return &SharedWorker{ + url: url, + name: name, + msgport: msgport, + worker: worker, + }, nil +} + +// NewFromScript is like New, but starts the worker with the given script (in JavaScript) +func NewFromScript(jsScript, name string) (*SharedWorker, error) { + blob, err := jsBlob.New([]any{jsScript}, map[string]any{ + "type": "text/javascript", + }) + if err != nil { + return nil, err + } + objectURL, err := jsURL.Call("createObjectURL", blob) + if err != nil { + return nil, err + } + objectURLStr, err := objectURL.String() + if err != nil { + return nil, err + } + return New(objectURLStr, name) +} + +// URL returns the script URL of the worker +func (w *SharedWorker) URL() string { + return w.url +} + +// Name returns the name of the worker +func (w *SharedWorker) Name() string { + return w.name +} + +// PostMessage sends data in a message to the worker, optionally transferring ownership of all items in transfers. +// +// The data may be any value handled by the "structured clone algorithm", which includes cyclical references. +// +// Transfers is an optional array of Transferable objects to transfer ownership of. +// If the ownership of an object is transferred, it becomes unusable in the context it was sent from and becomes available only to the worker it was sent to. +// Transferable objects are instances of classes like ArrayBuffer, MessagePort or ImageBitmap objects that can be transferred. +// null is not an acceptable value for transfer. +func (w *SharedWorker) PostMessage(data safejs.Value, transfers []safejs.Value) error { + return w.msgport.PostMessage(data, transfers) +} + +// Listen sends message events on a channel for events fired by self.postMessage() calls inside the Worker's global scope. +// Stops the listener and closes the channel when ctx is canceled. +func (w *SharedWorker) Listen(ctx context.Context) (<-chan internal.MessageEvent, error) { + return w.msgport.Listen(ctx) +} + +// Close disconnects the port to the worker, so it is no longer active. This stops the flow of messages to that port. +func (w *SharedWorker) Close() error { + return w.msgport.Close() +} diff --git a/sharedworker/shared_worker_test.go b/sharedworker/shared_worker_test.go new file mode 100644 index 0000000..edf2db9 --- /dev/null +++ b/sharedworker/shared_worker_test.go @@ -0,0 +1,228 @@ +//go:build js && wasm + +package sharedworker + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hack-pad/safejs" +) + +var ( + jsJSON = safejs.MustGetGlobal("JSON") + jsUint8Array = safejs.MustGetGlobal("Uint8Array") +) + +func makeBlobURL(t *testing.T, contents []byte, contentType string) string { + t.Helper() + jsContents, err := jsUint8Array.New(len(contents)) + if err != nil { + t.Fatal(err) + } + _, err = safejs.CopyBytesToJS(jsContents, contents) + if err != nil { + t.Fatal(err) + } + blob, err := jsBlob.New([]any{jsContents}, map[string]any{ + "type": contentType, + }) + if err != nil { + t.Fatal(err) + } + url, err := jsURL.Call("createObjectURL", blob) + if err != nil { + t.Fatal(err) + } + urlString, err := url.String() + if err != nil { + t.Fatal(err) + } + return urlString +} + +func TestNew(t *testing.T) { + t.Parallel() + const messageText = "Hello, world!" + blobURL := makeBlobURL(t, []byte(fmt.Sprintf(`"use strict"; +onconnect = (e) => { + const port = e.ports[0]; + port.postMessage(self.name + ": " + %q); +}; +`, messageText)), "text/javascript") + workerName := "worker" + worker, err := New(blobURL, workerName) + if err != nil { + t.Fatal(err) + } + + if worker.URL() != blobURL { + t.Fatalf("url expect=%q, got=%q", blobURL, worker.URL()) + } + + if worker.Name() != workerName { + t.Fatalf("url expect=%q, got=%q", workerName, worker.Name()) + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + messages, err := worker.Listen(ctx) + if err != nil { + t.Fatal(err) + } + message := <-messages + data, err := message.Data() + if err != nil { + t.Fatal(err) + } + dataStr, err := data.String() + if err != nil { + t.Fatal(err) + } + if msg := workerName + ": " + messageText; dataStr != msg { + t.Errorf("Expected %q, got %q", msg, dataStr) + } +} + +func TestNewFromScript(t *testing.T) { + t.Parallel() + const messageText = "Hello, world!" + script := fmt.Sprintf(` +"use strict"; + +onconnect = (e) => { + const port = e.ports[0]; + port.postMessage(self.name + ": " + %q); +}; +`, messageText) + workerName := "worker" + worker, err := NewFromScript(script, workerName) + if err != nil { + t.Fatal(err) + } + if worker.URL() == "" { + t.Fatal("url unexpect to be empty") + } + + if worker.Name() != workerName { + t.Fatalf("url expect=%q, got=%q", workerName, worker.Name()) + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + messages, err := worker.Listen(ctx) + if err != nil { + t.Fatal(err) + } + message := <-messages + data, err := message.Data() + if err != nil { + t.Fatal(err) + } + dataStr, err := data.String() + if err != nil { + t.Fatal(err) + } + if msg := workerName + ": " + messageText; dataStr != msg { + t.Errorf("Expected %q, got %q", msg, dataStr) + } +} + +func TestWorkerClose(t *testing.T) { + t.Parallel() + worker, err := NewFromScript(` +"use strict"; + +onconnect = (e) => { + const port = e.ports[0]; + port.postMessage("start"); + self.setTimeout(() => post.postMessage("done waiting"), 200); +}; +`, "") + if err != nil { + t.Fatal(err) + } + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + messages, err := worker.Listen(ctx) + if err != nil { + t.Fatal(err) + } + message := <-messages + data, err := message.Data() + if err != nil { + t.Fatal(err) + } + dataStr, err := data.String() + if err != nil { + t.Error(err) + } + if dataStr != "start" { + t.Fatalf("Expected worker to send 'start', got %s", dataStr) + } + + err = worker.Close() + if err != nil { + t.Fatal(err) + } + + select { + case message := <-messages: + t.Errorf("Should not receive the delayed message on a terminated worker, got: %v", message) + case <-time.After(400 * time.Millisecond): + } +} + +func TestWorkerPostMessage(t *testing.T) { + t.Parallel() + const pingPongScript = ` +"use strict"; + +onconnect = (e) => { + const port = e.ports[0]; + port.onmessage = (event) => { + port.postMessage(event.data + " pong!"); + }; +}; +` + pingMessage, err := safejs.ValueOf("ping!") + if err != nil { + t.Fatal(err) + } + + t.Run("listen before post", func(t *testing.T) { + t.Parallel() + worker, err := NewFromScript(pingPongScript, "") + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + messages, err := worker.Listen(ctx) + if err != nil { + t.Fatal(err) + } + + err = worker.PostMessage(pingMessage, nil) + if err != nil { + t.Fatal(err) + } + + message := <-messages + data, err := message.Data() + if err != nil { + t.Fatal(err) + } + dataStr, err := data.String() + if err != nil { + t.Error(err) + } + expectedResponse := "ping! pong!" + if dataStr != expectedResponse { + t.Errorf("Expected response %q, got: %q", expectedResponse, dataStr) + } + }) +} diff --git a/worker/self.go b/worker/self.go index cd773cd..28dab7b 100644 --- a/worker/self.go +++ b/worker/self.go @@ -4,6 +4,7 @@ package worker import ( "context" + "github.com/hack-pad/go-webworkers/internal" "github.com/hack-pad/safejs" ) @@ -12,7 +13,7 @@ import ( // Supports sending and receiving messages via PostMessage() and Listen(). type GlobalSelf struct { self safejs.Value - port *messagePort + port *internal.MessagePort } // Self returns the global "self" @@ -21,7 +22,7 @@ func Self() (*GlobalSelf, error) { if err != nil { return nil, err } - port, err := wrapMessagePort(self) + port, err := internal.WrapMessagePort(self) if err != nil { return nil, err } @@ -46,7 +47,7 @@ func (s *GlobalSelf) PostMessage(message safejs.Value, transfers []safejs.Value) // Listen sends message events on a channel for events fired by worker.postMessage() calls inside the main thread's global scope. // Stops the listener and closes the channel when ctx is canceled. -func (s *GlobalSelf) Listen(ctx context.Context) (<-chan MessageEvent, error) { +func (s *GlobalSelf) Listen(ctx context.Context) (<-chan internal.MessageEvent, error) { return s.port.Listen(ctx) } diff --git a/worker/worker.go b/worker/worker.go index 17ec9b0..be298b1 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,10 +1,11 @@ //go:build js && wasm -// Package worker provides a Web Workers driver for Go code compiled to WebAssembly. +// Package worker provides a Dedicated Web Workers driver for Go code compiled to WebAssembly. package worker import ( "context" + "github.com/hack-pad/go-webworkers/internal" "github.com/hack-pad/safejs" ) @@ -15,11 +16,11 @@ var ( jsBlob = safejs.MustGetGlobal("Blob") ) -// Worker is a Web Worker, which represents a background task created via a script. +// Worker is a Dedicaetd Web Worker, which represents a background task created via a script. // Use Listen() and PostMessage() to communicate with the worker. type Worker struct { worker safejs.Value - port *messagePort + port *internal.MessagePort } // Options contains optional configuration for new Workers @@ -46,7 +47,7 @@ func New(url string, options Options) (*Worker, error) { if err != nil { return nil, err } - port, err := wrapMessagePort(worker) + port, err := internal.WrapMessagePort(worker) if err != nil { return nil, err } @@ -96,6 +97,6 @@ func (w *Worker) PostMessage(data safejs.Value, transfers []safejs.Value) error // Listen sends message events on a channel for events fired by self.postMessage() calls inside the Worker's global scope. // Stops the listener and closes the channel when ctx is canceled. -func (w *Worker) Listen(ctx context.Context) (<-chan MessageEvent, error) { +func (w *Worker) Listen(ctx context.Context) (<-chan internal.MessageEvent, error) { return w.port.Listen(ctx) } From ede2dd47a2e9b788813112b62b5aa569b7057eeb Mon Sep 17 00:00:00 2001 From: magodo Date: Fri, 8 Sep 2023 08:43:47 +0800 Subject: [PATCH 02/12] Not expose the controller `Stop()` method --- internal/message_port.go | 6 +--- sharedworker/shared_worker.go | 6 +--- sharedworker/shared_worker_test.go | 46 ------------------------------ 3 files changed, 2 insertions(+), 56 deletions(-) diff --git a/internal/message_port.go b/internal/message_port.go index 4a985e4..67e556e 100644 --- a/internal/message_port.go +++ b/internal/message_port.go @@ -5,6 +5,7 @@ package internal import ( "context" "fmt" + "github.com/hack-pad/safejs" ) @@ -88,8 +89,3 @@ func (p *MessagePort) Listen(ctx context.Context) (_ <-chan MessageEvent, err er } return events, nil } - -func (p *MessagePort) Close() error { - _, err := p.jsMessagePort.Call("close") - return err -} diff --git a/sharedworker/shared_worker.go b/sharedworker/shared_worker.go index 4b30e0e..76795f5 100644 --- a/sharedworker/shared_worker.go +++ b/sharedworker/shared_worker.go @@ -5,6 +5,7 @@ package sharedworker import ( "context" + "github.com/hack-pad/go-webworkers/internal" "github.com/hack-pad/safejs" @@ -93,8 +94,3 @@ func (w *SharedWorker) PostMessage(data safejs.Value, transfers []safejs.Value) func (w *SharedWorker) Listen(ctx context.Context) (<-chan internal.MessageEvent, error) { return w.msgport.Listen(ctx) } - -// Close disconnects the port to the worker, so it is no longer active. This stops the flow of messages to that port. -func (w *SharedWorker) Close() error { - return w.msgport.Close() -} diff --git a/sharedworker/shared_worker_test.go b/sharedworker/shared_worker_test.go index edf2db9..27c1334 100644 --- a/sharedworker/shared_worker_test.go +++ b/sharedworker/shared_worker_test.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/hack-pad/safejs" ) @@ -130,51 +129,6 @@ onconnect = (e) => { } } -func TestWorkerClose(t *testing.T) { - t.Parallel() - worker, err := NewFromScript(` -"use strict"; - -onconnect = (e) => { - const port = e.ports[0]; - port.postMessage("start"); - self.setTimeout(() => post.postMessage("done waiting"), 200); -}; -`, "") - if err != nil { - t.Fatal(err) - } - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - messages, err := worker.Listen(ctx) - if err != nil { - t.Fatal(err) - } - message := <-messages - data, err := message.Data() - if err != nil { - t.Fatal(err) - } - dataStr, err := data.String() - if err != nil { - t.Error(err) - } - if dataStr != "start" { - t.Fatalf("Expected worker to send 'start', got %s", dataStr) - } - - err = worker.Close() - if err != nil { - t.Fatal(err) - } - - select { - case message := <-messages: - t.Errorf("Should not receive the delayed message on a terminated worker, got: %v", message) - case <-time.After(400 * time.Millisecond): - } -} - func TestWorkerPostMessage(t *testing.T) { t.Parallel() const pingPongScript = ` From b6f434ad0d45acff5d0937b445a6ea276b2ce1e2 Mon Sep 17 00:00:00 2001 From: magodo Date: Fri, 8 Sep 2023 10:08:34 +0800 Subject: [PATCH 03/12] Stop creating a abstract connect event --- internal/connect_event.go | 46 -------------------- internal/listen_message_event.go | 60 ++++++++++++++++++++++++++ internal/message_event.go | 27 ++++++++++++ internal/message_port.go | 41 ++---------------- internal/nonblock.go | 12 ------ internal/shared_worker_global_scope.go | 29 +++---------- sharedworker/self.go | 7 ++- 7 files changed, 100 insertions(+), 122 deletions(-) delete mode 100644 internal/connect_event.go create mode 100644 internal/listen_message_event.go delete mode 100644 internal/nonblock.go diff --git a/internal/connect_event.go b/internal/connect_event.go deleted file mode 100644 index a8988ba..0000000 --- a/internal/connect_event.go +++ /dev/null @@ -1,46 +0,0 @@ -//go:build js && wasm - -package internal - -import ( - "github.com/hack-pad/safejs" - "github.com/pkg/errors" -) - -// ConnectEvent is received from the channel returned by sharedworker.GlobalSelf's Listen(). -// Represents a JS MessageEvent for the connect event. -type ConnectEvent struct { - err error - ports []*MessagePort -} - -// Ports returns this event's ports or a parse error -func (e ConnectEvent) Ports() ([]*MessagePort, error) { - return e.ports, errors.Wrapf(e.err, "failed to parse ConnectEvent") -} - -func parseConnectEvent(v safejs.Value) ConnectEvent { - ports, err := v.Get("ports") - if err != nil { - return ConnectEvent{err: err} - } - portsLen, err := ports.Length() - if err != nil { - return ConnectEvent{err: err} - } - var msgports []*MessagePort - for i := 0; i < portsLen; i++ { - port, err := ports.Index(i) - if err != nil { - return ConnectEvent{err: err} - } - msgport, err := WrapMessagePort(port) - if err != nil { - return ConnectEvent{err: err} - } - msgports = append(msgports, msgport) - } - return ConnectEvent{ - ports: msgports, - } -} diff --git a/internal/listen_message_event.go b/internal/listen_message_event.go new file mode 100644 index 0000000..4054da4 --- /dev/null +++ b/internal/listen_message_event.go @@ -0,0 +1,60 @@ +package internal + +import ( + "context" + + "github.com/hack-pad/safejs" +) + +// listen adds the EventListener on the listener for the specified events. +// It returns a channel, which will send the MessageEvent(s) listened on, until the ctx is canceled. +func listen(ctx context.Context, listener safejs.Value, events ...string) (_ <-chan MessageEvent, err error) { + ctx, cancel := context.WithCancel(ctx) + defer func() { + if err != nil { + cancel() + } + }() + + eventsCh := make(chan MessageEvent) + + var handlers []safejs.Func + for range events { + handler, err := nonBlocking(func(args []safejs.Value) { + eventsCh <- parseMessageEvent(args[0]) + }) + if err != nil { + return nil, err + } + handlers = append(handlers, handler) + } + + go func() { + <-ctx.Done() + for i := range events { + event, handler := events[i], handlers[i] + _, err := listener.Call("removeEventListener", event, handler) + if err == nil { + handler.Release() + } + } + close(eventsCh) + }() + + for i := range events { + event, handler := events[i], handlers[i] + _, err = listener.Call("addEventListener", event, handler) + if err != nil { + return nil, err + } + } + + return eventsCh, nil +} + +func nonBlocking(fn func(args []safejs.Value)) (safejs.Func, error) { + return safejs.FuncOf(func(_ safejs.Value, args []safejs.Value) any { + go fn(args) + return nil + }) +} diff --git a/internal/message_event.go b/internal/message_event.go index 571e4f6..ed15398 100644 --- a/internal/message_event.go +++ b/internal/message_event.go @@ -13,6 +13,7 @@ type MessageEvent struct { data safejs.Value err error target *MessagePort + ports []*MessagePort } // Data returns this event's data or a parse error @@ -20,6 +21,11 @@ func (e MessageEvent) Data() (safejs.Value, error) { return e.data, errors.Wrapf(e.err, "failed to parse MessageEvent %+v", e.data) } +// Ports returns this event's ports or a parse error +func (e MessageEvent) Ports() ([]*MessagePort, error) { + return e.ports, errors.Wrapf(e.err, "failed to parse MessageEvent %+v", e.data) +} + func parseMessageEvent(v safejs.Value) MessageEvent { value, err := v.Get("target") if err != nil { @@ -33,8 +39,29 @@ func parseMessageEvent(v safejs.Value) MessageEvent { if err != nil { return MessageEvent{err: err} } + ports, err := v.Get("ports") + if err != nil { + return MessageEvent{err: err} + } + portsLen, err := ports.Length() + if err != nil { + return MessageEvent{err: err} + } + var msgports []*MessagePort + for i := 0; i < portsLen; i++ { + port, err := ports.Index(i) + if err != nil { + return MessageEvent{err: err} + } + msgport, err := WrapMessagePort(port) + if err != nil { + return MessageEvent{err: err} + } + msgports = append(msgports, msgport) + } return MessageEvent{ data: data, target: target, + ports: msgports, } } diff --git a/internal/message_port.go b/internal/message_port.go index 67e556e..7f97af7 100644 --- a/internal/message_port.go +++ b/internal/message_port.go @@ -38,48 +38,13 @@ func toJSSlice[Type any](slice []Type) []any { return newSlice } -func (p *MessagePort) Listen(ctx context.Context) (_ <-chan MessageEvent, err error) { - ctx, cancel := context.WithCancel(ctx) - defer func() { - if err != nil { - cancel() - } - }() - - events := make(chan MessageEvent) - messageHandler, err := nonBlocking(func(args []safejs.Value) { - events <- parseMessageEvent(args[0]) - }) - if err != nil { - return nil, err - } - errorHandler, err := nonBlocking(func(args []safejs.Value) { - events <- parseMessageEvent(args[0]) - }) +// Listen starts the MessagePort to listen on the "message" and "messageerror" events, until the ctx is canceled. +func (p *MessagePort) Listen(ctx context.Context) (<-chan MessageEvent, error) { + events, err := listen(ctx, p.jsMessagePort, "message", "messageerror") if err != nil { return nil, err } - go func() { - <-ctx.Done() - _, err := p.jsMessagePort.Call("removeEventListener", "message", messageHandler) - if err == nil { - messageHandler.Release() - } - _, err = p.jsMessagePort.Call("removeEventListener", "messageerror", errorHandler) - if err == nil { - errorHandler.Release() - } - close(events) - }() - _, err = p.jsMessagePort.Call("addEventListener", "message", messageHandler) - if err != nil { - return nil, err - } - _, err = p.jsMessagePort.Call("addEventListener", "messageerror", errorHandler) - if err != nil { - return nil, err - } if start, err := p.jsMessagePort.Get("start"); err == nil { if truthy, err := start.Truthy(); err == nil && truthy { if _, err := p.jsMessagePort.Call("start"); err != nil { diff --git a/internal/nonblock.go b/internal/nonblock.go deleted file mode 100644 index 94d063f..0000000 --- a/internal/nonblock.go +++ /dev/null @@ -1,12 +0,0 @@ -//go:build js && wasm - -package internal - -import "github.com/hack-pad/safejs" - -func nonBlocking(fn func(args []safejs.Value)) (safejs.Func, error) { - return safejs.FuncOf(func(_ safejs.Value, args []safejs.Value) any { - go fn(args) - return nil - }) -} diff --git a/internal/shared_worker_global_scope.go b/internal/shared_worker_global_scope.go index dcfa9d3..c6f0ca2 100644 --- a/internal/shared_worker_global_scope.go +++ b/internal/shared_worker_global_scope.go @@ -5,6 +5,7 @@ package internal import ( "context" "fmt" + "github.com/hack-pad/safejs" ) @@ -23,36 +24,16 @@ func WrapSharedWorkerGlobalScope(v safejs.Value) (*SharedWorkerGlobalScope, erro return &SharedWorkerGlobalScope{v}, nil } -func (p *SharedWorkerGlobalScope) Listen(ctx context.Context) (_ <-chan ConnectEvent, err error) { - ctx, cancel := context.WithCancel(ctx) - defer func() { - if err != nil { - cancel() - } - }() - - events := make(chan ConnectEvent) - connectHandler, err := nonBlocking(func(args []safejs.Value) { - events <- parseConnectEvent(args[0]) - }) - if err != nil { - return nil, err - } - go func() { - <-ctx.Done() - _, err := p.self.Call("removeEventListener", "connect", connectHandler) - if err == nil { - connectHandler.Release() - } - close(events) - }() - _, err = p.self.Call("addEventListener", "connect", connectHandler) +// Listen listens on the "connect" events, until the ctx is canceled. +func (p *SharedWorkerGlobalScope) Listen(ctx context.Context) (<-chan MessageEvent, error) { + events, err := listen(ctx, p.self, "connect") if err != nil { return nil, err } return events, nil } +// Close discards any tasks queued in the global scope's event loop, effectively closing this particular scope. func (p *SharedWorkerGlobalScope) Close() error { _, err := p.self.Call("close") return err diff --git a/sharedworker/self.go b/sharedworker/self.go index 9b698cd..f69b3ae 100644 --- a/sharedworker/self.go +++ b/sharedworker/self.go @@ -4,6 +4,7 @@ package sharedworker import ( "context" + "github.com/hack-pad/go-webworkers/internal" "github.com/hack-pad/safejs" @@ -34,9 +35,11 @@ func Self() (*GlobalSelf, error) { }, nil } -// Listen sends connect events on a channel for events fired by connection calls to this worker from within the parent scope. +// Listen sends message events representing the connect event on a channel for events fired +// by connection calls to this worker from within the parent scope. +// Users are expected to call the Ports() on the MessageEvent, and take the 1st one as the target MessagePort. // Stops the listener and closes the channel when ctx is canceled. -func (s *GlobalSelf) Listen(ctx context.Context) (<-chan internal.ConnectEvent, error) { +func (s *GlobalSelf) Listen(ctx context.Context) (<-chan internal.MessageEvent, error) { return s.scope.Listen(ctx) } From fda1ded7ecda3102a583d0be7e48fadd6d0d6ccd Mon Sep 17 00:00:00 2001 From: magodo Date: Fri, 8 Sep 2023 11:15:14 +0800 Subject: [PATCH 04/12] package rename `internal` -> `types` to be exposed --- sharedworker/self.go | 8 ++++---- sharedworker/shared_worker.go | 8 ++++---- {internal => types}/listen_message_event.go | 2 +- {internal => types}/message_event.go | 2 +- {internal => types}/message_port.go | 2 +- {internal => types}/shared_worker_global_scope.go | 2 +- worker/self.go | 8 ++++---- worker/worker.go | 8 ++++---- 8 files changed, 20 insertions(+), 20 deletions(-) rename {internal => types}/listen_message_event.go (98%) rename {internal => types}/message_event.go (98%) rename {internal => types}/message_port.go (98%) rename {internal => types}/shared_worker_global_scope.go (98%) diff --git a/sharedworker/self.go b/sharedworker/self.go index f69b3ae..f4d9431 100644 --- a/sharedworker/self.go +++ b/sharedworker/self.go @@ -5,7 +5,7 @@ package sharedworker import ( "context" - "github.com/hack-pad/go-webworkers/internal" + "github.com/hack-pad/go-webworkers/types" "github.com/hack-pad/safejs" ) @@ -16,7 +16,7 @@ import ( // which in turns support receiving message via its Listen() and PostMessage(). type GlobalSelf struct { self safejs.Value - scope *internal.SharedWorkerGlobalScope + scope *types.SharedWorkerGlobalScope } // Self returns the global "self" @@ -25,7 +25,7 @@ func Self() (*GlobalSelf, error) { if err != nil { return nil, err } - scope, err := internal.WrapSharedWorkerGlobalScope(self) + scope, err := types.WrapSharedWorkerGlobalScope(self) if err != nil { return nil, err } @@ -39,7 +39,7 @@ func Self() (*GlobalSelf, error) { // by connection calls to this worker from within the parent scope. // Users are expected to call the Ports() on the MessageEvent, and take the 1st one as the target MessagePort. // Stops the listener and closes the channel when ctx is canceled. -func (s *GlobalSelf) Listen(ctx context.Context) (<-chan internal.MessageEvent, error) { +func (s *GlobalSelf) Listen(ctx context.Context) (<-chan types.MessageEvent, error) { return s.scope.Listen(ctx) } diff --git a/sharedworker/shared_worker.go b/sharedworker/shared_worker.go index 76795f5..82410da 100644 --- a/sharedworker/shared_worker.go +++ b/sharedworker/shared_worker.go @@ -6,7 +6,7 @@ package sharedworker import ( "context" - "github.com/hack-pad/go-webworkers/internal" + "github.com/hack-pad/go-webworkers/types" "github.com/hack-pad/safejs" ) @@ -23,7 +23,7 @@ type SharedWorker struct { url string name string worker safejs.Value - msgport *internal.MessagePort + msgport *types.MessagePort } // New starts a worker with the given script's URL and name @@ -36,7 +36,7 @@ func New(url, name string) (*SharedWorker, error) { if err != nil { return nil, err } - msgport, err := internal.WrapMessagePort(port) + msgport, err := types.WrapMessagePort(port) if err != nil { return nil, err } @@ -91,6 +91,6 @@ func (w *SharedWorker) PostMessage(data safejs.Value, transfers []safejs.Value) // Listen sends message events on a channel for events fired by self.postMessage() calls inside the Worker's global scope. // Stops the listener and closes the channel when ctx is canceled. -func (w *SharedWorker) Listen(ctx context.Context) (<-chan internal.MessageEvent, error) { +func (w *SharedWorker) Listen(ctx context.Context) (<-chan types.MessageEvent, error) { return w.msgport.Listen(ctx) } diff --git a/internal/listen_message_event.go b/types/listen_message_event.go similarity index 98% rename from internal/listen_message_event.go rename to types/listen_message_event.go index 4054da4..b720648 100644 --- a/internal/listen_message_event.go +++ b/types/listen_message_event.go @@ -1,4 +1,4 @@ -package internal +package types import ( "context" diff --git a/internal/message_event.go b/types/message_event.go similarity index 98% rename from internal/message_event.go rename to types/message_event.go index ed15398..32e45b0 100644 --- a/internal/message_event.go +++ b/types/message_event.go @@ -1,6 +1,6 @@ //go:build js && wasm -package internal +package types import ( "github.com/hack-pad/safejs" diff --git a/internal/message_port.go b/types/message_port.go similarity index 98% rename from internal/message_port.go rename to types/message_port.go index 7f97af7..d0b21ff 100644 --- a/internal/message_port.go +++ b/types/message_port.go @@ -1,6 +1,6 @@ //go:build js && wasm -package internal +package types import ( "context" diff --git a/internal/shared_worker_global_scope.go b/types/shared_worker_global_scope.go similarity index 98% rename from internal/shared_worker_global_scope.go rename to types/shared_worker_global_scope.go index c6f0ca2..43ae0c3 100644 --- a/internal/shared_worker_global_scope.go +++ b/types/shared_worker_global_scope.go @@ -1,6 +1,6 @@ //go:build js && wasm -package internal +package types import ( "context" diff --git a/worker/self.go b/worker/self.go index 28dab7b..3916a67 100644 --- a/worker/self.go +++ b/worker/self.go @@ -4,7 +4,7 @@ package worker import ( "context" - "github.com/hack-pad/go-webworkers/internal" + "github.com/hack-pad/go-webworkers/types" "github.com/hack-pad/safejs" ) @@ -13,7 +13,7 @@ import ( // Supports sending and receiving messages via PostMessage() and Listen(). type GlobalSelf struct { self safejs.Value - port *internal.MessagePort + port *types.MessagePort } // Self returns the global "self" @@ -22,7 +22,7 @@ func Self() (*GlobalSelf, error) { if err != nil { return nil, err } - port, err := internal.WrapMessagePort(self) + port, err := types.WrapMessagePort(self) if err != nil { return nil, err } @@ -47,7 +47,7 @@ func (s *GlobalSelf) PostMessage(message safejs.Value, transfers []safejs.Value) // Listen sends message events on a channel for events fired by worker.postMessage() calls inside the main thread's global scope. // Stops the listener and closes the channel when ctx is canceled. -func (s *GlobalSelf) Listen(ctx context.Context) (<-chan internal.MessageEvent, error) { +func (s *GlobalSelf) Listen(ctx context.Context) (<-chan types.MessageEvent, error) { return s.port.Listen(ctx) } diff --git a/worker/worker.go b/worker/worker.go index be298b1..853ad7e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,7 +5,7 @@ package worker import ( "context" - "github.com/hack-pad/go-webworkers/internal" + "github.com/hack-pad/go-webworkers/types" "github.com/hack-pad/safejs" ) @@ -20,7 +20,7 @@ var ( // Use Listen() and PostMessage() to communicate with the worker. type Worker struct { worker safejs.Value - port *internal.MessagePort + port *types.MessagePort } // Options contains optional configuration for new Workers @@ -47,7 +47,7 @@ func New(url string, options Options) (*Worker, error) { if err != nil { return nil, err } - port, err := internal.WrapMessagePort(worker) + port, err := types.WrapMessagePort(worker) if err != nil { return nil, err } @@ -97,6 +97,6 @@ func (w *Worker) PostMessage(data safejs.Value, transfers []safejs.Value) error // Listen sends message events on a channel for events fired by self.postMessage() calls inside the Worker's global scope. // Stops the listener and closes the channel when ctx is canceled. -func (w *Worker) Listen(ctx context.Context) (<-chan internal.MessageEvent, error) { +func (w *Worker) Listen(ctx context.Context) (<-chan types.MessageEvent, error) { return w.port.Listen(ctx) } From 4be82d74d4b87ebdb6c891dd142897e7e8d2d626 Mon Sep 17 00:00:00 2001 From: magodo Date: Fri, 8 Sep 2023 17:23:08 +0800 Subject: [PATCH 05/12] Avoid getting SharedWorker from shared worker (will fail) --- sharedworker/shared_worker.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sharedworker/shared_worker.go b/sharedworker/shared_worker.go index 82410da..d591f3a 100644 --- a/sharedworker/shared_worker.go +++ b/sharedworker/shared_worker.go @@ -12,9 +12,8 @@ import ( ) var ( - jsWorker = safejs.MustGetGlobal("SharedWorker") - jsURL = safejs.MustGetGlobal("URL") - jsBlob = safejs.MustGetGlobal("Blob") + jsURL = safejs.MustGetGlobal("URL") + jsBlob = safejs.MustGetGlobal("Blob") ) // SharedWorker is a Shared Web Worker, which represents a background task created via a script. @@ -28,7 +27,7 @@ type SharedWorker struct { // New starts a worker with the given script's URL and name func New(url, name string) (*SharedWorker, error) { - worker, err := jsWorker.New(url, name) + worker, err := safejs.MustGetGlobal("SharedWorker").New(url, name) if err != nil { return nil, err } From 380a3b4ae853ddcd5cceb580f379907b6f58ab20 Mon Sep 17 00:00:00 2001 From: magodo Date: Fri, 8 Sep 2023 18:11:14 +0800 Subject: [PATCH 06/12] Change the way to check for SharedWorkerGlobalScope --- types/shared_worker_global_scope.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/types/shared_worker_global_scope.go b/types/shared_worker_global_scope.go index 43ae0c3..7454bbf 100644 --- a/types/shared_worker_global_scope.go +++ b/types/shared_worker_global_scope.go @@ -14,12 +14,12 @@ type SharedWorkerGlobalScope struct { } func WrapSharedWorkerGlobalScope(v safejs.Value) (*SharedWorkerGlobalScope, error) { - someMethod, err := v.Get("onconnect") + someMethod, err := v.Get("SharedWorkerGlobalScope") if err != nil { return nil, err } if truthy, err := someMethod.Truthy(); err != nil || !truthy { - return nil, fmt.Errorf("invalid SharedWorkerGlobalScope value: onconnect is not a function") + return nil, fmt.Errorf("invalid SharedWorkerGlobalScope value: SharedWorkerGlobalScope is not a function") } return &SharedWorkerGlobalScope{v}, nil } From a86993f5d03a39b9ae78231bceb1e2ede6cfe82d Mon Sep 17 00:00:00 2001 From: magodo Date: Fri, 8 Sep 2023 22:11:48 +0800 Subject: [PATCH 07/12] Split the message event for message and connect as connect has no `target` while message has --- sharedworker/self.go | 2 +- sharedworker/shared_worker.go | 2 +- types/listen_message_event.go | 10 +++-- types/message_event.go | 67 ----------------------------- types/message_event_connect.go | 43 ++++++++++++++++++ types/message_event_message.go | 39 +++++++++++++++++ types/message_port.go | 4 +- types/shared_worker_global_scope.go | 4 +- worker/self.go | 3 +- worker/worker.go | 3 +- 10 files changed, 99 insertions(+), 78 deletions(-) delete mode 100644 types/message_event.go create mode 100644 types/message_event_connect.go create mode 100644 types/message_event_message.go diff --git a/sharedworker/self.go b/sharedworker/self.go index f4d9431..2171d80 100644 --- a/sharedworker/self.go +++ b/sharedworker/self.go @@ -39,7 +39,7 @@ func Self() (*GlobalSelf, error) { // by connection calls to this worker from within the parent scope. // Users are expected to call the Ports() on the MessageEvent, and take the 1st one as the target MessagePort. // Stops the listener and closes the channel when ctx is canceled. -func (s *GlobalSelf) Listen(ctx context.Context) (<-chan types.MessageEvent, error) { +func (s *GlobalSelf) Listen(ctx context.Context) (<-chan types.MessageEventConnect, error) { return s.scope.Listen(ctx) } diff --git a/sharedworker/shared_worker.go b/sharedworker/shared_worker.go index d591f3a..1ab74d2 100644 --- a/sharedworker/shared_worker.go +++ b/sharedworker/shared_worker.go @@ -90,6 +90,6 @@ func (w *SharedWorker) PostMessage(data safejs.Value, transfers []safejs.Value) // Listen sends message events on a channel for events fired by self.postMessage() calls inside the Worker's global scope. // Stops the listener and closes the channel when ctx is canceled. -func (w *SharedWorker) Listen(ctx context.Context) (<-chan types.MessageEvent, error) { +func (w *SharedWorker) Listen(ctx context.Context) (<-chan types.MessageEventMessage, error) { return w.msgport.Listen(ctx) } diff --git a/types/listen_message_event.go b/types/listen_message_event.go index b720648..65b1cb2 100644 --- a/types/listen_message_event.go +++ b/types/listen_message_event.go @@ -6,9 +6,13 @@ import ( "github.com/hack-pad/safejs" ) +type listenable interface { + MessageEventConnect | MessageEventMessage +} + // listen adds the EventListener on the listener for the specified events. // It returns a channel, which will send the MessageEvent(s) listened on, until the ctx is canceled. -func listen(ctx context.Context, listener safejs.Value, events ...string) (_ <-chan MessageEvent, err error) { +func listen[T listenable](ctx context.Context, listener safejs.Value, parseFunc func(safejs.Value) T, events ...string) (_ <-chan T, err error) { ctx, cancel := context.WithCancel(ctx) defer func() { if err != nil { @@ -16,12 +20,12 @@ func listen(ctx context.Context, listener safejs.Value, events ...string) (_ <-c } }() - eventsCh := make(chan MessageEvent) + eventsCh := make(chan T) var handlers []safejs.Func for range events { handler, err := nonBlocking(func(args []safejs.Value) { - eventsCh <- parseMessageEvent(args[0]) + eventsCh <- parseFunc(args[0]) }) if err != nil { return nil, err diff --git a/types/message_event.go b/types/message_event.go deleted file mode 100644 index 32e45b0..0000000 --- a/types/message_event.go +++ /dev/null @@ -1,67 +0,0 @@ -//go:build js && wasm - -package types - -import ( - "github.com/hack-pad/safejs" - "github.com/pkg/errors" -) - -// MessageEvent is received from the channel returned by Listen(). -// Represents a JS MessageEvent. -type MessageEvent struct { - data safejs.Value - err error - target *MessagePort - ports []*MessagePort -} - -// Data returns this event's data or a parse error -func (e MessageEvent) Data() (safejs.Value, error) { - return e.data, errors.Wrapf(e.err, "failed to parse MessageEvent %+v", e.data) -} - -// Ports returns this event's ports or a parse error -func (e MessageEvent) Ports() ([]*MessagePort, error) { - return e.ports, errors.Wrapf(e.err, "failed to parse MessageEvent %+v", e.data) -} - -func parseMessageEvent(v safejs.Value) MessageEvent { - value, err := v.Get("target") - if err != nil { - return MessageEvent{err: err} - } - target, err := WrapMessagePort(value) - if err != nil { - return MessageEvent{err: err} - } - data, err := v.Get("data") - if err != nil { - return MessageEvent{err: err} - } - ports, err := v.Get("ports") - if err != nil { - return MessageEvent{err: err} - } - portsLen, err := ports.Length() - if err != nil { - return MessageEvent{err: err} - } - var msgports []*MessagePort - for i := 0; i < portsLen; i++ { - port, err := ports.Index(i) - if err != nil { - return MessageEvent{err: err} - } - msgport, err := WrapMessagePort(port) - if err != nil { - return MessageEvent{err: err} - } - msgports = append(msgports, msgport) - } - return MessageEvent{ - data: data, - target: target, - ports: msgports, - } -} diff --git a/types/message_event_connect.go b/types/message_event_connect.go new file mode 100644 index 0000000..0374812 --- /dev/null +++ b/types/message_event_connect.go @@ -0,0 +1,43 @@ +package types + +import ( + "github.com/hack-pad/safejs" + "github.com/pkg/errors" +) + +// MessageEventConnect represents a JS MessageEvent received from the "connect" event. +type MessageEventConnect struct { + ports []*MessagePort + err error +} + +// Ports returns this event's ports or a parse error +func (e MessageEventConnect) Ports() ([]*MessagePort, error) { + return e.ports, errors.Wrapf(e.err, "failed to parse MessageEventConnect %+v", e.ports) +} + +func parseMessageEventConnect(v safejs.Value) MessageEventConnect { + ports, err := v.Get("ports") + if err != nil { + return MessageEventConnect{err: err} + } + portsLen, err := ports.Length() + if err != nil { + return MessageEventConnect{err: err} + } + var msgports []*MessagePort + for i := 0; i < portsLen; i++ { + port, err := ports.Index(i) + if err != nil { + return MessageEventConnect{err: err} + } + msgport, err := WrapMessagePort(port) + if err != nil { + return MessageEventConnect{err: err} + } + msgports = append(msgports, msgport) + } + return MessageEventConnect{ + ports: msgports, + } +} diff --git a/types/message_event_message.go b/types/message_event_message.go new file mode 100644 index 0000000..28f9da1 --- /dev/null +++ b/types/message_event_message.go @@ -0,0 +1,39 @@ +//go:build js && wasm + +package types + +import ( + "github.com/hack-pad/safejs" + "github.com/pkg/errors" +) + +// MessageEventMessage represents a JS MessageEvent received from the "message" event. +type MessageEventMessage struct { + data safejs.Value + err error + target *MessagePort +} + +// Data returns this event's data or a parse error +func (e MessageEventMessage) Data() (safejs.Value, error) { + return e.data, errors.Wrapf(e.err, "failed to parse MessageEventMessage %+v", e.data) +} + +func parseMessageEventMessage(v safejs.Value) MessageEventMessage { + value, err := v.Get("target") + if err != nil { + return MessageEventMessage{err: err} + } + target, err := WrapMessagePort(value) + if err != nil { + return MessageEventMessage{err: err} + } + data, err := v.Get("data") + if err != nil { + return MessageEventMessage{err: err} + } + return MessageEventMessage{ + data: data, + target: target, + } +} diff --git a/types/message_port.go b/types/message_port.go index d0b21ff..7318f62 100644 --- a/types/message_port.go +++ b/types/message_port.go @@ -39,8 +39,8 @@ func toJSSlice[Type any](slice []Type) []any { } // Listen starts the MessagePort to listen on the "message" and "messageerror" events, until the ctx is canceled. -func (p *MessagePort) Listen(ctx context.Context) (<-chan MessageEvent, error) { - events, err := listen(ctx, p.jsMessagePort, "message", "messageerror") +func (p *MessagePort) Listen(ctx context.Context) (<-chan MessageEventMessage, error) { + events, err := listen(ctx, p.jsMessagePort, parseMessageEventMessage, "message", "messageerror") if err != nil { return nil, err } diff --git a/types/shared_worker_global_scope.go b/types/shared_worker_global_scope.go index 7454bbf..bb084f3 100644 --- a/types/shared_worker_global_scope.go +++ b/types/shared_worker_global_scope.go @@ -25,8 +25,8 @@ func WrapSharedWorkerGlobalScope(v safejs.Value) (*SharedWorkerGlobalScope, erro } // Listen listens on the "connect" events, until the ctx is canceled. -func (p *SharedWorkerGlobalScope) Listen(ctx context.Context) (<-chan MessageEvent, error) { - events, err := listen(ctx, p.self, "connect") +func (p *SharedWorkerGlobalScope) Listen(ctx context.Context) (<-chan MessageEventConnect, error) { + events, err := listen(ctx, p.self, parseMessageEventConnect, "connect") if err != nil { return nil, err } diff --git a/worker/self.go b/worker/self.go index 3916a67..2da9000 100644 --- a/worker/self.go +++ b/worker/self.go @@ -4,6 +4,7 @@ package worker import ( "context" + "github.com/hack-pad/go-webworkers/types" "github.com/hack-pad/safejs" @@ -47,7 +48,7 @@ func (s *GlobalSelf) PostMessage(message safejs.Value, transfers []safejs.Value) // Listen sends message events on a channel for events fired by worker.postMessage() calls inside the main thread's global scope. // Stops the listener and closes the channel when ctx is canceled. -func (s *GlobalSelf) Listen(ctx context.Context) (<-chan types.MessageEvent, error) { +func (s *GlobalSelf) Listen(ctx context.Context) (<-chan types.MessageEventMessage, error) { return s.port.Listen(ctx) } diff --git a/worker/worker.go b/worker/worker.go index 853ad7e..9f10831 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,6 +5,7 @@ package worker import ( "context" + "github.com/hack-pad/go-webworkers/types" "github.com/hack-pad/safejs" @@ -97,6 +98,6 @@ func (w *Worker) PostMessage(data safejs.Value, transfers []safejs.Value) error // Listen sends message events on a channel for events fired by self.postMessage() calls inside the Worker's global scope. // Stops the listener and closes the channel when ctx is canceled. -func (w *Worker) Listen(ctx context.Context) (<-chan types.MessageEvent, error) { +func (w *Worker) Listen(ctx context.Context) (<-chan types.MessageEventMessage, error) { return w.port.Listen(ctx) } From 6df7159b16a21beb25001be5b52f95bec14baa36 Mon Sep 17 00:00:00 2001 From: magodo Date: Sun, 10 Sep 2023 12:01:39 +0800 Subject: [PATCH 08/12] Avoid getting Worker from shared worker (will fail) --- worker/worker.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index 9f10831..b183456 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -12,9 +12,8 @@ import ( ) var ( - jsWorker = safejs.MustGetGlobal("Worker") - jsURL = safejs.MustGetGlobal("URL") - jsBlob = safejs.MustGetGlobal("Blob") + jsURL = safejs.MustGetGlobal("URL") + jsBlob = safejs.MustGetGlobal("Blob") ) // Worker is a Dedicaetd Web Worker, which represents a background task created via a script. @@ -44,7 +43,7 @@ func New(url string, options Options) (*Worker, error) { if err != nil { return nil, err } - worker, err := jsWorker.New(url, jsOptions) + worker, err := safejs.MustGetGlobal("Worker").New(url, jsOptions) if err != nil { return nil, err } From 39c226d7b6bee9667298857308712cd1d3328749 Mon Sep 17 00:00:00 2001 From: magodo Date: Tue, 12 Sep 2023 10:38:29 +0800 Subject: [PATCH 09/12] Expose `Close` for shared worker and MessagePort --- sharedworker/shared_worker.go | 5 +++++ types/message_port.go | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/sharedworker/shared_worker.go b/sharedworker/shared_worker.go index 1ab74d2..88368be 100644 --- a/sharedworker/shared_worker.go +++ b/sharedworker/shared_worker.go @@ -93,3 +93,8 @@ func (w *SharedWorker) PostMessage(data safejs.Value, transfers []safejs.Value) func (w *SharedWorker) Listen(ctx context.Context) (<-chan types.MessageEventMessage, error) { return w.msgport.Listen(ctx) } + +// Close closes the message port of this worker. +func (w *SharedWorker) Close(ctx context.Context) error { + return w.msgport.Close() +} diff --git a/types/message_port.go b/types/message_port.go index 7318f62..2e6f06a 100644 --- a/types/message_port.go +++ b/types/message_port.go @@ -54,3 +54,9 @@ func (p *MessagePort) Listen(ctx context.Context) (<-chan MessageEventMessage, e } return events, nil } + +// Close disconnects the port, so it is no longer active. This stops the flow of messages to that port. +func (p *MessagePort) Close() error { + _, err := p.jsMessagePort.Call("close") + return err +} From 57d096c2bde9dec8958780262b12d9f7ba4fe5fa Mon Sep 17 00:00:00 2001 From: magodo Date: Tue, 12 Sep 2023 10:47:26 +0800 Subject: [PATCH 10/12] Remove ctx --- sharedworker/shared_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sharedworker/shared_worker.go b/sharedworker/shared_worker.go index 88368be..2d9afd1 100644 --- a/sharedworker/shared_worker.go +++ b/sharedworker/shared_worker.go @@ -95,6 +95,6 @@ func (w *SharedWorker) Listen(ctx context.Context) (<-chan types.MessageEventMes } // Close closes the message port of this worker. -func (w *SharedWorker) Close(ctx context.Context) error { +func (w *SharedWorker) Close() error { return w.msgport.Close() } From 27d68ceee71cf03bf21f5397c7f16297f777b366 Mon Sep 17 00:00:00 2001 From: magodo Date: Tue, 12 Sep 2023 20:59:32 +0800 Subject: [PATCH 11/12] SelfConnShared support `Location()` --- sharedworker/self.go | 21 ++++++------- types/shared_worker_global_scope.go | 47 +++++++++++++++++++++++++++++ types/workerlocation.go | 17 +++++++++++ 3 files changed, 74 insertions(+), 11 deletions(-) create mode 100644 types/workerlocation.go diff --git a/sharedworker/self.go b/sharedworker/self.go index 2171d80..d136f33 100644 --- a/sharedworker/self.go +++ b/sharedworker/self.go @@ -15,8 +15,7 @@ import ( // first element represents the MessagePort connected with the channel with its parent, // which in turns support receiving message via its Listen() and PostMessage(). type GlobalSelf struct { - self safejs.Value - scope *types.SharedWorkerGlobalScope + self *types.SharedWorkerGlobalScope } // Self returns the global "self" @@ -30,8 +29,7 @@ func Self() (*GlobalSelf, error) { return nil, err } return &GlobalSelf{ - self: self, - scope: scope, + self: scope, }, nil } @@ -40,19 +38,20 @@ func Self() (*GlobalSelf, error) { // Users are expected to call the Ports() on the MessageEvent, and take the 1st one as the target MessagePort. // Stops the listener and closes the channel when ctx is canceled. func (s *GlobalSelf) Listen(ctx context.Context) (<-chan types.MessageEventConnect, error) { - return s.scope.Listen(ctx) + return s.self.Listen(ctx) } // Close discards any tasks queued in the global scope's event loop, effectively closing this particular scope. func (s *GlobalSelf) Close() error { - return s.scope.Close() + return s.self.Close() } // Name returns the name that the Worker was (optionally) given when it was created. func (s *GlobalSelf) Name() (string, error) { - name, err := s.self.Get("name") - if err != nil { - return "", err - } - return name.String() + return s.self.Name() +} + +// Location returns the WorkerLocation in the form of url.URL for this worker. +func (s *GlobalSelf) Location() (*types.WorkerLocation, error) { + return s.self.Location() } diff --git a/types/shared_worker_global_scope.go b/types/shared_worker_global_scope.go index bb084f3..83bbe21 100644 --- a/types/shared_worker_global_scope.go +++ b/types/shared_worker_global_scope.go @@ -38,3 +38,50 @@ func (p *SharedWorkerGlobalScope) Close() error { _, err := p.self.Call("close") return err } + +// Name returns the name that the Worker was (optionally) given when it was created. +func (p *SharedWorkerGlobalScope) Name() (string, error) { + v, err := p.self.Get("name") + if err != nil { + return "", err + } + return v.String() +} + +// Location returns the WorkerLocation in the form of url.URL for this worker. +func (p *SharedWorkerGlobalScope) Location() (*WorkerLocation, error) { + loc, err := p.self.Get("location") + if err != nil { + return nil, err + } + + location := &WorkerLocation{} + l := []struct { + target *string + prop string + }{ + {&location.Hash, "hash"}, + {&location.Host, "host"}, + {&location.HostName, "hostname"}, + {&location.Href, "href"}, + {&location.Origin, "origin"}, + {&location.PathName, "pathname"}, + {&location.Port, "port"}, + {&location.Protocol, "protocol"}, + {&location.Search, "search"}, + } + + for _, entry := range l { + v, err := loc.Get(entry.prop) + if err != nil { + return nil, err + } + vv, err := v.String() + if err != nil { + return nil, err + } + *entry.target = vv + } + + return location, nil +} diff --git a/types/workerlocation.go b/types/workerlocation.go new file mode 100644 index 0000000..37d03c3 --- /dev/null +++ b/types/workerlocation.go @@ -0,0 +1,17 @@ +package types + +type WorkerLocation struct { + Hash string + Host string + HostName string + Href string + Origin string + PathName string + Port string + Protocol string + Search string +} + +// func (loc WorkerLocation) String() string { +// return loc.Href +// } From 7883d6d64483abd7e5e10c6457a9dd9fb4934b01 Mon Sep 17 00:00:00 2001 From: magodo Date: Tue, 12 Sep 2023 21:03:13 +0800 Subject: [PATCH 12/12] fix test --- sharedworker/self_test.go | 20 +++++++++++--------- types/workerlocation.go | 6 +++--- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sharedworker/self_test.go b/sharedworker/self_test.go index 1665611..2854be4 100644 --- a/sharedworker/self_test.go +++ b/sharedworker/self_test.go @@ -4,34 +4,36 @@ package sharedworker import ( "testing" - - "github.com/hack-pad/safejs" ) -func TestSelf(t *testing.T) { +func TestSelfName(t *testing.T) { t.Skip("This test case only runs inside a worker") t.Parallel() self, err := Self() if err != nil { t.Fatal(err) } - if !self.self.Equal(safejs.MustGetGlobal("self")) { - t.Error("self is not equal to the global self") + name, err := self.Name() + if err != nil { + t.Fatal(err) + } + if name != "" { + t.Errorf("Expected %q, got %q", "", name) } } -func TestSelfName(t *testing.T) { +func TestSelfLocation(t *testing.T) { t.Skip("This test case only runs inside a worker") t.Parallel() self, err := Self() if err != nil { t.Fatal(err) } - name, err := self.Name() + loc, err := self.Location() if err != nil { t.Fatal(err) } - if name != "" { - t.Errorf("Expected %q, got %q", "", name) + if loc.String() == "" { + t.Errorf("Expected %q, got %q", loc.String(), "") } } diff --git a/types/workerlocation.go b/types/workerlocation.go index 37d03c3..576ed72 100644 --- a/types/workerlocation.go +++ b/types/workerlocation.go @@ -12,6 +12,6 @@ type WorkerLocation struct { Search string } -// func (loc WorkerLocation) String() string { -// return loc.Href -// } +func (loc WorkerLocation) String() string { + return loc.Href +}