diff --git a/src/packages/server/conat/socketio/dns-scan-k8s-api.ts b/src/packages/server/conat/socketio/dns-scan-k8s-api.ts new file mode 100644 index 0000000000..a8ffeafa3a --- /dev/null +++ b/src/packages/server/conat/socketio/dns-scan-k8s-api.ts @@ -0,0 +1,100 @@ +import * as fs from "fs"; +import * as https from "https"; + +// Define the options interface for type safety +interface ListPodsOptions { + labelSelector?: string; // e.g. "app=foo,env=prod" +} + +const NAMESPACE: string = fs + .readFileSync( + "/var/run/secrets/kubernetes.io/serviceaccount/namespace", + "utf8", + ) + .trim(); +const CA: Buffer = fs.readFileSync( + "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", +); + +async function listPods(options: ListPodsOptions = {}): Promise { + try { + // Read service account details, token could be rotated + const token = fs + .readFileSync( + "/var/run/secrets/kubernetes.io/serviceaccount/token", + "utf8", + ) + .trim(); + + // Base API path + let path = `/api/v1/namespaces/${NAMESPACE}/pods`; + + const queryParams: string[] = []; + if (options.labelSelector) { + queryParams.push( + `labelSelector=${encodeURIComponent(options.labelSelector)}`, + ); + } + + if (queryParams.length > 0) { + path += `?${queryParams.join("&")}`; + } + + const query: https.RequestOptions = { + hostname: "kubernetes.default.svc", + path, + method: "GET", + headers: { + Authorization: `Bearer ${token}`, + Accept: "application/json", + }, + ca: [CA], + }; + + return new Promise((resolve, reject) => { + const req = https.request(query, (res) => { + let data = ""; + res.on("data", (chunk) => { + data += chunk; + }); + res.on("end", () => { + if (res.statusCode !== 200) { + reject( + new Error( + `K8S API request failed. status=${res.statusCode}: ${data}`, + ), + ); + } else { + try { + resolve(JSON.parse(data)); + } catch (parseError) { + reject(parseError); + } + } + }); + }); + + req.on("error", (error) => reject(error)); + req.end(); + }); + } catch (error) { + throw new Error( + `Failed to read service account files: ${(error as Error).message}`, + ); + } +} + +export async function getAddressesFromK8sApi(): Promise< + { name: string; podIP: string }[] +> { + const res = await listPods({ labelSelector: "run=hub-conat-router" }); + const ret: { name: string; podIP: string }[] = []; + for (const pod of res.items) { + const name = pod.metadata?.name; + const podIP = pod.status?.podIP; + if (name && podIP) { + ret.push({ name, podIP }); + } + } + return ret; +} diff --git a/src/packages/server/conat/socketio/dns-scan.ts b/src/packages/server/conat/socketio/dns-scan.ts index 701b095ae6..9a89f25e5e 100644 --- a/src/packages/server/conat/socketio/dns-scan.ts +++ b/src/packages/server/conat/socketio/dns-scan.ts @@ -5,16 +5,32 @@ COCALC_SERVICE */ import { delay } from "awaiting"; -import type { ConatServer } from "@cocalc/conat/core/server"; import { lookup } from "dns/promises"; -import port from "@cocalc/backend/port"; import { hostname } from "node:os"; -import { getLogger } from "@cocalc/backend/logger"; + import { executeCode } from "@cocalc/backend/execute-code"; -import { split } from "@cocalc/util/misc"; +import { getLogger } from "@cocalc/backend/logger"; +import port from "@cocalc/backend/port"; +import type { ConatServer } from "@cocalc/conat/core/server"; +import { split, unreachable } from "@cocalc/util/misc"; +import { getAddressesFromK8sApi } from "./dns-scan-k8s-api"; export const SCAN_INTERVAL = 15_000; +type PeerDiscovery = "KUBECTL" | "API"; + +function isPeerDiscovery(x: string): x is PeerDiscovery { + return x === "KUBECTL" || x === "API"; +} + +const PEER_DISCOVERY: PeerDiscovery = (function () { + const val = process.env.COCALC_CONAT_PEER_DISCOVERY ?? "KUBECTL"; + if (!isPeerDiscovery(val)) { + throw Error(`Invalid COCALC_CONAT_PEER_DISCOVERY: ${val}`); + } + return val; +})(); + const logger = getLogger("conat:socketio:dns-scan"); export async function dnsScan(server: ConatServer) { @@ -83,6 +99,32 @@ export async function getAddresses(): Promise { const h = hostname(); const i = h.lastIndexOf("-"); const prefix = h.slice(0, i); + + const podInfos = await getPodInfos(); + for (const { name, podIP } of podInfos) { + if (name != h && name.startsWith(prefix)) { + v.push(`http://${podIP}:${port}`); + } + } + return v; +} + +async function getPodInfos(): Promise<{ name: string; podIP: string }[]> { + switch (PEER_DISCOVERY) { + case "KUBECTL": + return await getAddressesFromKubectl(); + case "API": + return await getAddressesFromK8sApi(); + default: + unreachable(PEER_DISCOVERY); + throw Error(`Unknown PEER_DISCOVERY: ${PEER_DISCOVERY}`); + } +} + +async function getAddressesFromKubectl(): Promise< + { name: string; podIP: string }[] +> { + const ret: { name: string; podIP: string }[] = []; const { stdout } = await executeCode({ command: "kubectl", args: [ @@ -97,10 +139,10 @@ export async function getAddresses(): Promise { for (const x of stdout.split("\n")) { const row = split(x); if (row.length == 2) { - if (row[0] != h && row[0].startsWith(prefix)) { - v.push(`http://${row[1]}:${port}`); - } + ret.push({ name: row[0], podIP: row[1] }); + } else { + logger.warn(`Unexpected row from kubectl: ${x}`); } } - return v; + return ret; }