diff --git a/internal/k8s/client.go b/internal/k8s/client.go index fd25b6a2..f0bbc632 100644 --- a/internal/k8s/client.go +++ b/internal/k8s/client.go @@ -3,6 +3,7 @@ package k8s import ( + "sync" "time" "k8s.io/client-go/kubernetes" @@ -22,6 +23,7 @@ var timeoutSeconds = int64(timeout / time.Second) type Client struct { config *rest.Config clientset *kubernetes.Clientset + logCIDs sync.Map } // NewClient creates a new kubernetes API client. diff --git a/internal/k8s/logs.go b/internal/k8s/logs.go index 752abf28..69ac62be 100644 --- a/internal/k8s/logs.go +++ b/internal/k8s/logs.go @@ -52,23 +52,34 @@ func linewiseCopy(ctx context.Context, prefix string, logs chan<- string, // tail its logs. Otherwise the containers are just iterated over and tailLines // number of log lines are read from each and written to the logs channel. func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group, - p *corev1.Pod, container string, follow bool, tailLines int64, + p *corev1.Pod, containerName string, follow bool, tailLines int64, logs chan<- string) error { spew.Dump("readLogs", follow, tailLines) - var containers []string - if container != "" { - containers = append(containers, container) - } else { - for _, cObj := range p.Spec.Containers { - containers = append(containers, cObj.Name) + var cStatuses []corev1.ContainerStatus + if containerName != "" { + for _, cStatus := range p.Status.ContainerStatuses { + if containerName == cStatus.Name { + cStatuses = append(cStatuses, cStatus) + break + } + } + if len(cStatuses) == 0 { + return fmt.Errorf("couldn't find container: %s", containerName) } + } else { + cStatuses = p.Status.ContainerStatuses } - spew.Dump("container count", len(containers)) - for _, name := range containers { + spew.Dump("container count", len(cStatuses)) + for _, cStatus := range cStatuses { + // skip logging if this container is already being logged + if _, exists := c.logCIDs.LoadOrStore(cStatus.ContainerID, true); exists { + continue + } + //c.loggingContainerIDs.LoadOrStore(container.Descriptor) // read logs for a single container req := c.clientset.CoreV1().Pods(p.Namespace).GetLogs(p.Name, &corev1.PodLogOptions{ - Container: name, + Container: cStatus.Name, Follow: follow, Timestamps: true, TailLines: &tailLines, @@ -82,8 +93,12 @@ func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group, spew.Dump(err) return fmt.Errorf("couldn't stream logs: %v", err) } + // copy loop vars so they can be referenced in the closure + cName := cStatus.Name + cID := cStatus.ContainerID egSend.Go(func() error { - linewiseCopy(ctx, fmt.Sprintf("[pod/%s/%s]", p.Name, container), logs, + defer c.logCIDs.Delete(cID) + linewiseCopy(ctx, fmt.Sprintf("[pod/%s/%s]", p.Name, cName), logs, logStream) fmt.Println("done linewiseCopy") return nil @@ -240,7 +255,7 @@ func (c *Client) Logs(ctx context.Context, namespace, deployment, // If not following the logs, avoid constructing an informer. Instead just // read the logs and return. for i := range pods.Items { - pod := pods.Items[i] // avoid copying loop var + pod := pods.Items[i] // copy loop var so it can be referenced in the closure egSend.Go(func() error { readLogsErr := c.readLogs(childCtx, &egSend, &pod, container, follow, tailLines, logs)