Skip to content

Commit

Permalink
fix: avoid duplicating container log streams
Browse files Browse the repository at this point in the history
  • Loading branch information
smlx committed Jul 20, 2023
1 parent b82623f commit 5d4c5ab
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
2 changes: 2 additions & 0 deletions internal/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package k8s

import (
"sync"
"time"

"k8s.io/client-go/kubernetes"
Expand All @@ -22,6 +23,7 @@ var timeoutSeconds = int64(timeout / time.Second)
type Client struct {
config *rest.Config
clientset *kubernetes.Clientset
logCIDs sync.Map
}

// NewClient creates a new kubernetes API client.
Expand Down
39 changes: 27 additions & 12 deletions internal/k8s/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,34 @@ func linewiseCopy(ctx context.Context, prefix string, logs chan<- string,
// tail its logs. Otherwise the containers are just iterated over and tailLines
// number of log lines are read from each and written to the logs channel.
func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
p *corev1.Pod, container string, follow bool, tailLines int64,
p *corev1.Pod, containerName string, follow bool, tailLines int64,
logs chan<- string) error {
spew.Dump("readLogs", follow, tailLines)
var containers []string
if container != "" {
containers = append(containers, container)
} else {
for _, cObj := range p.Spec.Containers {
containers = append(containers, cObj.Name)
var cStatuses []corev1.ContainerStatus
if containerName != "" {
for _, cStatus := range p.Status.ContainerStatuses {
if containerName == cStatus.Name {
cStatuses = append(cStatuses, cStatus)
break
}
}
if len(cStatuses) == 0 {
return fmt.Errorf("couldn't find container: %s", containerName)
}
} else {
cStatuses = p.Status.ContainerStatuses
}
spew.Dump("container count", len(containers))
for _, name := range containers {
spew.Dump("container count", len(cStatuses))
for _, cStatus := range cStatuses {
// skip logging if this container is already being logged
if _, exists := c.logCIDs.LoadOrStore(cStatus.ContainerID, true); exists {
continue
}
//c.loggingContainerIDs.LoadOrStore(container.Descriptor)
// read logs for a single container
req := c.clientset.CoreV1().Pods(p.Namespace).GetLogs(p.Name,
&corev1.PodLogOptions{
Container: name,
Container: cStatus.Name,
Follow: follow,
Timestamps: true,
TailLines: &tailLines,
Expand All @@ -82,8 +93,12 @@ func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
spew.Dump(err)
return fmt.Errorf("couldn't stream logs: %v", err)
}
// copy loop vars so they can be referenced in the closure
cName := cStatus.Name
cID := cStatus.ContainerID
egSend.Go(func() error {
linewiseCopy(ctx, fmt.Sprintf("[pod/%s/%s]", p.Name, container), logs,
defer c.logCIDs.Delete(cID)
linewiseCopy(ctx, fmt.Sprintf("[pod/%s/%s]", p.Name, cName), logs,
logStream)
fmt.Println("done linewiseCopy")
return nil
Expand Down Expand Up @@ -240,7 +255,7 @@ func (c *Client) Logs(ctx context.Context, namespace, deployment,
// If not following the logs, avoid constructing an informer. Instead just
// read the logs and return.
for i := range pods.Items {
pod := pods.Items[i] // avoid copying loop var
pod := pods.Items[i] // copy loop var so it can be referenced in the closure
egSend.Go(func() error {
readLogsErr := c.readLogs(childCtx, &egSend, &pod, container, follow,
tailLines, logs)
Expand Down

0 comments on commit 5d4c5ab

Please sign in to comment.