// copypaste from https://github.com/grafana/pyroscope/blob/8a7fe2b80c219bfda9be685ff27ca1dee4218a42/ebpf/symtab/elf/buildid.go#L31

// ErrNoBuildIDSection is returned when the ELF file has no build-ID note
// section of the kind that was asked for.
var (
	ErrNoBuildIDSection = fmt.Errorf("build ID section not found")
)

// BuildID returns an identifier for the binary described by f.
//
// The GNU build ID is tried first and the Go build ID second. A lookup that
// fails only because its section is absent is not an error; any other failure
// (unreadable or malformed section) is returned immediately. When neither
// section yields an ID, ErrNoBuildIDSection is returned.
func BuildID(f *elf.File) (string, error) {
	id, err := GNUBuildID(f)
	if err != nil && !errors.Is(err, ErrNoBuildIDSection) {
		return "", err
	}
	if id != "" {
		return id, nil
	}
	id, err = GoBuildID(f)
	if err != nil && !errors.Is(err, ErrNoBuildIDSection) {
		return "", err
	}
	if id != "" {
		return id, nil
	}

	return "", ErrNoBuildIDSection
}

// goBuildIDSep separates the components of a Go build ID.
var goBuildIDSep = []byte("/")

// GoBuildID extracts the Go build ID from the .note.go.buildid section of f.
//
// It returns ErrNoBuildIDSection when the section does not exist, and a
// descriptive error when the section is unreadable, too short, or does not
// look like a Go build ID (at least 40 bytes containing two or more '/').
func GoBuildID(f *elf.File) (string, error) {
	buildIDSection := f.Section(".note.go.buildid")
	if buildIDSection == nil {
		return "", ErrNoBuildIDSection
	}

	data, err := buildIDSection.Data()
	if err != nil {
		return "", fmt.Errorf("reading .note.go.buildid %w", err)
	}
	if len(data) < 17 {
		// The message used to name .note.gnu.build-id, which sent readers to
		// the wrong section when debugging a malformed Go binary.
		return "", fmt.Errorf(".note.go.buildid is too small")
	}

	// Skip the first 16 bytes of the section and drop its final byte; what
	// remains is treated as the ID text.
	data = data[16 : len(data)-1]
	if len(data) < 40 || bytes.Count(data, goBuildIDSep) < 2 {
		return "", fmt.Errorf("wrong .note.go.buildid ")
	}
	id := string(data)
	// NOTE(review): id has at least 40 bytes here, so this comparison can
	// never match; kept as-is from the upstream copy.
	if id == "redacted" {
		return "", fmt.Errorf("blacklisted .note.go.buildid ")
	}
	return id, nil
}

