feat: Add tokenizer interface for Drain Training (#13069)
benclive committed May 29, 2024
1 parent 6e45550 commit 797bb64
Showing 5 changed files with 375 additions and 332 deletions.
25 changes: 9 additions & 16 deletions pkg/pattern/drain/drain.go
@@ -25,7 +25,6 @@ package drain
 import (
 	"math"
 	"strconv"
-	"strings"
 	"unicode"
 
 	"github.com/hashicorp/golang-lru/v2/simplelru"
@@ -161,6 +160,7 @@ func New(config *Config, metrics *Metrics) *Drain {
 		rootNode:    createNode(),
 		idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
 		metrics:     metrics,
+		tokenizer:   splittingTokenizer{}, // Default to this for now
 	}
 	return d
 }
@@ -171,6 +171,7 @@ type Drain struct {
 	idToCluster     *LogClusterCache
 	clustersCounter int
 	metrics         *Metrics
+	tokenizer       LineTokenizer
 }
 
 func (d *Drain) Clusters() []*LogCluster {
@@ -182,10 +183,13 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts
 }
 
 func (d *Drain) Train(content string, ts int64) *LogCluster {
-	return d.train(d.getContentAsTokens(content), nil, ts)
+	return d.train(d.tokenizer.Tokenize(content), d.tokenizer.Join, ts)
 }
 
 func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64) *LogCluster {
+	if len(tokens) < 4 {
+		return nil
+	}
 	matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
 	// Match no existing log cluster
 	if matchCluster == nil {
@@ -215,7 +219,7 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
 }
 
 func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
-	tokens := tokenizePattern(content, d.config.ParamString)
+	tokens := deduplicatePlaceholders(d.tokenizer.Tokenize(content), d.config.ParamString)
 	matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
 	// Match no existing log cluster
 	if matchCluster == nil {
@@ -237,10 +241,6 @@ func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample)
 	return matchCluster
 }
 
-func tokenizePattern(content, param string) []string {
-	return deduplicatePlaceholders(strings.Split(content, " "), param)
-}
-
 func deduplicatePlaceholders(tokens []string, param string) []string {
 	if len(tokens) < 2 {
 		return tokens
@@ -258,7 +258,7 @@ func deduplicatePlaceholders(tokens []string, param string) []string {
 }
 
 func (d *Drain) PatternString(c *LogCluster) string {
-	s := strings.Join(deduplicatePlaceholders(c.Tokens, d.config.ParamString), " ")
+	s := d.tokenizer.Join(deduplicatePlaceholders(c.Tokens, d.config.ParamString))
 	if s == d.config.ParamString {
 		return ""
 	}
@@ -271,18 +271,11 @@ func (d *Drain) Delete(cluster *LogCluster) {
 
 // Match against an already existing cluster. Match shall be perfect (sim_th=1.0). New cluster will not be created as a result of this call, nor any cluster modifications.
 func (d *Drain) Match(content string) *LogCluster {
-	contentTokens := d.getContentAsTokens(content)
+	contentTokens := d.tokenizer.Tokenize(content)
 	matchCluster := d.treeSearch(d.rootNode, contentTokens, 1.0, true)
 	return matchCluster
 }
 
-func (d *Drain) getContentAsTokens(content string) []string {
-	for _, extraDelimiter := range d.config.ExtraDelimiters {
-		content = strings.Replace(content, extraDelimiter, " ", -1)
-	}
-	return strings.Split(content, " ")
-}
-
 func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, includeParams bool) *LogCluster {
 	tokenCount := len(tokens)
 
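Note: the LineTokenizer interface and splittingTokenizer type referenced above are defined in one of the three changed files whose diffs are not shown here. The following is a minimal sketch inferred only from the two call sites in drain.go (Tokenize producing a token slice, Join reversing it); the splittingTokenizer body is an assumption that mirrors the space-splitting behaviour of the removed getContentAsTokens helper, and the real default may differ (for example, it may still honour config.ExtraDelimiters).

package drain

import "strings"

// LineTokenizer, as implied by the call sites d.tokenizer.Tokenize(content)
// and d.tokenizer.Join(tokens) in the diff above.
type LineTokenizer interface {
	Tokenize(line string) []string
	Join(tokens []string) string
}

// splittingTokenizer is assumed here to reproduce the removed
// strings.Split / strings.Join path; it is a sketch, not the
// actual implementation from this commit.
type splittingTokenizer struct{}

func (splittingTokenizer) Tokenize(line string) []string {
	return strings.Split(line, " ")
}

func (splittingTokenizer) Join(tokens []string) string {
	return strings.Join(tokens, " ")
}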
49 changes: 10 additions & 39 deletions pkg/pattern/drain/drain_benchmark_test.go
@@ -10,50 +10,21 @@ import (
 
 func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
 	tests := []struct {
-		name      string
-		drain     *Drain
 		inputFile string
 	}{
-		{
-			name:      `Patterns for agent logfmt logs`,
-			inputFile: `testdata/agent-logfmt.txt`,
-		},
-		{
-			name:      `Patterns for ingester logfmt logs`,
-			inputFile: `testdata/ingester-logfmt.txt`,
-		},
-		{
-			name:      `Patterns for Drone json logs`,
-			inputFile: `testdata/drone-json.txt`,
-		},
-		{
-			name:      "Patterns for distributor logfmt logs",
-			inputFile: "testdata/distributor-logfmt.txt",
-		},
-		{
-			name:      "Patterns for journald logs",
-			inputFile: "testdata/journald.txt",
-		},
-		{
-			name:      "Patterns for kafka logs",
-			inputFile: "testdata/kafka.txt",
-		},
-		{
-			name:      "Patterns for kubernetes logs",
-			inputFile: "testdata/kubernetes.txt",
-		},
-		{
-			name:      "Patterns for vault logs",
-			inputFile: "testdata/vault.txt",
-		},
-		{
-			name:      "Patterns for calico logs",
-			inputFile: "testdata/calico.txt",
-		},
+		{inputFile: `testdata/agent-logfmt.txt`},
+		{inputFile: `testdata/ingester-logfmt.txt`},
+		{inputFile: `testdata/drone-json.txt`},
+		{inputFile: "testdata/distributor-logfmt.txt"},
+		{inputFile: "testdata/journald.txt"},
+		{inputFile: "testdata/kafka.txt"},
+		{inputFile: "testdata/kubernetes.txt"},
+		{inputFile: "testdata/vault.txt"},
+		{inputFile: "testdata/calico.txt"},
 	}
 
 	for _, tt := range tests {
-		b.Run(tt.name, func(b *testing.B) {
+		b.Run(tt.inputFile, func(b *testing.B) {
 			file, err := os.Open(tt.inputFile)
 			require.NoError(b, err)
 			defer file.Close()
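To run this benchmark locally, the standard Go tooling applies (nothing commit-specific is assumed beyond the package path shown above):

go test -bench=BenchmarkDrain_TrainExtractsPatterns -benchmem ./pkg/pattern/drain/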
(Diffs for the remaining 3 changed files are not shown.)