From d14bc4fba5c8431d733322e814a7019b24a1ea84 Mon Sep 17 00:00:00 2001 From: Valery Litvin Date: Wed, 9 Oct 2019 15:58:05 +0300 Subject: [PATCH] Refactoring of data preparation for GPU, added workers --- x/rank/internal/keeper/calculate_cpu.go | 37 +++++-------- x/rank/internal/keeper/calculate_gpu.go | 73 +++++++++++++++++-------- 2 files changed, 62 insertions(+), 48 deletions(-) mode change 100644 => 100755 x/rank/internal/keeper/calculate_cpu.go mode change 100644 => 100755 x/rank/internal/keeper/calculate_gpu.go diff --git a/x/rank/internal/keeper/calculate_cpu.go b/x/rank/internal/keeper/calculate_cpu.go old mode 100644 new mode 100755 index 62e612d8..7970bfe1 --- a/x/rank/internal/keeper/calculate_cpu.go +++ b/x/rank/internal/keeper/calculate_cpu.go @@ -3,8 +3,6 @@ package keeper import ( "github.com/cybercongress/cyberd/x/link" "github.com/cybercongress/cyberd/x/rank/internal/types" - - "sync" ) const ( @@ -54,32 +52,23 @@ func step(ctx *types.CalculationContext, defaultRankWithCorrection float64, prev rank := append(make([]float64, 0, len(prevrank)), prevrank...) - var wg sync.WaitGroup - wg.Add(len(ctx.GetInLinks())) - for cid := range ctx.GetInLinks() { + _, sortedCids, ok := ctx.GetSortedInLinks(cid) - go func(i link.CidNumber) { - defer wg.Done() - _, sortedCids, ok := ctx.GetSortedInLinks(i) - - if !ok { - return - } else { - ksum := float64(0) - for _, j := range sortedCids { - linkStake := getOverallLinkStake(ctx, j, i) - jCidOutStake := getOverallOutLinksStake(ctx, j) - weight := float64(linkStake) / float64(jCidOutStake) - ksum = float64(prevrank[j]*weight) + ksum //force no-fma here by explicit conversion - } - - rank[i] = float64(ksum*d) + defaultRankWithCorrection //force no-fma here by explicit conversion + if !ok { + continue + } else { + ksum := float64(0) + for _, j := range sortedCids { + linkStake := getOverallLinkStake(ctx, j, cid) + jCidOutStake := getOverallOutLinksStake(ctx, j) + weight := float64(linkStake) / float64(jCidOutStake) + ksum = prevrank[j]*weight + ksum //force no-fma here by explicit conversion } - - }(link.CidNumber(cid)) + rank[cid] = ksum*d + defaultRankWithCorrection //force no-fma here by explicit conversion + } } - wg.Wait() + return rank } diff --git a/x/rank/internal/keeper/calculate_gpu.go b/x/rank/internal/keeper/calculate_gpu.go old mode 100644 new mode 100755 index 427fadcc..ef00fb2d --- a/x/rank/internal/keeper/calculate_gpu.go +++ b/x/rank/internal/keeper/calculate_gpu.go @@ -4,9 +4,10 @@ package keeper import ( "github.com/cybercongress/cyberd/x/link" - + "github.com/cybercongress/cyberd/x/rank/internal/types" "github.com/tendermint/tendermint/libs/log" + "math" "sync" "time" ) @@ -18,7 +19,7 @@ import ( */ import "C" -func calculateRankGPU(ctx *types2.CalculationContext, logger log.Logger) []float64 { +func calculateRankGPU(ctx *types.CalculationContext, logger log.Logger) []float64 { start := time.Now() if ctx.GetCidsCount() == 0 { return make([]float64, 0) @@ -44,35 +45,59 @@ func calculateRankGPU(ctx *types2.CalculationContext, logger log.Logger) []float stakes[uint64(acc)] = stake } + ch := make(chan int64, 100000) var wg sync.WaitGroup + var lock1 sync.Mutex + var lock2 sync.Mutex wg.Add(int(cidsCount)) - for i := int64(0); i < cidsCount; i++ { - /* Fill values */ - go func(count int64) { - defer wg.Done() - if inLinks, sortedCids, ok := ctx.GetSortedInLinks(link.CidNumber(i)); ok { - for _, cid := range sortedCids { - inLinksCount[count] += uint32(len(inLinks[cid])) - for acc := range inLinks[cid] { - inLinksOuts = append(inLinksOuts, uint64(cid)) - inLinksUsers = append(inLinksUsers, uint64(acc)) - } + // the worker's function + f := func(count int64) { + defer wg.Done() + if inLinks, sortedCids, ok := ctx.GetSortedInLinks(link.CidNumber(count)); ok { + for _, cid := range sortedCids { + inLinksCount[count] += uint32(len(inLinks[cid])) + for acc := range inLinks[cid] { + lock2.Lock() + inLinksOuts = append(inLinksOuts, uint64(cid)) + inLinksUsers = append(inLinksUsers, uint64(acc)) + lock2.Unlock() } - linksCount += uint64(inLinksCount[count]) } - - if outLinks, ok := outLinks[link.CidNumber(count)]; ok { - for _, accs := range outLinks { - outLinksCount[count] += uint32(len(accs)) - for acc := range accs { - outLinksUsers = append(outLinksUsers, uint64(acc)) - } + linksCount += uint64(inLinksCount[count]) + } + + if outLinks, ok := outLinks[link.CidNumber(count)]; ok { + for _, accs := range outLinks { + outLinksCount[count] += uint32(len(accs)) + for acc := range accs { + lock1.Lock() + outLinksUsers = append(outLinksUsers, uint64(acc)) + lock1.Unlock() } } - }(i) + } + } + + countWorkers := int64(math.Min(10000, float64(cidsCount))) + + // here the workers start + for i:=int64(0); i < countWorkers; i++ { + go func() { + var count int64 + for { + count = <- ch + f(count) + } + }() + } + + // data is added to the channel for workers + for i := int64(0); i < cidsCount; i++ { + ch <- i } + // waiting for a while all workers will finish work wg.Wait() /* Convert to C types */ @@ -89,7 +114,7 @@ func calculateRankGPU(ctx *types2.CalculationContext, logger log.Logger) []float cInLinksUsers := (*C.ulong)(&inLinksUsers[0]) cOutLinksUsers := (*C.ulong)(&outLinksUsers[0]) - logger.Debug("Rank: data for gpu preparation", "time", time.Since(start)) + logger.Info("Rank: data for gpu preparation", "time", time.Since(start)) start = time.Now() cRank := (*C.double)(&rank[0]) @@ -99,7 +124,7 @@ func calculateRankGPU(ctx *types2.CalculationContext, logger log.Logger) []float cInLinksOuts, cInLinksUsers, cOutLinksUsers, cRank, ) - logger.Debug("Rank: gpu calculations", "time", time.Since(start)) + logger.Info("Rank: gpu calculations", "time", time.Since(start)) return rank }