// GNUBuildID extracts the GNU build ID from the .note.gnu.build-id section of
// f and returns it hex-encoded.
//
// It returns ErrNoBuildIDSection when the section does not exist, and a
// descriptive error when the section is unreadable, too short, not tagged
// "GNU", or carries an ID that is neither 20 nor 8 bytes long.
func GNUBuildID(f *elf.File) (string, error) {
	buildIDSection := f.Section(".note.gnu.build-id")
	if buildIDSection == nil {
		return "", ErrNoBuildIDSection
	}

	data, err := buildIDSection.Data()
	if err != nil {
		return "", fmt.Errorf("reading .note.gnu.build-id %w", err)
	}
	if len(data) < 16 {
		return "", fmt.Errorf(".note.gnu.build-id is too small")
	}
	// Bytes 12..14 must spell the owner name "GNU".
	if !bytes.Equal([]byte("GNU"), data[12:15]) {
		return "", fmt.Errorf(".note.gnu.build-id is not a GNU build-id")
	}
	rawBuildID := data[16:]
	if len(rawBuildID) != 20 && len(rawBuildID) != 8 { // 8 is xxhash, for example in Container-Optimized OS
		return "", fmt.Errorf(".note.gnu.build-id has wrong size ")
	}
	buildIDHex := hex.EncodeToString(rawBuildID)
	return buildIDHex, nil
}
b/component/discovery/process/analyze/cache/cache.go @@ -0,0 +1,146 @@ +//go:build linux + +package cache + +import ( + "debug/elf" + "os" + "path/filepath" + "strconv" + + "github.com/go-kit/log" + "github.com/grafana/agent/component/discovery/process/analyze" +) + +type Cache struct { + l log.Logger + pids map[uint32]*Entry + stats map[Stat]*Entry + buildIDs map[string]*analyze.Results +} + +func New(logger log.Logger) *Cache { + return &Cache{ + l: logger, + pids: make(map[uint32]*Entry), + stats: make(map[Stat]*Entry), + buildIDs: make(map[string]*analyze.Results), + } +} + +type Entry struct { + Results *analyze.Results + Stat Stat + BuildID string +} + +func (c *Cache) GetPID(pid uint32) *Entry { + return c.pids[pid] +} + +func (c *Cache) Put(pid uint32, a *Entry) { + c.pids[pid] = a + if a.Stat.Inode != 0 && a.Stat.Dev != 0 { + c.stats[a.Stat] = a + } + if a.BuildID != "" { + c.buildIDs[a.BuildID] = a.Results + } +} + +func (c *Cache) GetStat(s Stat) *Entry { + return c.stats[s] +} + +func (c *Cache) GetBuildID(buildID string) *analyze.Results { + if buildID == "" { + return nil + } + return c.buildIDs[buildID] +} +func (c *Cache) AnalyzePID(pid string) (*analyze.Results, error) { + ipid, _ := strconv.Atoi(pid) + exePath := filepath.Join("/proc", pid, "exe") + return c.AnalyzePIDPath(uint32(ipid), pid, exePath) +} +func (c *Cache) AnalyzePIDPath(pid uint32, pidS string, exePath string) (*analyze.Results, error) { + e := c.GetPID(pid) + if e != nil { + return e.Results, nil + } + + // check if executable exists + fi, err := os.Stat(exePath) + if err != nil { + return nil, err + } + st := StatFromFileInfo(fi) + e = c.GetStat(st) + if e != nil { + c.Put(pid, e) + return e.Results, nil + } + + // get path to executable + f, err := os.Open(exePath) + if err != nil { + return nil, err + } + defer f.Close() + ef, err := elf.NewFile(f) + if err != nil { + return nil, err + } + defer ef.Close() + + buildID, _ := BuildID(ef) + r := c.GetBuildID(buildID) + if r != nil 
{ + c.Put(pid, &Entry{ + Results: r, + Stat: st, + BuildID: buildID, + }) + return r, nil + } + + r = analyze.Analyze(c.l, analyze.Input{ + PID: pid, + PIDs: pidS, + File: f, + ElfFile: ef, + }) + + c.Put(pid, &Entry{ + Results: r, + Stat: st, + BuildID: buildID, + }) + return r, nil +} + +func (c *Cache) GC(active map[uint32]struct{}) { + for pid := range c.pids { + if _, ok := active[pid]; !ok { + delete(c.pids, pid) + } + } + reachableStats := make(map[Stat]struct{}) + reachableBuildIDs := make(map[string]struct{}) + for _, e := range c.pids { + reachableStats[e.Stat] = struct{}{} + if e.BuildID != "" { + reachableBuildIDs[e.BuildID] = struct{}{} + } + } + for s := range c.stats { + if _, ok := reachableStats[s]; !ok { + delete(c.stats, s) + } + } + for id := range c.buildIDs { + if _, ok := reachableBuildIDs[id]; !ok { + delete(c.buildIDs, id) + } + } +} diff --git a/component/discovery/process/analyze/cache/cache_test.go b/component/discovery/process/analyze/cache/cache_test.go new file mode 100644 index 000000000000..e2b8423c3b80 --- /dev/null +++ b/component/discovery/process/analyze/cache/cache_test.go @@ -0,0 +1,94 @@ +//go:build linux + +package cache + +import ( + "io" + "os" + "testing" + + "github.com/grafana/agent/pkg/util" + "github.com/stretchr/testify/require" +) + +func copyFile(t *testing.T, src, dst string) { + t.Helper() + s, err := os.Open(src) + if err != nil { + t.Fatal(err) + } + defer s.Close() + d, err := os.Create(dst) + if err != nil { + t.Fatal(err) + } + defer d.Close() + _, err = io.Copy(d, s) + if err != nil { + t.Fatal(err) + } +} + +func TestCache(t *testing.T) { + d := t.TempDir() + copyFile(t, "/proc/self/exe", d+"/exe1") + copyFile(t, "/proc/self/exe", d+"/exe2") + err := os.Symlink(d+"/exe1", d+"/exe1-symlink") + require.NoError(t, err) + + l := util.TestLogger(t) + c := New(l) + r1, err := c.AnalyzePIDPath(1, "1", d+"/exe1") + require.NoError(t, err) + r2, err := c.AnalyzePIDPath(1, "1", d+"/exe1") + require.NoError(t, err) + 
// copypaste from https://github.com/grafana/pyroscope/blob/8a7fe2b80c219bfda9be685ff27ca1dee4218a42/ebpf/symtab/stat_linux.go#L14-L13

