Skip to content

Commit

Permalink
bridgev2: add timeouts for event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Sep 4, 2024
1 parent e5ea10d commit 6c8519d
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 49 deletions.
1 change: 1 addition & 0 deletions bridgev2/bridgeconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type BridgeConfig struct {
CommandPrefix string `yaml:"command_prefix"`
PersonalFilteringSpaces bool `yaml:"personal_filtering_spaces"`
PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"`
AsyncEvents bool `yaml:"async_events"`
BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
MuteOnlyOnCreate bool `yaml:"mute_only_on_create"`
Expand Down
1 change: 1 addition & 0 deletions bridgev2/bridgeconfig/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func doUpgrade(helper up.Helper) {
helper.Copy(up.Str, "bridge", "command_prefix")
helper.Copy(up.Bool, "bridge", "personal_filtering_spaces")
helper.Copy(up.Bool, "bridge", "private_chat_portal_meta")
helper.Copy(up.Bool, "bridge", "async_events")
helper.Copy(up.Bool, "bridge", "bridge_matrix_leave")
helper.Copy(up.Bool, "bridge", "tag_only_on_create")
helper.Copy(up.Bool, "bridge", "mute_only_on_create")
Expand Down
3 changes: 3 additions & 0 deletions bridgev2/matrix/mxmain/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ bridge:
# Whether the bridge should set names and avatars explicitly for DM portals.
# This is only necessary when using clients that don't support MSC4171.
private_chat_portal_meta: false
# Should events be handled asynchronously within portal rooms?
# If true, events may end up being out of order, but slow events won't block other ones.
async_events: false

# Should leaving Matrix rooms be bridged as leaving groups on the remote network?
bridge_matrix_leave: false
Expand Down
115 changes: 72 additions & 43 deletions bridgev2/portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -275,23 +276,49 @@ func (portal *Portal) queueEvent(ctx context.Context, evt portalEvent) {

func (portal *Portal) eventLoop() {
for rawEvt := range portal.events {
switch evt := rawEvt.(type) {
case *portalMatrixEvent:
portal.handleMatrixEvent(evt.sender, evt.evt)
case *portalRemoteEvent:
portal.handleRemoteEvent(evt.source, evt.evt)
case *portalCreateEvent:
portal.handleCreateEvent(evt)
default:
panic(fmt.Errorf("illegal type %T in eventLoop", evt))
portal.handleSingleEventAsync(rawEvt)
}
}

// handleSingleEventAsync hands one queued portal event to handleSingleEvent and
// decides how long the caller waits for it:
//
//   - portal create events are handled inline, so this method only returns
//     once handling has finished;
//   - when Bridge.Config.AsyncEvents is set, the event is handled in its own
//     goroutine and this method returns immediately;
//   - otherwise the event is handled in a goroutine while this method waits
//     for it, logging a warning every 30 seconds. After 10 warnings it stops
//     waiting and leaves the handler running in the background.
//
// NOTE(review): the logger is shared by pointer with the handler goroutine,
// which calls UpdateContext on it. zerolog documents UpdateContext as not
// concurrency-safe, so confirm that the warning logs below cannot overlap
// with that update.
func (portal *Portal) handleSingleEventAsync(rawEvt any) {
	const (
		warnInterval = 30 * time.Second
		maxWarnings  = 10
	)
	log := portal.Log.With().Logger()
	noop := func() {}

	// Create events always run synchronously, regardless of configuration.
	if _, ok := rawEvt.(*portalCreateEvent); ok {
		portal.handleSingleEvent(&log, rawEvt, noop)
		return
	}
	// Fully asynchronous mode: fire and forget.
	if portal.Bridge.Config.AsyncEvents {
		go portal.handleSingleEvent(&log, rawEvt, noop)
		return
	}

	finished := make(chan struct{})
	var detached atomic.Bool
	onDone := func() {
		close(finished)
		// Only reached with detached set when the waiting loop below already gave up.
		if detached.Load() {
			log.Debug().Msg("Event that took too long finally finished handling")
		}
	}
	go portal.handleSingleEvent(&log, rawEvt, onDone)

	ticker := time.NewTicker(warnInterval)
	defer ticker.Stop()
	for warnings := 0; warnings < maxWarnings; warnings++ {
		select {
		case <-finished:
			// Only mention completion if a slowness warning was logged earlier.
			if warnings > 0 {
				log.Debug().Msg("Event that took long finished handling")
			}
			return
		case <-ticker.C:
			log.Warn().Msg("Event handling is taking long")
		}
	}
	log.Warn().Msg("Event handling is taking too long, continuing in background")
	detached.Store(true)
}

func (portal *Portal) handleCreateEvent(evt *portalCreateEvent) {
func (portal *Portal) handleSingleEvent(log *zerolog.Logger, rawEvt any, doneCallback func()) {
ctx := log.WithContext(context.Background())
defer func() {
doneCallback()
if err := recover(); err != nil {
logEvt := zerolog.Ctx(evt.ctx).Error()
logEvt := log.Error()
if realErr, ok := err.(error); ok {
logEvt = logEvt.Err(realErr)
} else {
Expand All @@ -300,10 +327,36 @@ func (portal *Portal) handleCreateEvent(evt *portalCreateEvent) {
logEvt.
Bytes("stack", debug.Stack()).
Msg("Portal creation panicked")
evt.cb(fmt.Errorf("portal creation panicked"))
switch evt := rawEvt.(type) {
case *portalMatrixEvent:
if evt.evt.ID != "" {
go portal.sendErrorStatus(ctx, evt.evt, ErrPanicInEventHandler)
}
case *portalCreateEvent:
evt.cb(fmt.Errorf("portal creation panicked"))
}
}
}()
evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil))
switch evt := rawEvt.(type) {
case *portalMatrixEvent:
log.UpdateContext(func(c zerolog.Context) zerolog.Context {
return c.Str("action", "handle matrix event").
Stringer("event_id", evt.evt.ID).
Str("event_type", evt.evt.Type.Type)
})
portal.handleMatrixEvent(ctx, evt.sender, evt.evt)
case *portalRemoteEvent:
log.UpdateContext(func(c zerolog.Context) zerolog.Context {
return c.Str("action", "handle remote event").
Str("source_id", string(evt.source.ID))
})
portal.handleRemoteEvent(ctx, evt.source, evt.evt)
case *portalCreateEvent:
*log = *zerolog.Ctx(evt.ctx)
evt.cb(portal.createMatrixRoomInLoop(evt.ctx, evt.source, evt.info, nil))
default:
panic(fmt.Errorf("illegal type %T in eventLoop", evt))
}
}

func (portal *Portal) FindPreferredLogin(ctx context.Context, user *User, allowRelay bool) (*UserLogin, *database.UserPortal, error) {
Expand Down Expand Up @@ -393,29 +446,8 @@ func (portal *Portal) checkConfusableName(ctx context.Context, userID id.UserID,
return false
}

func (portal *Portal) handleMatrixEvent(sender *User, evt *event.Event) {
log := portal.Log.With().
Str("action", "handle matrix event").
Stringer("event_id", evt.ID).
Str("event_type", evt.Type.Type).
Logger()
ctx := log.WithContext(context.TODO())
defer func() {
if err := recover(); err != nil {
logEvt := log.Error()
if realErr, ok := err.(error); ok {
logEvt = logEvt.Err(realErr)
} else {
logEvt = logEvt.Any(zerolog.ErrorFieldName, err)
}
logEvt.
Bytes("stack", debug.Stack()).
Msg("Matrix event handler panicked")
if evt.ID != "" {
go portal.sendErrorStatus(ctx, evt, ErrPanicInEventHandler)
}
}
}()
func (portal *Portal) handleMatrixEvent(ctx context.Context, sender *User, evt *event.Event) {
log := zerolog.Ctx(ctx)
if evt.Mautrix.EventSource&event.SourceEphemeral != 0 {
switch evt.Type {
case event.EphemeralEventReceipt:
Expand Down Expand Up @@ -1458,11 +1490,8 @@ func (portal *Portal) handleMatrixRedaction(ctx context.Context, sender *UserLog
portal.sendSuccessStatus(ctx, evt)
}

func (portal *Portal) handleRemoteEvent(source *UserLogin, evt RemoteEvent) {
log := portal.Log.With().
Str("source_id", string(source.ID)).
Str("action", "handle remote event").
Logger()
func (portal *Portal) handleRemoteEvent(ctx context.Context, source *UserLogin, evt RemoteEvent) {
log := zerolog.Ctx(ctx)
defer func() {
if err := recover(); err != nil {
logEvt := log.Error()
Expand All @@ -1481,7 +1510,6 @@ func (portal *Portal) handleRemoteEvent(source *UserLogin, evt RemoteEvent) {
c = c.Stringer("bridge_evt_type", evtType)
return evt.AddLogContext(c)
})
ctx := log.WithContext(context.TODO())
if portal.MXID == "" {
mcp, ok := evt.(RemoteEventThatMayCreatePortal)
if !ok || !mcp.ShouldCreatePortal() {
Expand Down Expand Up @@ -1823,7 +1851,8 @@ func (portal *Portal) handleRemoteUpsert(ctx context.Context, source *UserLogin,
}
if len(res.SubEvents) > 0 {
for _, subEvt := range res.SubEvents {
portal.handleRemoteEvent(source, subEvt)
log := portal.Log.With().Str("source_id", string(source.ID)).Str("action", "handle remote subevent").Logger()
portal.handleRemoteEvent(log.WithContext(ctx), source, subEvt)
}
}
return res.ContinueMessageHandling
Expand Down
20 changes: 14 additions & 6 deletions bridgev2/portalinternal.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ func (portal *PortalInternals) EventLoop() {
(*Portal)(portal).eventLoop()
}

func (portal *PortalInternals) HandleCreateEvent(evt *portalCreateEvent) {
(*Portal)(portal).handleCreateEvent(evt)
// HandleSingleEventAsync converts the receiver to *Portal and forwards the
// call to the unexported handleSingleEventAsync method.
func (portal *PortalInternals) HandleSingleEventAsync(rawEvt any) {
	inner := (*Portal)(portal)
	inner.handleSingleEventAsync(rawEvt)
}

// HandleSingleEvent converts the receiver to *Portal and forwards the call,
// with all arguments unchanged, to the unexported handleSingleEvent method.
func (portal *PortalInternals) HandleSingleEvent(log *zerolog.Logger, rawEvt any, doneCallback func()) {
	inner := (*Portal)(portal)
	inner.handleSingleEvent(log, rawEvt, doneCallback)
}

func (portal *PortalInternals) SendSuccessStatus(ctx context.Context, evt *event.Event) {
Expand All @@ -53,8 +57,8 @@ func (portal *PortalInternals) CheckConfusableName(ctx context.Context, userID i
return (*Portal)(portal).checkConfusableName(ctx, userID, name)
}

func (portal *PortalInternals) HandleMatrixEvent(sender *User, evt *event.Event) {
(*Portal)(portal).handleMatrixEvent(sender, evt)
// HandleMatrixEvent converts the receiver to *Portal and forwards the call,
// with all arguments unchanged, to the unexported handleMatrixEvent method.
func (portal *PortalInternals) HandleMatrixEvent(ctx context.Context, sender *User, evt *event.Event) {
	inner := (*Portal)(portal)
	inner.handleMatrixEvent(ctx, sender, evt)
}

func (portal *PortalInternals) HandleMatrixReceipts(ctx context.Context, evt *event.Event) {
Expand Down Expand Up @@ -109,8 +113,8 @@ func (portal *PortalInternals) HandleMatrixRedaction(ctx context.Context, sender
(*Portal)(portal).handleMatrixRedaction(ctx, sender, origSender, evt)
}

func (portal *PortalInternals) HandleRemoteEvent(source *UserLogin, evt RemoteEvent) {
(*Portal)(portal).handleRemoteEvent(source, evt)
// HandleRemoteEvent converts the receiver to *Portal and forwards the call,
// with all arguments unchanged, to the unexported handleRemoteEvent method.
func (portal *PortalInternals) HandleRemoteEvent(ctx context.Context, source *UserLogin, evt RemoteEvent) {
	inner := (*Portal)(portal)
	inner.handleRemoteEvent(ctx, source, evt)
}

func (portal *PortalInternals) GetIntentAndUserMXIDFor(ctx context.Context, sender EventSender, source *UserLogin, otherLogins []*UserLogin, evtType RemoteEventType) (intent MatrixAPI, extraUserID id.UserID) {
Expand Down Expand Up @@ -297,6 +301,10 @@ func (portal *PortalInternals) DoThreadBackfill(ctx context.Context, source *Use
(*Portal)(portal).doThreadBackfill(ctx, source, threadID)
}

// CutoffMessages converts the receiver to *Portal, forwards the call with all
// arguments unchanged to the unexported cutoffMessages method, and returns
// its result.
func (portal *PortalInternals) CutoffMessages(ctx context.Context, messages []*BackfillMessage, aggressiveDedup, forward bool, lastMessage *database.Message) []*BackfillMessage {
	inner := (*Portal)(portal)
	return inner.cutoffMessages(ctx, messages, aggressiveDedup, forward, lastMessage)
}

// SendBackfill converts the receiver to *Portal and forwards the call, with
// all arguments unchanged, to the unexported sendBackfill method.
func (portal *PortalInternals) SendBackfill(ctx context.Context, source *UserLogin, messages []*BackfillMessage, forceForward, markRead, inThread bool) {
	inner := (*Portal)(portal)
	inner.sendBackfill(ctx, source, messages, forceForward, markRead, inThread)
}
Expand Down

0 comments on commit 6c8519d

Please sign in to comment.