Skip to content

Commit

Permalink
fix: concurrent map writes #707
Browse files Browse the repository at this point in the history
  • Loading branch information
Larvan2 authored and riolurs committed Aug 31, 2023
1 parent 54fee7b commit 44d1d2e
Showing 1 changed file with 58 additions and 44 deletions.
102 changes: 58 additions & 44 deletions adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/netip"
"net/url"
"strconv"
"sync"
"time"

"github.com/Dreamacro/clash/common/atomic"
Expand All @@ -36,7 +37,7 @@ type Proxy struct {
history *queue.Queue[C.DelayHistory]
alive *atomic.Bool
url string
extra map[string]*extraProxyState
extra sync.Map
}

// Alive implements C.Proxy
Expand All @@ -46,10 +47,8 @@ func (p *Proxy) Alive() bool {

// AliveForTestUrl implements C.Proxy
func (p *Proxy) AliveForTestUrl(url string) bool {
if p.extra != nil {
if state, ok := p.extra[url]; ok {
return state.alive.Load()
}
if state, ok := p.extra.Load(url); ok {
return state.(*extraProxyState).alive.Load()
}

return p.alive.Load()
Expand Down Expand Up @@ -88,16 +87,16 @@ func (p *Proxy) DelayHistory() []C.DelayHistory {
for _, item := range queueM {
histories = append(histories, item)
}

return histories
}

// DelayHistoryForTestUrl implements C.Proxy
func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
var queueM []C.DelayHistory
if p.extra != nil {
if state, ok := p.extra[url]; ok {
queueM = state.history.Copy()
}

if state, ok := p.extra.Load(url); ok {
queueM = state.(*extraProxyState).history.Copy()
}

if queueM == nil {
Expand All @@ -112,19 +111,25 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
}

func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory {
extra := map[string][]C.DelayHistory{}
if p.extra != nil && len(p.extra) != 0 {
for testUrl, option := range p.extra {
histories := []C.DelayHistory{}
queueM := option.history.Copy()
for _, item := range queueM {
histories = append(histories, item)
}
extraHistory := map[string][]C.DelayHistory{}

p.extra.Range(func(k, v interface{}) bool {

extra[testUrl] = histories
testUrl := k.(string)
state := v.(*extraProxyState)

histories := []C.DelayHistory{}
queueM := state.history.Copy()

for _, item := range queueM {
histories = append(histories, item)
}
}
return extra

extraHistory[testUrl] = histories

return true
})
return extraHistory
}

// LastDelay return last history record. if proxy is not alive, return the max value of uint16.
Expand All @@ -149,11 +154,9 @@ func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) {
alive := p.alive.Load()
history := p.history.Last()

if p.extra != nil {
if state, ok := p.extra[url]; ok {
alive = state.alive.Load()
history = state.history.Last()
}
if state, ok := p.extra.Load(url); ok {
alive = state.(*extraProxyState).alive.Load()
history = state.(*extraProxyState).history.Last()
}

if !alive {
Expand Down Expand Up @@ -214,23 +217,19 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
record.Delay = t
}

if p.extra == nil {
p.extra = map[string]*extraProxyState{}
}

state, ok := p.extra[url]
state, ok := p.extra.Load(url)
if !ok {
state = &extraProxyState{
history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true),
}
p.extra[url] = state
p.extra.Store(url, state)
}

state.alive.Store(alive)
state.history.Put(record)
if state.history.Len() > defaultHistoriesNum {
state.history.Pop()
state.(*extraProxyState).alive.Store(alive)
state.(*extraProxyState).history.Put(record)
if state.(*extraProxyState).history.Len() > defaultHistoriesNum {
state.(*extraProxyState).history.Pop()
}
default:
log.Debugln("health check result will be discarded, url: %s alive: %t, delay: %d", url, alive, t)
Expand Down Expand Up @@ -307,7 +306,12 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
}

func NewProxy(adapter C.ProxyAdapter) *Proxy {
return &Proxy{adapter, queue.New[C.DelayHistory](defaultHistoriesNum), atomic.NewBool(true), "", map[string]*extraProxyState{}}
return &Proxy{
ProxyAdapter: adapter,
history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true),
url: "",
extra: sync.Map{}}
}

func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
Expand Down Expand Up @@ -350,14 +354,24 @@ func (p *Proxy) determineFinalStoreType(store C.DelayHistoryStoreType, url strin
return C.OriginalHistory
}

if p.extra == nil {
store = C.ExtraHistory
} else {
if _, ok := p.extra[url]; ok {
store = C.ExtraHistory
} else if len(p.extra) < 2*C.DefaultMaxHealthCheckUrlNum {
store = C.ExtraHistory
}
length := 0
p.extra.Range(func(_, _ interface{}) bool {
length++
return length < 2*C.DefaultMaxHealthCheckUrlNum
})

if length == 0 {
return C.ExtraHistory
}

_, ok := p.extra.Load(url)
if ok {
return C.ExtraHistory
}

if length < 2*C.DefaultMaxHealthCheckUrlNum {
return C.ExtraHistory
}

return store
}

0 comments on commit 44d1d2e

Please sign in to comment.