// Stat identifies a file by the device it lives on and its inode number.
type Stat struct {
	Dev   uint64
	Inode uint64
}

// StatFromFileInfo builds a Stat from the *syscall.Stat_t carried by file.
// The zero Stat is returned when file.Sys() is not a non-nil *syscall.Stat_t.
func StatFromFileInfo(file os.FileInfo) Stat {
	if raw, isStatT := file.Sys().(*syscall.Stat_t); isStatT && raw != nil {
		return Stat{
			Dev:   raw.Dev,
			Inode: raw.Ino,
		}
	}
	return Stat{}
}
"__meta_process_dotnet_assembly_name__" +) + +func analyzeDotNet(input Input, a *Results) error { + m := a.Labels + // small hack: the per process api procfs.Proc doesn't support reading NetUnix, so i am using the global one + procPath := filepath.Join("/proc", input.PIDs) + procph, err := procfs.NewFS(procPath) + if err != nil { + return err + } + netunix, err := procph.NetUNIX() + if err != nil { + return err + } + sockets := map[string]*procfs.NetUNIXLine{} + for _, sock := range netunix.Rows { + if !strings.HasPrefix(filepath.Base(sock.Path), "dotnet-diagnostic-") { + continue + } + sockets[strconv.FormatUint(sock.Inode, 10)] = sock + } + + // now get the inodes for the fds of the process and see if they match + + procp, err := procfs.NewProc(int(input.PID)) + if err != nil { + return err + } + fdinfo, err := procp.FileDescriptorsInfo() + if err != nil { + return err + } + unixSocket := "" + for _, fd := range fdinfo { + sock, found := sockets[fd.Ino] + if !found { + continue + } + unixSocket = filepath.Join(procPath, "root", sock.Path) + break + } + + // bail if no unix socket found + if unixSocket == "" { + return nil + } + + // connect to the dotnet socket and retrieve metadata + ddc := dotnetdiag.NewClient(unixSocket) + info, err := ddc.ProcessInfo2() + if err != nil { + return err + } + + m[LabelDotNet] = labelValueTrue + m[LabelDotNetCommandLine] = info.CommandLine + m[LabelDotNetOS] = info.OS + m[LabelDotNetArch] = info.Arch + m[LabelDotNetAssemblyName] = info.AssemblyName + m[LabelDotNetVersion] = info.RuntimeVersion + m[LabelDotNetDiagnosticSocket] = unixSocket + + return io.EOF +} diff --git a/component/discovery/process/analyze/go.go b/component/discovery/process/analyze/go.go new file mode 100644 index 000000000000..e3896b7510bd --- /dev/null +++ b/component/discovery/process/analyze/go.go @@ -0,0 +1,74 @@ +package analyze + +import ( + "debug/buildinfo" + "io" + "regexp" + "strings" +) + +const ( + LabelGo = "__meta_process_go__" + LabelGoVersion = 
"__meta_process_go_version__" + LabelGoModulePath = "__meta_process_go_module_path__" + LabelGoModuleVersion = "__meta_process_go_module_version__" + LabelGoSdk = "__meta_process_go_sdk__" + LabelGoSdkVersion = "__meta_process_go_sdk_version__" + LabelGoDeltaProf = "__meta_process_go_godeltaprof__" + LabelGoDeltaProfVersion = "__meta_process_go_godeltaprof_version__" + LabelGoBuildSettingPrefix = "__meta_process_go_build_setting_" + + goSdkModule = "github.com/grafana/pyroscope-go" + godeltaprofModule = "github.com/grafana/pyroscope-go/godeltaprof" +) + +func analyzeGo(input Input, a *Results) error { + m := a.Labels + info, err := buildinfo.Read(input.File) // it reads elf second time + if err != nil { + if err.Error() == "not a Go executable" { + return nil + } + return err + } + + m[LabelGo] = labelValueTrue + + if info.GoVersion != "" { + m[LabelGoVersion] = info.GoVersion + } + if info.Main.Path != "" { + m[LabelGoModulePath] = info.Main.Path + } + if info.Main.Version != "" { + m[LabelGoModuleVersion] = info.Main.Version + } + + for _, setting := range info.Settings { + k := sanitizeLabelName(setting.Key) + m[LabelGoBuildSettingPrefix+k] = setting.Value + } + + for _, dep := range info.Deps { + switch dep.Path { + case goSdkModule: + m[LabelGoSdk] = labelValueTrue + m[LabelGoSdkVersion] = dep.Version + case godeltaprofModule: + m[LabelGoDeltaProf] = labelValueTrue + m[LabelGoDeltaProfVersion] = dep.Version + default: + //todo should we optionally/configurable include all deps? 
// sanitizeRe matches every character that is not an ASCII letter, an ASCII
// digit or an underscore.
var sanitizeRe = regexp.MustCompile("[^a-zA-Z0-9_]")

// sanitizeLabelName turns s into a label-name fragment: every character
// matched by sanitizeRe becomes "_" and the result is lower-cased.
func sanitizeLabelName(s string) string {
	return strings.ToLower(sanitizeRe.ReplaceAllString(s, "_"))
}
// writeValue parses one "name=value" line. When p starts with n followed by
// "=", the remainder of the line, with leading and trailing double quotes
// trimmed, is stored in *dest. Any other line leaves *dest untouched.
func writeValue(p, n string, dest *string) {
	prefix := n + "="
	if !strings.HasPrefix(p, prefix) {
		return
	}
	value := p[len(prefix):]
	*dest = strings.Trim(value, "\"")
}
// getProcessInfo reads /proc/<pid>/status and returns the first value of the
// "Uid:" line, the first value of the "Gid:" line, and the last value of the
// "NStgid:" line (the PID as seen from the innermost PID namespace).
//
// When the status file has no "NStgid:" line, nspid falls back to pid, on
// the assumption that the target then shares the caller's PID namespace.
//
// An error is returned when the status file cannot be opened or read, or
// when one of those values is not a number.
//
// NOTE(review): the first value on the "Uid:"/"Gid:" lines is the real ID;
// confirm whether the effective ID (second value) is what the attach
// handshake actually needs.
func getProcessInfo(pid int) (uid, gid uint32, nspid int, err error) {
	path := fmt.Sprintf("/proc/%d/status", pid)
	statusFile, err := os.Open(path)
	if err != nil {
		return 0, 0, 0, err
	}
	defer statusFile.Close()

	nspid = pid

	scanner := bufio.NewScanner(statusFile)
	for scanner.Scan() {
		fields := strings.Fields(scanner.Text())
		// Blank lines or keys without a value would otherwise make the
		// fields[0] / fields[1] accesses below panic.
		if len(fields) < 2 {
			continue
		}

		switch fields[0] {
		case "Uid:":
			uid64, err := strconv.ParseUint(fields[1], 10, 32)
			if err != nil {
				return 0, 0, 0, err
			}
			uid = uint32(uid64)
		case "Gid:":
			gid64, err := strconv.ParseUint(fields[1], 10, 32)
			if err != nil {
				return 0, 0, 0, err
			}
			gid = uint32(gid64)
		case "NStgid:":
			// PID namespaces can be nested; the last one is the innermost one
			for _, s := range fields[1:] {
				nspid, err = strconv.Atoi(s)
				if err != nil {
					return 0, 0, 0, err
				}
			}
		}
	}
	// A read failure part-way through must not be mistaken for a short file.
	if err := scanner.Err(); err != nil {
		return 0, 0, 0, err
	}
	return uid, gid, nspid, nil
}
// checkSocket reports whether <tmpPath>/.java_pid<pid> exists and is a unix
// socket.
func checkSocket(pid int, tmpPath string) bool {
	path := fmt.Sprintf("%s/.java_pid%d", tmpPath, pid)

	var stats syscall.Stat_t
	// Compare the whole file-type field: S_IFSOCK shares a bit with
	// S_IFREG, so a plain "& S_IFSOCK != 0" also accepts regular files.
	return syscall.Stat(path, &stats) == nil && stats.Mode&syscall.S_IFMT == syscall.S_IFSOCK
}

// attachToJvm asks the JVM with host PID pid to open its attach socket: it
// creates <tmpPath>/.attach_pid<nspid>, sends SIGQUIT to pid, and then polls
// for the socket with growing sleeps (20ms, 40ms, ...) until the sleep
// interval reaches 500ms. The trigger file is removed before returning.
//
// An error is returned when the trigger file cannot be created, the signal
// cannot be delivered, or the socket never shows up.
//
// NOTE(review): SIGQUIT is sent unconditionally; presumably the caller has
// already established that pid is a JVM that handles it — verify, since the
// default action of SIGQUIT terminates the process.
func attachToJvm(pid, nspid int, tmpPath string) error {
	path := fmt.Sprintf("%s/.attach_pid%d", tmpPath, nspid)
	file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, os.FileMode(0660))
	if err != nil {
		return err
	}
	defer file.Close()
	defer os.Remove(path)

	if err := syscall.Kill(pid, syscall.SIGQUIT); err != nil {
		return err
	}

	for ts := 20 * time.Millisecond; ts < 500*time.Millisecond; ts += 20 * time.Millisecond {
		time.Sleep(ts)
		if checkSocket(nspid, tmpPath) {
			return nil
		}
	}
	// Previously this fell through and returned nil, hiding the timeout.
	return fmt.Errorf("attach socket %s/.java_pid%d did not appear after signalling pid %d", tmpPath, nspid, pid)
}

