Skip to content

Commit

Permalink
Merge pull request #406 from cybercongress/400-data-gpu
Browse files Browse the repository at this point in the history
Refactoring of data preparation for GPU, added workers
  • Loading branch information
cyborgshead committed Oct 15, 2019
2 parents dca9061 + d14bc4f commit 73d44d5
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 48 deletions.
37 changes: 13 additions & 24 deletions x/rank/internal/keeper/calculate_cpu.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package keeper
import (
"github.com/cybercongress/cyberd/x/link"
"github.com/cybercongress/cyberd/x/rank/internal/types"

"sync"
)

const (
Expand Down Expand Up @@ -54,32 +52,23 @@ func step(ctx *types.CalculationContext, defaultRankWithCorrection float64, prev

rank := append(make([]float64, 0, len(prevrank)), prevrank...)

var wg sync.WaitGroup
wg.Add(len(ctx.GetInLinks()))

for cid := range ctx.GetInLinks() {
_, sortedCids, ok := ctx.GetSortedInLinks(cid)

go func(i link.CidNumber) {
defer wg.Done()
_, sortedCids, ok := ctx.GetSortedInLinks(i)

if !ok {
return
} else {
ksum := float64(0)
for _, j := range sortedCids {
linkStake := getOverallLinkStake(ctx, j, i)
jCidOutStake := getOverallOutLinksStake(ctx, j)
weight := float64(linkStake) / float64(jCidOutStake)
ksum = float64(prevrank[j]*weight) + ksum //force no-fma here by explicit conversion
}

rank[i] = float64(ksum*d) + defaultRankWithCorrection //force no-fma here by explicit conversion
if !ok {
continue
} else {
ksum := float64(0)
for _, j := range sortedCids {
linkStake := getOverallLinkStake(ctx, j, cid)
jCidOutStake := getOverallOutLinksStake(ctx, j)
weight := float64(linkStake) / float64(jCidOutStake)
ksum = prevrank[j]*weight + ksum //force no-fma here by explicit conversion
}

}(link.CidNumber(cid))
rank[cid] = ksum*d + defaultRankWithCorrection //force no-fma here by explicit conversion
}
}
wg.Wait()

return rank
}

Expand Down
73 changes: 49 additions & 24 deletions x/rank/internal/keeper/calculate_gpu.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ package keeper

import (
"github.com/cybercongress/cyberd/x/link"

"github.com/cybercongress/cyberd/x/rank/internal/types"
"github.com/tendermint/tendermint/libs/log"

"math"
"sync"
"time"
)
Expand All @@ -18,7 +19,7 @@ import (
*/
import "C"

func calculateRankGPU(ctx *types2.CalculationContext, logger log.Logger) []float64 {
func calculateRankGPU(ctx *types.CalculationContext, logger log.Logger) []float64 {
start := time.Now()
if ctx.GetCidsCount() == 0 {
return make([]float64, 0)
Expand All @@ -44,35 +45,59 @@ func calculateRankGPU(ctx *types2.CalculationContext, logger log.Logger) []float
stakes[uint64(acc)] = stake
}

ch := make(chan int64, 100000)
var wg sync.WaitGroup
var lock1 sync.Mutex
var lock2 sync.Mutex
wg.Add(int(cidsCount))

for i := int64(0); i < cidsCount; i++ {
/* Fill values */
go func(count int64) {
defer wg.Done()
if inLinks, sortedCids, ok := ctx.GetSortedInLinks(link.CidNumber(i)); ok {
for _, cid := range sortedCids {
inLinksCount[count] += uint32(len(inLinks[cid]))
for acc := range inLinks[cid] {
inLinksOuts = append(inLinksOuts, uint64(cid))
inLinksUsers = append(inLinksUsers, uint64(acc))
}
// the worker's function
f := func(count int64) {
defer wg.Done()
if inLinks, sortedCids, ok := ctx.GetSortedInLinks(link.CidNumber(count)); ok {
for _, cid := range sortedCids {
inLinksCount[count] += uint32(len(inLinks[cid]))
for acc := range inLinks[cid] {
lock2.Lock()
inLinksOuts = append(inLinksOuts, uint64(cid))
inLinksUsers = append(inLinksUsers, uint64(acc))
lock2.Unlock()
}
linksCount += uint64(inLinksCount[count])
}

if outLinks, ok := outLinks[link.CidNumber(count)]; ok {
for _, accs := range outLinks {
outLinksCount[count] += uint32(len(accs))
for acc := range accs {
outLinksUsers = append(outLinksUsers, uint64(acc))
}
linksCount += uint64(inLinksCount[count])
}

if outLinks, ok := outLinks[link.CidNumber(count)]; ok {
for _, accs := range outLinks {
outLinksCount[count] += uint32(len(accs))
for acc := range accs {
lock1.Lock()
outLinksUsers = append(outLinksUsers, uint64(acc))
lock1.Unlock()
}
}
}(i)
}
}

countWorkers := int64(math.Min(10000, float64(cidsCount)))

// here the workers start
for i:=int64(0); i < countWorkers; i++ {
go func() {
var count int64
for {
count = <- ch
f(count)
}
}()
}

// data is added to the channel for workers
for i := int64(0); i < cidsCount; i++ {
ch <- i
}

// waiting for a while all workers will finish work
wg.Wait()

/* Convert to C types */
Expand All @@ -89,7 +114,7 @@ func calculateRankGPU(ctx *types2.CalculationContext, logger log.Logger) []float
cInLinksUsers := (*C.ulong)(&inLinksUsers[0])
cOutLinksUsers := (*C.ulong)(&outLinksUsers[0])

logger.Debug("Rank: data for gpu preparation", "time", time.Since(start))
logger.Info("Rank: data for gpu preparation", "time", time.Since(start))

start = time.Now()
cRank := (*C.double)(&rank[0])
Expand All @@ -99,7 +124,7 @@ func calculateRankGPU(ctx *types2.CalculationContext, logger log.Logger) []float
cInLinksOuts, cInLinksUsers, cOutLinksUsers,
cRank,
)
logger.Debug("Rank: gpu calculations", "time", time.Since(start))
logger.Info("Rank: gpu calculations", "time", time.Since(start))

return rank
}

0 comments on commit 73d44d5

Please sign in to comment.