// connectSocket opens a unix stream socket connected to
// <tmpPath>/.java_pid<pid> and returns its file descriptor, which the caller
// must close. On failure it returns -1 and the error; no descriptor is left
// open.
func connectSocket(pid int, tmpPath string) (int, error) {
	fd, err := syscall.Socket(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		return -1, err
	}
	addr := syscall.SockaddrUnix{
		Name: fmt.Sprintf("%s/.java_pid%d", tmpPath, pid),
	}
	if err := syscall.Connect(fd, &addr); err != nil {
		// The caller drops the descriptor on error, so release it here.
		_ = syscall.Close(fd)
		return -1, err
	}
	return fd, nil
}

// sendRequest writes one request to fd and returns everything the peer sends
// back until it closes the connection.
//
// The request is the byte '1', a NUL, cmd, a NUL, arg, and three NULs. An
// empty string and the error are returned when writing or reading fails.
func sendRequest(fd int, cmd string, arg string) (string, error) {
	request := make([]byte, 0, 6+len(cmd)+len(arg))
	request = append(request, '1', 0)
	request = append(request, cmd...)
	request = append(request, 0)
	request = append(request, arg...)
	request = append(request, 0, 0, 0)

	// Keep writing until the whole request is out; a single Write may be short.
	for len(request) > 0 {
		n, err := syscall.Write(fd, request)
		if err == syscall.EINTR {
			continue
		}
		if err != nil {
			return "", err
		}
		request = request[n:]
	}

	var response []byte
	buf := make([]byte, 8192)
	for {
		n, err := syscall.Read(fd, buf)
		if err == syscall.EINTR {
			continue
		}
		if err != nil {
			// Stop on the first failure instead of retrying forever.
			return "", err
		}
		if n == 0 {
			// Peer closed the connection: the response is complete.
			return string(response), nil
		}
		// Only the first n bytes of buf belong to this read.
		response = append(response, buf[:n]...)
	}
}
DefaultConfig = Arguments{ Join: nil, RefreshInterval: 60 * time.Second, DiscoverConfig: DiscoverConfig{ - Cwd: true, - Exe: true, - Commandline: true, - ContainerID: true, + Cwd: true, + Exe: true, + Commandline: true, + ContainerID: true, + AnalyzeExecutable: false, }, } diff --git a/component/discovery/process/discover.go b/component/discovery/process/discover.go index f8444a5e38fb..21bf1eef7fc3 100644 --- a/component/discovery/process/discover.go +++ b/component/discovery/process/discover.go @@ -13,6 +13,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/agent/component/discovery" + "github.com/grafana/agent/component/discovery/process/analyze" + analCache "github.com/grafana/agent/component/discovery/process/analyze/cache" gopsutil "github.com/shirou/gopsutil/v3/process" "golang.org/x/sys/unix" ) @@ -27,21 +29,22 @@ const ( labelProcessContainerID = "__container_id__" ) -type process struct { - pid string +type Process struct { + PID string exe string cwd string commandline string containerID string username string uid string + Analysis *analyze.Results } -func (p process) String() string { - return fmt.Sprintf("pid=%s exe=%s cwd=%s commandline=%s containerID=%s", p.pid, p.exe, p.cwd, p.commandline, p.containerID) +func (p Process) String() string { + return fmt.Sprintf("pid=%s exe=%s cwd=%s commandline=%s containerID=%s", p.PID, p.exe, p.cwd, p.commandline, p.containerID) } -func convertProcesses(ps []process) []discovery.Target { +func convertProcesses(ps []Process) []discovery.Target { var res []discovery.Target for _, p := range ps { t := convertProcess(p) @@ -50,9 +53,9 @@ func convertProcesses(ps []process) []discovery.Target { return res } -func convertProcess(p process) discovery.Target { +func convertProcess(p Process) discovery.Target { t := make(discovery.Target, 5) - t[labelProcessID] = p.pid + t[labelProcessID] = p.PID if p.exe != "" { t[labelProcessExe] = p.exe } @@ -71,15 +74,21 @@ func convertProcess(p 
process) discovery.Target { if p.uid != "" { t[labelProcessUID] = p.uid } + if p.Analysis != nil { + for k, v := range p.Analysis.Labels { + t[k] = v + } + } + return t } -func discover(l log.Logger, cfg *DiscoverConfig) ([]process, error) { +func Discover(l log.Logger, cfg *DiscoverConfig, cache *analCache.Cache) ([]Process, error) { processes, err := gopsutil.Processes() if err != nil { return nil, fmt.Errorf("failed to list processes: %w", err) } - res := make([]process, 0, len(processes)) + res := make([]Process, 0, len(processes)) loge := func(pid int, e error) { if errors.Is(e, unix.ESRCH) { return @@ -89,6 +98,7 @@ func discover(l log.Logger, cfg *DiscoverConfig) ([]process, error) { } _ = level.Error(l).Log("msg", "failed to get process info", "err", e, "pid", pid) } + active := make(map[uint32]struct{}) for _, p := range processes { spid := fmt.Sprintf("%d", p.Pid) var ( @@ -139,16 +149,28 @@ func discover(l log.Logger, cfg *DiscoverConfig) ([]process, error) { continue } } - res = append(res, process{ - pid: spid, + var ar *analyze.Results + if cfg.AnalyzeExecutable { + ar, err = cache.AnalyzePID(spid) + if err != nil { + level.Error(l).Log("msg", "error analyzing process", "pid", spid, "err", err) + continue + } + } + + res = append(res, Process{ + PID: spid, exe: exe, cwd: cwd, commandline: commandline, containerID: containerID, username: username, uid: uid, + Analysis: ar, }) + active[uint32(p.Pid)] = struct{}{} } + cache.GC(active) return res, nil } diff --git a/component/discovery/process/join_test.go b/component/discovery/process/join_test.go index 8ddd7dc7cdf9..c5762f13e502 100644 --- a/component/discovery/process/join_test.go +++ b/component/discovery/process/join_test.go @@ -18,20 +18,20 @@ func TestJoin(t *testing.T) { }{ { []discovery.Target{ - convertProcess(process{ - pid: "239", + convertProcess(Process{ + PID: "239", exe: "/bin/foo", cwd: "/", containerID: "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", }), - 
convertProcess(process{ - pid: "240", + convertProcess(Process{ + PID: "240", exe: "/bin/bar", cwd: "/tmp", containerID: "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", }), - convertProcess(process{ - pid: "241", + convertProcess(Process{ + PID: "241", exe: "/bin/bash", cwd: "/opt", containerID: "", @@ -85,28 +85,28 @@ func TestJoin(t *testing.T) { }, { []discovery.Target{ - convertProcess(process{ - pid: "239", + convertProcess(Process{ + PID: "239", exe: "/bin/foo", cwd: "/", containerID: "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", }), - convertProcess(process{ - pid: "240", + convertProcess(Process{ + PID: "240", exe: "/bin/bar", cwd: "/", containerID: "", }), }, []discovery.Target{}, []discovery.Target{ - convertProcess(process{ - pid: "239", + convertProcess(Process{ + PID: "239", exe: "/bin/foo", cwd: "/", containerID: "7edda1de1e0d1d366351e478359cf5fa16bb8ab53063a99bb119e56971bfb7e2", }), - convertProcess(process{ - pid: "240", + convertProcess(Process{ + PID: "240", exe: "/bin/bar", cwd: "/", containerID: "", diff --git a/component/discovery/process/list-processes/main.go b/component/discovery/process/list-processes/main.go new file mode 100644 index 000000000000..59259b0553f3 --- /dev/null +++ b/component/discovery/process/list-processes/main.go @@ -0,0 +1,57 @@ +//go:build linux + +package main + +import ( + "os" + "sort" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + analCache "github.com/grafana/agent/component/discovery/process/analyze/cache" + + "github.com/grafana/agent/component/discovery/process" +) + +var logger = log.NewLogfmtLogger(os.Stderr) + +func run() error { + cache := analCache.New(logger) + processes, err := process.Discover(logger, &process.DiscoverConfig{}, cache) + if err != nil { + return err + } + + var ( + keys = make([]string, 16) + attributes = make([]interface{}, 16) + ) + + for _, p := range processes { + attributes = attributes[:4] + attributes[0] = "msg" + 
attributes[1] = "found process" + attributes[2] = "pid" + attributes[3] = p.PID + + keys = keys[:0] + for k := range p.Analysis.Labels { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + attributes = append(attributes, k, p.Analysis.Labels[k]) + } + + level.Info(logger).Log(attributes...) + } + + return nil +} + +func main() { + if err := run(); err != nil { + level.Error(logger).Log("msg", "failed to discover processes", "err", err) + } +} diff --git a/component/discovery/process/process.go b/component/discovery/process/process.go index a32077ece804..83cd5aae6ec6 100644 --- a/component/discovery/process/process.go +++ b/component/discovery/process/process.go @@ -9,6 +9,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/agent/component" "github.com/grafana/agent/component/discovery" + analCache "github.com/grafana/agent/component/discovery/process/analyze/cache" ) func init() { @@ -29,6 +30,7 @@ func New(opts component.Options, args Arguments) (*Component, error) { onStateChange: opts.OnStateChange, argsUpdates: make(chan Arguments), args: args, + analCache: analCache.New(opts.Logger), } return c, nil } @@ -39,11 +41,12 @@ type Component struct { processes []discovery.Target argsUpdates chan Arguments args Arguments + analCache *analCache.Cache } func (c *Component) Run(ctx context.Context) error { doDiscover := func() error { - processes, err := discover(c.l, &c.args.DiscoverConfig) + processes, err := Discover(c.l, &c.args.DiscoverConfig, c.analCache) if err != nil { return err } diff --git a/docs/sources/flow/reference/components/discovery.process.md b/docs/sources/flow/reference/components/discovery.process.md index 839948d3d65b..e1497500c3fd 100644 --- a/docs/sources/flow/reference/components/discovery.process.md +++ b/docs/sources/flow/reference/components/discovery.process.md @@ -31,10 +31,10 @@ discovery.process "LABEL" { The following arguments are supported: -| Name | Type | Description | Default | Required | 
-|--------------------|---------------------|-----------------------------------------------------------------------------------------|---------|----------| +| Name | Type | Description | Default | Required | +|--------------------|---------------------|------------------------------------------------------------------------------------------|---------|----------| | `join` | `list(map(string))` | Join external targets to discovered processes targets based on `__container_id__` label. | | no | -| `refresh_interval` | `duration` | How often to sync targets. | "60s" | no | +| `refresh_interval` | `duration` | How often to sync targets. | "60s" | no | ### Targets joining @@ -42,6 +42,15 @@ If `join` is specified, `discovery.process` will join the discovered processes b For example, if `join` is specified as follows: +```river +discovery.process "all" { + join = discovery.kubernetes.