diff --git a/.github/workflows/prover-docker-light-release.yml b/.github/workflows/prover-docker-light-release.yml new file mode 100644 index 0000000000..71c70854e6 --- /dev/null +++ b/.github/workflows/prover-docker-light-release.yml @@ -0,0 +1,58 @@ +name: Build and Release Prover Light Docker Image + +on: + push: + tags: + - "light-prover*" + workflow_dispatch: + inputs: + tag: + description: 'Tag for the Docker image' + required: true + default: 'latest' + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }}/light-prover-light + +jobs: + build-and-push: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to Container Registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=ref,event=tag + type=raw,value=latest,enable={{is_default_branch}} + type=raw,value=${{ github.event.inputs.tag }},enable=${{ github.event_name == 'workflow_dispatch' }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: prover/server + file: prover/server/Dockerfile.light + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max \ No newline at end of file diff --git a/.github/workflows/prover-docker-release.yml b/.github/workflows/prover-docker-release.yml new file mode 100644 index 0000000000..c97989e22b --- /dev/null +++ b/.github/workflows/prover-docker-release.yml @@ -0,0 +1,68 @@ +name: Build and Release Prover Docker Image + +on: + push: + tags: + - "light-prover*" + workflow_dispatch: + inputs: + tag: + description: 'Tag for the Docker image' + required: true + default: 'latest' + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }}/light-prover + +jobs: + build-and-push: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Download proving keys + run: | + cd prover/server + ./scripts/download_keys_docker.sh + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to Container Registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=ref,event=tag + type=raw,value=latest,enable={{is_default_branch}} + type=raw,value=${{ github.event.inputs.tag }},enable=${{ github.event_name == 'workflow_dispatch' }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: prover/server + file: prover/server/Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Clean up proving keys + if: always() + run: | + rm -rf prover/server/proving-keys \ No newline at end of file diff --git a/.github/workflows/prover-test.yml b/.github/workflows/prover-test.yml index 1c2cab91a1..5c35efa06b 100644 --- a/.github/workflows/prover-test.yml 
+++ b/.github/workflows/prover-test.yml @@ -26,6 +26,16 @@ jobs: if: github.event.pull_request.draft == false runs-on: buildjet-8vcpu-ubuntu-2204 timeout-minutes: 120 + services: + redis: + image: redis:7-alpine + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - name: Checkout sources uses: actions/checkout@v4 @@ -68,6 +78,44 @@ jobs: cd prover/server go test ./prover -timeout 60m + - name: Redis Queue tests + env: + TEST_REDIS_URL: redis://localhost:6379/15 + run: | + cd prover/server + go test -v -run TestRedis -timeout 10m + + - name: Queue cleanup tests + env: + TEST_REDIS_URL: redis://localhost:6379/15 + run: | + cd prover/server + go test -v -run TestCleanup -timeout 5m + + - name: Worker selection tests + run: | + cd prover/server + go test -v -run TestWorkerSelection -timeout 5m + + - name: Batch operations queue routing tests + run: | + cd prover/server + go test -v -run TestBatchOperations -timeout 5m + + - name: Queue processing flow tests + env: + TEST_REDIS_URL: redis://localhost:6379/15 + run: | + cd prover/server + go test -v -run TestJobProcessingFlow -timeout 5m + + - name: Failed job status tests + env: + TEST_REDIS_URL: redis://localhost:6379/15 + run: | + cd prover/server + go test -v -run TestFailedJobStatus -timeout 5m + - name: Lightweight integration tests if: ${{ github.event.pull_request.base.ref == 'main' }} run: | diff --git a/prover/server/DOCKER.md b/prover/server/DOCKER.md new file mode 100644 index 0000000000..e83a2191ad --- /dev/null +++ b/prover/server/DOCKER.md @@ -0,0 +1,129 @@ +# Docker Setup for Light Prover + +This document describes the Docker setup for the Light Protocol prover server, including build processes and deployment workflows. + +## Available Docker Images + +### 1. Full Prover Image (`Dockerfile`) + +The main Docker image that includes all necessary proving keys for production use. + +**Features:** +- Contains pre-downloaded proving keys (mainnet, inclusion, non-inclusion, combined) +- Ready to use for proof generation +- Larger image size due to embedded keys +- Built via `prover-docker-release.yml` workflow + +**Usage:** +```bash +docker run ghcr.io/lightprotocol/light-protocol/light-prover:latest start --run-mode rpc --keys-dir /proving-keys/ +``` + +### 2. Light Prover Image (`Dockerfile.light`) + +A lightweight image without proving keys, suitable for development or custom key management. 
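+As a sketch of that custom key management (the download script and run flags are documented in the sections below), keys can be fetched on the host and mounted read-only:
+
+```bash
+# Fetch the Docker subset of proving keys on the host
+cd prover/server
+./scripts/download_keys_docker.sh
+
+# Run the light image against the host-mounted keys (read-only)
+docker run --rm -v "$(pwd)/proving-keys:/proving-keys:ro" \
+  ghcr.io/lightprotocol/light-protocol/light-prover-light:latest \
+  start --keys-dir /proving-keys/
+```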
+ +**Features:** +- No embedded proving keys +- Smaller image size +- Requires external key management +- Built via `prover-docker-light-release.yml` workflow + +**Usage:** +```bash +# Mount your own keys directory +docker run -v /path/to/your/keys:/proving-keys ghcr.io/lightprotocol/light-protocol/light-prover-light:latest start --keys-dir /proving-keys/ + +# Or run without keys for development +docker run ghcr.io/lightprotocol/light-protocol/light-prover-light:latest start +``` + +## Key Management Scripts + +### `scripts/download_keys_docker.sh` + +Specialized script that downloads only the proving keys needed for the Docker build: + +- `mainnet_inclusion_26_*` keys +- `inclusion_32_*` keys +- `non-inclusion_26_*` and `non-inclusion_40_*` keys +- `combined_26_*` and `combined_32_40_*` keys + +This is more efficient than the full `download_keys.sh light` script as it excludes: +- `append-with-proofs_32_*` keys +- `update_32_*` keys +- `address-append_40_*` keys + +### `scripts/download_keys.sh` + +Original script with two modes: +- `light`: Downloads keys including batch operations (less efficient for Docker) +- `full`: Downloads all available keys + +## GitHub Workflows + +### `prover-docker-release.yml` + +Builds and publishes the full prover image with embedded keys. + +**Triggers:** +- Push to tags matching `light-prover*` +- Manual workflow dispatch + +**Process:** +1. Downloads proving keys using `download_keys_docker.sh` +2. Builds Docker image with `Dockerfile` +3. Pushes to GitHub Container Registry +4. Cleans up downloaded keys + +### `prover-docker-light-release.yml` + +Builds and publishes the lightweight prover image without keys. + +**Triggers:** +- Push to tags matching `light-prover*` +- Manual workflow dispatch + +**Process:** +1. Builds Docker image with `Dockerfile.light` +2. Pushes to GitHub Container Registry + +## Local Development + +### Building Images Locally + +For the full image: +```bash +cd prover/server +./scripts/download_keys_docker.sh +docker build -t light-prover . +``` + +For the light image: +```bash +cd prover/server +docker build -f Dockerfile.light -t light-prover-light . +``` + +### Testing Images + +Test the full image: +```bash +docker run --rm light-prover start --run-mode rpc --keys-dir /proving-keys/ +``` + +Test the light image: +```bash +docker run --rm light-prover-light start +``` + +## Image Registry + +Both images are published to GitHub Container Registry: + +- Full image: `ghcr.io/lightprotocol/light-protocol/light-prover` +- Light image: `ghcr.io/lightprotocol/light-protocol/light-prover-light` + +Tags follow the pattern: +- `latest`: Latest release from main branch +- ``: Specific version tags (e.g., `light-prover-v1.0.0`) diff --git a/prover/server/Dockerfile.light b/prover/server/Dockerfile.light new file mode 100644 index 0000000000..6300312e36 --- /dev/null +++ b/prover/server/Dockerfile.light @@ -0,0 +1,26 @@ +FROM golang:1.20.3-alpine AS builder + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download && go mod verify + +COPY . . + +ENV CGO_ENABLED=0 +RUN go build -v -o /usr/local/bin/light-prover . 
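+# The empty directory created below is copied into the distroless runtime stage so that /proving-keys exists (owned by nonroot) even though this image ships without proving keys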
+ +RUN mkdir -p /tmp/empty_proving_keys + +FROM gcr.io/distroless/base-debian11:nonroot + +COPY --from=builder /usr/local/bin/light-prover /usr/local/bin/light-prover + +WORKDIR /proving-keys + +COPY --chown=nonroot:nonroot --from=builder /tmp/empty_proving_keys /proving-keys/ + +WORKDIR / + +ENTRYPOINT [ "light-prover" ] +CMD [ "start" ] diff --git a/prover/server/docker-compose.yml b/prover/server/docker-compose.yml new file mode 100644 index 0000000000..04b8715dba --- /dev/null +++ b/prover/server/docker-compose.yml @@ -0,0 +1,22 @@ +services: + redis: + image: redis:7.4.4-alpine3.21 + container_name: redis + ports: + - "6379:6379" + restart: unless-stopped + + prover: + image: sergeytimoshin/prover-light:1.0.0 + container_name: prover + ports: + - "3001:3001" + volumes: + - ./proving-keys:/proving-keys/:ro + command: > + start + --run-mode forester-test + --redis-url=redis://redis:6379 + depends_on: + - redis + restart: unless-stopped diff --git a/prover/server/main.go b/prover/server/main.go index 193d65c3d8..33724a9d12 100644 --- a/prover/server/main.go +++ b/prover/server/main.go @@ -444,11 +444,6 @@ func runCli() { Usage: "Run only HTTP server (no queue workers)", Value: false, }, - &cli.IntFlag{ - Name: "queue-workers", - Usage: "Number of queue worker goroutines", - Value: 1, - }, }, Action: func(context *cli.Context) error { if context.Bool("json-logging") { @@ -481,14 +476,13 @@ func runCli() { queueOnly := context.Bool("queue-only") serverOnly := context.Bool("server-only") - numWorkers := context.Int("queue-workers") enableQueue := redisURL != "" && !serverOnly enableServer := !queueOnly if os.Getenv("QUEUE_MODE") == "true" { enableQueue = true - if os.Getenv("SERVER _MODE") != "true" { + if os.Getenv("SERVER_MODE") != "true" { enableServer = false } } @@ -497,10 +491,9 @@ func runCli() { Bool("enable_queue", enableQueue). Bool("enable_server", enableServer). Str("redis_url", redisURL). - Int("queue_workers", numWorkers). 
Msg("Starting ZK Prover service") - var workers []*server.QueueWorker + var workers []server.QueueWorker var redisQueue *server.RedisQueue var instance server.RunningJob @@ -514,22 +507,49 @@ func runCli() { return fmt.Errorf("failed to connect to Redis: %w", err) } - startResultCleanup(redisQueue) + startCleanupRoutines(redisQueue) if stats, err := redisQueue.GetQueueStats(); err == nil { logging.Logger().Info().Interface("initial_queue_stats", stats).Msg("Redis connection successful") } - if numWorkers <= 0 { - numWorkers = 1 + logging.Logger().Info().Msg("Starting queue workers") + + startAllWorkers := runMode == prover.Forester || runMode == prover.ForesterTest + + var workersStarted []string + + // Start update worker for batch-update circuits or forester modes + if startAllWorkers || containsCircuit(circuits, "update") || containsCircuit(circuits, "update-test") { + updateWorker := server.NewUpdateQueueWorker(redisQueue, psv1, psv2) + workers = append(workers, updateWorker) + go updateWorker.Start() + workersStarted = append(workersStarted, "update") } - logging.Logger().Info().Int("workers", numWorkers).Msg("Starting queue workers") + // Start append worker for batch-append circuits or forester modes + if startAllWorkers || containsCircuit(circuits, "append-with-proofs") || containsCircuit(circuits, "append-with-proofs-test") { + appendWorker := server.NewAppendQueueWorker(redisQueue, psv1, psv2) + workers = append(workers, appendWorker) + go appendWorker.Start() + workersStarted = append(workersStarted, "append") + } - for i := 0; i < numWorkers; i++ { - worker := server.NewQueueWorker(i+1, redisQueue, psv1, psv2) - workers = append(workers, worker) - go worker.Start() + // Start address append worker for address-append circuits or forester modes + if startAllWorkers || containsCircuit(circuits, "address-append") || containsCircuit(circuits, "address-append-test") { + addressAppendWorker := server.NewAddressAppendQueueWorker(redisQueue, psv1, psv2) + workers = append(workers, addressAppendWorker) + go addressAppendWorker.Start() + workersStarted = append(workersStarted, "address-append") + } + + if len(workersStarted) == 0 { + logging.Logger().Warn().Msg("No queue workers started - no matching circuits found") + } else { + logging.Logger().Info(). + Strs("workers_started", workersStarted). + Bool("forester_mode", startAllWorkers). + Msg("Queue workers started") } } @@ -931,17 +951,68 @@ func debugProvingSystemKeys(keysDirPath string, runMode prover.RunMode, circuits } } -func startResultCleanup(redisQueue *server.RedisQueue) { +func startCleanupRoutines(redisQueue *server.RedisQueue) { + logging.Logger().Info().Msg("Running immediate cleanup on startup") + + if err := redisQueue.CleanupOldRequests(); err != nil { + logging.Logger().Error(). + Err(err). + Msg("Failed to cleanup old proof requests on startup") + } else { + logging.Logger().Info().Msg("Startup cleanup of old proof requests completed") + } + + if err := redisQueue.CleanupOldResults(); err != nil { + logging.Logger().Error(). + Err(err). 
+ Msg("Failed to cleanup old results on startup") + } else { + logging.Logger().Info().Msg("Startup cleanup of old results completed") + } + + // Start cleanup for old proof requests (every 10 minutes) + go func() { + requestTicker := time.NewTicker(10 * time.Minute) + defer requestTicker.Stop() + + logging.Logger().Info().Msg("Started old proof requests cleanup routine (every 10 minutes)") + + for range requestTicker.C { + if err := redisQueue.CleanupOldRequests(); err != nil { + logging.Logger().Error(). + Err(err). + Msg("Failed to cleanup old proof requests") + } else { + logging.Logger().Debug().Msg("Old proof requests cleanup completed") + } + } + }() + + // Start less frequent cleanup for old results (every 1 hour) go func() { - ticker := time.NewTicker(1 * time.Hour) - defer ticker.Stop() + resultTicker := time.NewTicker(1 * time.Hour) + defer resultTicker.Stop() + + logging.Logger().Info().Msg("Started old results cleanup routine (every 1 hour)") - for range ticker.C { + for range resultTicker.C { if err := redisQueue.CleanupOldResults(); err != nil { logging.Logger().Error(). Err(err). Msg("Failed to cleanup old results") + } else { + logging.Logger().Debug().Msg("Old results cleanup completed") } } }() } + +// containsCircuit checks if the circuits slice contains the specified circuit +func containsCircuit(circuits []string, circuit string) bool { + for _, c := range circuits { + if c == circuit { + return true + } + } + return false +} diff --git a/prover/server/redis_queue_test.go b/prover/server/redis_queue_test.go new file mode 100644 index 0000000000..23a866d284 --- /dev/null +++ b/prover/server/redis_queue_test.go @@ -0,0 +1,1094 @@ +package main_test + +import ( + "context" + "encoding/json" + "fmt" + "io" + "light/light-prover/prover" + "light/light-prover/server" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/google/uuid" +) + +const TestRedisURL = "redis://localhost:6379/15" + +func setupRedisQueue(t *testing.T) *server.RedisQueue { + // Skip if Redis URL not available + redisURL := os.Getenv("TEST_REDIS_URL") + if redisURL == "" { + redisURL = TestRedisURL + } + + rq, err := server.NewRedisQueue(redisURL) + if err != nil { + t.Skipf("Redis not available for testing: %v", err) + } + + err = rq.Client.FlushDB(context.Background()).Err() + if err != nil { + t.Fatalf("Failed to flush Redis DB: %v", err) + } + + return rq +} + +func teardownRedisQueue(t *testing.T, rq *server.RedisQueue) { + if rq != nil { + rq.Client.FlushDB(context.Background()).Err() + rq.Client.Close() + } +} + +func TestPeriodicCleanupFunctionality(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + // Create a mix of old and recent jobs across multiple queues + now := time.Now() + oldTime := now.Add(-35 * time.Minute) // 35 minutes ago (should be removed) + recentTime := now.Add(-20 * time.Minute) // 20 minutes ago (should stay) + + // Create test jobs for all input queues + testJobs := []struct { + queueName string + job *server.ProofJob + shouldRemove bool + }{ + { + queueName: "zk_update_queue", + job: &server.ProofJob{ + ID: uuid.New().String(), + Type: "zk_proof", + Payload: json.RawMessage(`{"height": 32, "batch_size": 10}`), + CreatedAt: oldTime, + }, + shouldRemove: true, + }, + { + queueName: "zk_update_queue", + job: &server.ProofJob{ + ID: uuid.New().String(), + Type: "zk_proof", + Payload: json.RawMessage(`{"height": 32, "batch_size": 10}`), + CreatedAt: recentTime, + }, + shouldRemove: false, + }, + { + queueName: "zk_append_queue", + 
job: &server.ProofJob{ + ID: uuid.New().String(), + Type: "zk_proof", + Payload: json.RawMessage(`{"height": 32, "batch_size": 10}`), + CreatedAt: oldTime, + }, + shouldRemove: true, + }, + { + queueName: "zk_address_append_queue", + job: &server.ProofJob{ + ID: uuid.New().String(), + Type: "zk_proof", + Payload: json.RawMessage(`{"tree_height": 40, "batch_size": 10}`), + CreatedAt: recentTime, + }, + shouldRemove: false, + }, + } + + // Enqueue all test jobs + for _, testJob := range testJobs { + err := rq.EnqueueProof(testJob.queueName, testJob.job) + if err != nil { + t.Fatalf("Failed to enqueue test job to %s: %v", testJob.queueName, err) + } + } + + // Verify initial state + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get initial queue stats: %v", err) + } + + expectedInitial := map[string]int64{ + "zk_update_queue": 2, + "zk_append_queue": 1, + "zk_address_append_queue": 1, + } + + for queue, expected := range expectedInitial { + if stats[queue] != expected { + t.Errorf("Expected %s to have %d jobs initially, got %d", queue, expected, stats[queue]) + } + } + + // Run cleanup + err = rq.CleanupOldRequests() + if err != nil { + t.Errorf("CleanupOldRequests failed: %v", err) + } + + // Verify cleanup results + stats, err = rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats after cleanup: %v", err) + } + + // Count expected remaining jobs + expectedAfter := map[string]int64{ + "zk_update_queue": 1, // 1 recent job should remain + "zk_append_queue": 0, // 1 old job should be removed + "zk_address_append_queue": 1, // 1 recent job should remain + } + + for queue, expected := range expectedAfter { + if stats[queue] != expected { + t.Errorf("Expected %s to have %d jobs after cleanup, got %d", queue, expected, stats[queue]) + } + } + + // Verify we can still dequeue the remaining jobs + remainingUpdate, err := rq.DequeueProof("zk_update_queue", 1*time.Second) + if err != nil { + t.Errorf("Failed to dequeue remaining update job: %v", err) + } + if remainingUpdate == nil { + t.Errorf("Expected to find remaining update job") + } + + remainingAddress, err := rq.DequeueProof("zk_address_append_queue", 1*time.Second) + if err != nil { + t.Errorf("Failed to dequeue remaining address append job: %v", err) + } + if remainingAddress == nil { + t.Errorf("Expected to find remaining address append job") + } + + // Verify append queue is empty (old job was cleaned up) + emptyAppend, err := rq.DequeueProof("zk_append_queue", 500*time.Millisecond) + if err != nil { + t.Errorf("Failed to check empty append queue: %v", err) + } + if emptyAppend != nil { + t.Errorf("Expected append queue to be empty after cleanup, but found job: %v", emptyAppend) + } +} + +func TestCleanupOldProofRequests(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + // Create jobs with different ages + now := time.Now() + oldTime := now.Add(-45 * time.Minute) // 45 minutes ago (should be removed) + recentTime := now.Add(-15 * time.Minute) // 15 minutes ago (should stay) + + // Create old jobs (should be removed) + oldUpdateJob := &server.ProofJob{ + ID: uuid.New().String(), + Type: "zk_proof", + Payload: json.RawMessage(`{"height": 32, "batch_size": 10}`), + CreatedAt: oldTime, + } + + oldAppendJob := &server.ProofJob{ + ID: uuid.New().String(), + Type: "zk_proof", + Payload: json.RawMessage(`{"height": 32, "batch_size": 10}`), + CreatedAt: oldTime, + } + + // Create recent jobs (should stay) + recentUpdateJob := &server.ProofJob{ + ID: uuid.New().String(), 
+ Type: "zk_proof", + Payload: json.RawMessage(`{"height": 32, "batch_size": 10}`), + CreatedAt: recentTime, + } + + recentAppendJob := &server.ProofJob{ + ID: uuid.New().String(), + Type: "zk_proof", + Payload: json.RawMessage(`{"height": 32, "batch_size": 10}`), + CreatedAt: recentTime, + } + + // Enqueue all jobs + err := rq.EnqueueProof("zk_update_queue", oldUpdateJob) + if err != nil { + t.Fatalf("Failed to enqueue old update job: %v", err) + } + + err = rq.EnqueueProof("zk_append_queue", oldAppendJob) + if err != nil { + t.Fatalf("Failed to enqueue old append job: %v", err) + } + + err = rq.EnqueueProof("zk_update_queue", recentUpdateJob) + if err != nil { + t.Fatalf("Failed to enqueue recent update job: %v", err) + } + + err = rq.EnqueueProof("zk_append_queue", recentAppendJob) + if err != nil { + t.Fatalf("Failed to enqueue recent append job: %v", err) + } + + // Verify initial state + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get initial queue stats: %v", err) + } + if stats["zk_update_queue"] != 2 { + t.Errorf("Expected zk_update_queue to have 2 jobs initially, got %d", stats["zk_update_queue"]) + } + if stats["zk_append_queue"] != 2 { + t.Errorf("Expected zk_append_queue to have 2 jobs initially, got %d", stats["zk_append_queue"]) + } + + // Run cleanup + err = rq.CleanupOldRequests() + if err != nil { + t.Errorf("CleanupOldRequests failed: %v", err) + } + + // Verify cleanup results + stats, err = rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats after cleanup: %v", err) + } + + // Should have 1 job remaining in each queue (the recent ones) + if stats["zk_update_queue"] != 1 { + t.Errorf("Expected zk_update_queue to have 1 job after cleanup, got %d", stats["zk_update_queue"]) + } + if stats["zk_append_queue"] != 1 { + t.Errorf("Expected zk_append_queue to have 1 job after cleanup, got %d", stats["zk_append_queue"]) + } + + // Verify the remaining jobs are the recent ones by checking they can be dequeued + dequeuedUpdate, err := rq.DequeueProof("zk_update_queue", 1*time.Second) + if err != nil { + t.Errorf("Failed to dequeue remaining update job: %v", err) + } + if dequeuedUpdate == nil { + t.Errorf("Expected to find remaining update job") + } else if dequeuedUpdate.ID != recentUpdateJob.ID { + t.Errorf("Expected remaining job to be recent job, got ID %s instead of %s", dequeuedUpdate.ID, recentUpdateJob.ID) + } + + dequeuedAppend, err := rq.DequeueProof("zk_append_queue", 1*time.Second) + if err != nil { + t.Errorf("Failed to dequeue remaining append job: %v", err) + } + if dequeuedAppend == nil { + t.Errorf("Expected to find remaining append job") + } else if dequeuedAppend.ID != recentAppendJob.ID { + t.Errorf("Expected remaining job to be recent job, got ID %s instead of %s", dequeuedAppend.ID, recentAppendJob.ID) + } +} + +func createTestJob(jobID, circuitType string) *server.ProofJob { + var payload json.RawMessage + + switch circuitType { + case "batch-update": + payload = json.RawMessage(`{"height": 32, "batch_size": 10, "old_root": "0", "new_root": "1", "leaves": []}`) + case "batch-append-with-proofs": + payload = json.RawMessage(`{"height": 32, "batch_size": 10, "old_root": "0", "new_root": "1", "leaves": [], "merkle_proofs": []}`) + case "batch-address-append": + payload = json.RawMessage(`{"tree_height": 40, "batch_size": 10, "old_root": "0", "new_root": "1", "addresses": []}`) + default: + payload = json.RawMessage(`{"state_merkle_tree_root": "0", "state_merkle_tree_next_index": 0}`) + } + + return 
&server.ProofJob{ + ID: jobID, + Type: "zk_proof", + Payload: payload, + CreatedAt: time.Now(), + } +} + +func TestRedisQueueConnection(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + err := rq.Client.Ping(context.Background()).Err() + if err != nil { + t.Errorf("Redis ping failed: %v", err) + } +} + +func TestQueueStats(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + + expectedQueues := []string{ + "zk_update_queue", + "zk_append_queue", + "zk_address_append_queue", + "zk_update_processing_queue", + "zk_append_processing_queue", + "zk_address_append_processing_queue", + "zk_failed_queue", + "zk_results_queue", + } + + for _, queue := range expectedQueues { + if _, exists := stats[queue]; !exists { + t.Errorf("Expected queue %s not found in stats", queue) + } + if stats[queue] != int64(0) { + t.Errorf("Expected queue %s to be empty, got %d", queue, stats[queue]) + } + } +} + +func TestEnqueueToUpdateQueue(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + job := createTestJob("test-update-1", "batch-update") + + err := rq.EnqueueProof("zk_update_queue", job) + if err != nil { + t.Errorf("Failed to enqueue proof: %v", err) + } + + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + if stats["zk_update_queue"] != int64(1) { + t.Errorf("Expected zk_update_queue to have 1 job, got %d", stats["zk_update_queue"]) + } +} + +func TestEnqueueToAppendQueue(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + job := createTestJob("test-append-1", "batch-append-with-proofs") + + err := rq.EnqueueProof("zk_append_queue", job) + if err != nil { + t.Errorf("Failed to enqueue proof: %v", err) + } + + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + if stats["zk_append_queue"] != int64(1) { + t.Errorf("Expected zk_append_queue to have 1 job, got %d", stats["zk_append_queue"]) + } +} + +func TestEnqueueToAddressAppendQueue(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + job := createTestJob("test-address-append-1", "batch-address-append") + + err := rq.EnqueueProof("zk_address_append_queue", job) + if err != nil { + t.Errorf("Failed to enqueue proof: %v", err) + } + + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + if stats["zk_address_append_queue"] != int64(1) { + t.Errorf("Expected zk_address_append_queue to have 1 job, got %d", stats["zk_address_append_queue"]) + } +} + +func TestDequeueFromUpdateQueue(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + originalJob := createTestJob("test-dequeue-update", "batch-update") + + err := rq.EnqueueProof("zk_update_queue", originalJob) + if err != nil { + t.Fatalf("Failed to enqueue proof: %v", err) + } + + dequeuedJob, err := rq.DequeueProof("zk_update_queue", 1*time.Second) + if err != nil { + t.Errorf("Failed to dequeue proof: %v", err) + } + if dequeuedJob == nil { + t.Errorf("Expected to dequeue a job, got nil") + } + if dequeuedJob.ID != originalJob.ID { + t.Errorf("Expected job ID %s, got %s", originalJob.ID, dequeuedJob.ID) + } + if dequeuedJob.Type != originalJob.Type { + t.Errorf("Expected job type %s, got %s", originalJob.Type, dequeuedJob.Type) + } +} + +func TestDequeueFromAppendQueue(t 
*testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + originalJob := createTestJob("test-dequeue-append", "batch-append-with-proofs") + + err := rq.EnqueueProof("zk_append_queue", originalJob) + if err != nil { + t.Fatalf("Failed to enqueue proof: %v", err) + } + + dequeuedJob, err := rq.DequeueProof("zk_append_queue", 1*time.Second) + if err != nil { + t.Errorf("Failed to dequeue proof: %v", err) + } + if dequeuedJob == nil { + t.Errorf("Expected to dequeue a job, got nil") + } + if dequeuedJob.ID != originalJob.ID { + t.Errorf("Expected job ID %s, got %s", originalJob.ID, dequeuedJob.ID) + } +} + +func TestDequeueFromAddressAppendQueue(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + originalJob := createTestJob("test-dequeue-address-append", "batch-address-append") + + err := rq.EnqueueProof("zk_address_append_queue", originalJob) + if err != nil { + t.Fatalf("Failed to enqueue proof: %v", err) + } + + dequeuedJob, err := rq.DequeueProof("zk_address_append_queue", 1*time.Second) + if err != nil { + t.Errorf("Failed to dequeue proof: %v", err) + } + if dequeuedJob == nil { + t.Errorf("Expected to dequeue a job, got nil") + } + if dequeuedJob.ID != originalJob.ID { + t.Errorf("Expected job ID %s, got %s", originalJob.ID, dequeuedJob.ID) + } +} + +func TestDequeueTimeout(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + start := time.Now() + job, err := rq.DequeueProof("zk_update_queue", 500*time.Millisecond) + duration := time.Since(start) + + if err != nil { + t.Errorf("Dequeue failed: %v", err) + } + if job != nil { + t.Errorf("Expected nil job from empty queue, got %v", job) + } + if duration < 400*time.Millisecond { + t.Errorf("Timeout duration too short: %v", duration) + } + if duration > 1*time.Second { + t.Errorf("Timeout duration too long: %v", duration) + } +} + +func TestQueueNameForCircuitType(t *testing.T) { + tests := []struct { + circuitType string + expectedQueue string + }{ + {string(prover.BatchUpdateCircuitType), "zk_update_queue"}, + {string(prover.BatchAppendWithProofsCircuitType), "zk_append_queue"}, + {string(prover.BatchAddressAppendCircuitType), "zk_address_append_queue"}, + {string(prover.InclusionCircuitType), "zk_update_queue"}, // Default to update queue + {string(prover.NonInclusionCircuitType), "zk_update_queue"}, // Default to update queue + {string(prover.CombinedCircuitType), "zk_update_queue"}, // Default to update queue + } + + for _, test := range tests { + t.Run(fmt.Sprintf("CircuitType_%s", test.circuitType), func(t *testing.T) { + var circuitType prover.CircuitType + switch test.circuitType { + case string(prover.BatchUpdateCircuitType): + circuitType = prover.BatchUpdateCircuitType + case string(prover.BatchAppendWithProofsCircuitType): + circuitType = prover.BatchAppendWithProofsCircuitType + case string(prover.BatchAddressAppendCircuitType): + circuitType = prover.BatchAddressAppendCircuitType + case string(prover.InclusionCircuitType): + circuitType = prover.InclusionCircuitType + case string(prover.NonInclusionCircuitType): + circuitType = prover.NonInclusionCircuitType + case string(prover.CombinedCircuitType): + circuitType = prover.CombinedCircuitType + } + + queueName := server.GetQueueNameForCircuit(circuitType) + if queueName != test.expectedQueue { + t.Errorf("Expected queue %s for circuit type %s, got %s", test.expectedQueue, test.circuitType, queueName) + } + }) + } +} + +func TestMultipleJobsInDifferentQueues(t *testing.T) { + rq := 
setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + updateJob := createTestJob("update-job", "batch-update") + appendJob := createTestJob("append-job", "batch-append-with-proofs") + addressAppendJob := createTestJob("address-append-job", "batch-address-append") + + err := rq.EnqueueProof("zk_update_queue", updateJob) + if err != nil { + t.Fatalf("Failed to enqueue update job: %v", err) + } + + err = rq.EnqueueProof("zk_append_queue", appendJob) + if err != nil { + t.Fatalf("Failed to enqueue append job: %v", err) + } + + err = rq.EnqueueProof("zk_address_append_queue", addressAppendJob) + if err != nil { + t.Fatalf("Failed to enqueue address append job: %v", err) + } + + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + + if stats["zk_update_queue"] != int64(1) { + t.Errorf("Expected zk_update_queue to have 1 job, got %d", stats["zk_update_queue"]) + } + if stats["zk_append_queue"] != int64(1) { + t.Errorf("Expected zk_append_queue to have 1 job, got %d", stats["zk_append_queue"]) + } + if stats["zk_address_append_queue"] != int64(1) { + t.Errorf("Expected zk_address_append_queue to have 1 job, got %d", stats["zk_address_append_queue"]) + } + + dequeuedUpdate, err := rq.DequeueProof("zk_update_queue", 1*time.Second) + if err != nil { + t.Fatalf("Failed to dequeue from update queue: %v", err) + } + if dequeuedUpdate.ID != updateJob.ID { + t.Errorf("Expected update job ID %s, got %s", updateJob.ID, dequeuedUpdate.ID) + } + + dequeuedAppend, err := rq.DequeueProof("zk_append_queue", 1*time.Second) + if err != nil { + t.Fatalf("Failed to dequeue from append queue: %v", err) + } + if dequeuedAppend.ID != appendJob.ID { + t.Errorf("Expected append job ID %s, got %s", appendJob.ID, dequeuedAppend.ID) + } + + dequeuedAddressAppend, err := rq.DequeueProof("zk_address_append_queue", 1*time.Second) + if err != nil { + t.Fatalf("Failed to dequeue from address append queue: %v", err) + } + if dequeuedAddressAppend.ID != addressAppendJob.ID { + t.Errorf("Expected address append job ID %s, got %s", addressAppendJob.ID, dequeuedAddressAppend.ID) + } +} + +func TestJobResultStorage(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + jobID := "test-result-job" + + mockResult := map[string]interface{}{ + "proof": "mock-proof-data", + "status": "completed", + } + + err := rq.StoreResult(jobID, mockResult) + if err != nil { + t.Errorf("Failed to store result: %v", err) + } + + result, err := rq.GetResult(jobID) + if err != nil { + t.Errorf("Failed to retrieve result: %v", err) + } + if result == nil { + t.Errorf("Expected result, got nil") + } + + if _, ok := result.(map[string]interface{}); !ok { + t.Errorf("Expected result to be map[string]interface{}, got %T", result) + } +} + +func TestResultCleanup(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + for i := 0; i < 1005; i++ { + job := &server.ProofJob{ + ID: fmt.Sprintf("cleanup-job-%d", i), + Type: "result", + Payload: json.RawMessage(`{"test": "data"}`), + CreatedAt: time.Now(), + } + err := rq.EnqueueProof("zk_results_queue", job) + if err != nil { + t.Fatalf("Failed to enqueue cleanup job %d: %v", i, err) + } + } + + err := rq.CleanupOldResults() + if err != nil { + t.Errorf("Failed to cleanup old results: %v", err) + } + + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + if stats["zk_results_queue"] != int64(1000) { + t.Errorf("Expected results queue to have 1000 jobs after 
cleanup, got %d", stats["zk_results_queue"]) + } +} + +func TestWorkerCreation(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + var psv1 []*prover.ProvingSystemV1 + var psv2 []*prover.ProvingSystemV2 + + updateWorker := server.NewUpdateQueueWorker(rq, psv1, psv2) + if updateWorker == nil { + t.Errorf("Expected update worker to be created, got nil") + } + + appendWorker := server.NewAppendQueueWorker(rq, psv1, psv2) + if appendWorker == nil { + t.Errorf("Expected append worker to be created, got nil") + } + + addressAppendWorker := server.NewAddressAppendQueueWorker(rq, psv1, psv2) + if addressAppendWorker == nil { + t.Errorf("Expected address append worker to be created, got nil") + } + + var _ server.QueueWorker = updateWorker + var _ server.QueueWorker = appendWorker + var _ server.QueueWorker = addressAppendWorker +} + +func TestJobProcessingFlow(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + jobID := "test-processing-flow" + job := createTestJob(jobID, "batch-update") + + err := rq.EnqueueProof("zk_update_queue", job) + if err != nil { + t.Fatalf("Failed to enqueue job: %v", err) + } + + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + if stats["zk_update_queue"] != int64(1) { + t.Errorf("Expected zk_update_queue to have 1 job, got %d", stats["zk_update_queue"]) + } + + dequeuedJob, err := rq.DequeueProof("zk_update_queue", 1*time.Second) + if err != nil { + t.Fatalf("Failed to dequeue job: %v", err) + } + if dequeuedJob.ID != jobID { + t.Errorf("Expected job ID %s, got %s", jobID, dequeuedJob.ID) + } + + processingJob := &server.ProofJob{ + ID: jobID + "_processing", + Type: "processing", + Payload: job.Payload, + CreatedAt: time.Now(), + } + err = rq.EnqueueProof("zk_update_processing_queue", processingJob) + if err != nil { + t.Fatalf("Failed to enqueue processing job: %v", err) + } + + stats, err = rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + if stats["zk_update_processing_queue"] != int64(1) { + t.Errorf("Expected zk_update_processing_queue to have 1 job, got %d", stats["zk_update_processing_queue"]) + } + + resultJob := &server.ProofJob{ + ID: jobID, + Type: "result", + Payload: json.RawMessage(`{"proof": "completed", "public_inputs": []}`), + CreatedAt: time.Now(), + } + err = rq.EnqueueProof("zk_results_queue", resultJob) + if err != nil { + t.Fatalf("Failed to enqueue result job: %v", err) + } + + stats, err = rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + if stats["zk_results_queue"] != int64(1) { + t.Errorf("Expected zk_results_queue to have 1 job, got %d", stats["zk_results_queue"]) + } +} + +func TestFailedJobStatusDetails(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + jobID := uuid.New().String() + + originalJob := createTestJob(jobID, "batch-update") + errorMessage := "Proof generation failed: Invalid merkle tree state" + + failureDetails := map[string]interface{}{ + "original_job": originalJob, + "error": errorMessage, + "failed_at": time.Now(), + } + + failedData, err := json.Marshal(failureDetails) + if err != nil { + t.Fatalf("Failed to marshal failure details: %v", err) + } + + failedJob := &server.ProofJob{ + ID: jobID + "_failed", + Type: "failed", + Payload: json.RawMessage(failedData), + CreatedAt: time.Now(), + } + + err = rq.EnqueueProof("zk_failed_queue", failedJob) + if err != nil { + t.Fatalf("Failed to enqueue 
failed job: %v", err) + } + + stats, err := rq.GetQueueStats() + if err != nil { + t.Fatalf("Failed to get queue stats: %v", err) + } + if stats["zk_failed_queue"] != int64(1) { + t.Errorf("Expected zk_failed_queue to have 1 job, got %d", stats["zk_failed_queue"]) + } + + items, err := rq.Client.LRange(rq.Ctx, "zk_failed_queue", 0, -1).Result() + if err != nil { + t.Fatalf("Failed to get failed queue items: %v", err) + } + + if len(items) != 1 { + t.Fatalf("Expected 1 item in failed queue, got %d", len(items)) + } + + var retrievedJob server.ProofJob + err = json.Unmarshal([]byte(items[0]), &retrievedJob) + if err != nil { + t.Fatalf("Failed to unmarshal failed job: %v", err) + } + + var parsedFailureDetails map[string]interface{} + err = json.Unmarshal(retrievedJob.Payload, &parsedFailureDetails) + if err != nil { + t.Fatalf("Failed to parse failure details: %v", err) + } + + if retrievedError, ok := parsedFailureDetails["error"].(string); !ok { + t.Errorf("Expected error field in failure details") + } else if retrievedError != errorMessage { + t.Errorf("Expected error message '%s', got '%s'", errorMessage, retrievedError) + } + + if _, ok := parsedFailureDetails["failed_at"]; !ok { + t.Errorf("Expected failed_at field in failure details") + } + + if _, ok := parsedFailureDetails["original_job"]; !ok { + t.Errorf("Expected original_job field in failure details") + } +} + +func TestFailedJobStatusHTTPEndpoint(t *testing.T) { + rq := setupRedisQueue(t) + defer teardownRedisQueue(t, rq) + + var psv1 []*prover.ProvingSystemV1 + var psv2 []*prover.ProvingSystemV2 + + config := &server.EnhancedConfig{ + ProverAddress: "localhost:8082", + MetricsAddress: "localhost:9997", + Queue: &server.QueueConfig{ + RedisURL: TestRedisURL, + Enabled: true, + }, + } + + serverJob := server.RunEnhanced(config, rq, []string{}, prover.FullTest, psv1, psv2) + defer serverJob.RequestStop() + + time.Sleep(100 * time.Millisecond) + + jobID := uuid.New().String() + errorMessage := "HTTP Test: Proof generation failed due to invalid input parameters" + + originalJob := createTestJob(jobID, "batch-update") + + failureDetails := map[string]interface{}{ + "original_job": originalJob, + "error": errorMessage, + "failed_at": time.Now().Format(time.RFC3339), + } + + failedData, err := json.Marshal(failureDetails) + if err != nil { + t.Fatalf("Failed to marshal failure details: %v", err) + } + + failedJob := &server.ProofJob{ + ID: jobID + "_failed", + Type: "failed", + Payload: json.RawMessage(failedData), + CreatedAt: time.Now(), + } + + err = rq.EnqueueProof("zk_failed_queue", failedJob) + if err != nil { + t.Fatalf("Failed to enqueue failed job: %v", err) + } + + statusURL := fmt.Sprintf("http://%s/prove/status?job_id=%s", config.ProverAddress, jobID) + resp, err := http.Get(statusURL) + if err != nil { + t.Fatalf("Failed to make HTTP request: %v", err) + } + defer resp.Body.Close() + + // Read response body + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %v", err) + } + + var statusResponse map[string]interface{} + err = json.Unmarshal(body, &statusResponse) + if err != nil { + t.Fatalf("Failed to parse JSON response: %v", err) + } + + if status, ok := statusResponse["status"].(string); !ok || status != "failed" { + t.Errorf("Expected status 'failed', got %v", statusResponse["status"]) + } + + if message, ok := statusResponse["message"].(string); !ok { + t.Errorf("Expected message field in response") + } else if !contains(message, errorMessage) { + t.Errorf("Expected 
message to contain '%s', got '%s'", errorMessage, message) + } + + if errorField, ok := statusResponse["error"].(string); !ok { + t.Errorf("Expected error field in response") + } else if errorField != errorMessage { + t.Errorf("Expected error field to be '%s', got '%s'", errorMessage, errorField) + } + + if _, ok := statusResponse["failed_at"]; !ok { + t.Errorf("Expected failed_at field in response") + } + + if jobIDField, ok := statusResponse["job_id"].(string); !ok || jobIDField != jobID { + t.Errorf("Expected job_id to be '%s', got %v", jobID, statusResponse["job_id"]) + } +} + +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && (s[:len(substr)] == substr || s[len(s)-len(substr):] == substr || strings.Contains(s, substr))) +} + +func TestWorkerSelectionLogic(t *testing.T) { + circuits := []string{"update", "append-with-proofs", "inclusion"} + + if !containsCircuit(circuits, "update") { + t.Errorf("Expected circuits to contain 'update'") + } + + if !containsCircuit(circuits, "append-with-proofs") { + t.Errorf("Expected circuits to contain 'append-with-proofs'") + } + + if !containsCircuit(circuits, "inclusion") { + t.Errorf("Expected circuits to contain 'inclusion'") + } + + if containsCircuit(circuits, "address-append") { + t.Errorf("Expected circuits to NOT contain 'address-append'") + } + + if containsCircuit(circuits, "non-existent") { + t.Errorf("Expected circuits to NOT contain 'non-existent'") + } + + emptyCircuits := []string{} + if containsCircuit(emptyCircuits, "update") { + t.Errorf("Expected empty circuits to NOT contain 'update'") + } + + testCases := []struct { + name string + circuits []string + expectUpdate bool + expectAppend bool + expectAddress bool + }{ + { + name: "Update only", + circuits: []string{"update"}, + expectUpdate: true, + expectAppend: false, + expectAddress: false, + }, + { + name: "Append only", + circuits: []string{"append-with-proofs"}, + expectUpdate: false, + expectAppend: true, + expectAddress: false, + }, + { + name: "Address append only", + circuits: []string{"address-append"}, + expectUpdate: false, + expectAppend: false, + expectAddress: true, + }, + { + name: "Multiple circuits", + circuits: []string{"update", "append-with-proofs"}, + expectUpdate: true, + expectAppend: true, + expectAddress: false, + }, + { + name: "All batch circuits", + circuits: []string{"update", "append-with-proofs", "address-append"}, + expectUpdate: true, + expectAppend: true, + expectAddress: true, + }, + { + name: "Test circuits", + circuits: []string{"update-test", "append-with-proofs-test", "address-append-test"}, + expectUpdate: true, + expectAppend: true, + expectAddress: true, + }, + { + name: "Non-batch circuits only", + circuits: []string{"inclusion", "non-inclusion"}, + expectUpdate: false, + expectAppend: false, + expectAddress: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + shouldStartUpdate := containsCircuit(tc.circuits, "update") || containsCircuit(tc.circuits, "update-test") + shouldStartAppend := containsCircuit(tc.circuits, "append-with-proofs") || containsCircuit(tc.circuits, "append-with-proofs-test") + shouldStartAddress := containsCircuit(tc.circuits, "address-append") || containsCircuit(tc.circuits, "address-append-test") + + if shouldStartUpdate != tc.expectUpdate { + t.Errorf("Expected update worker: %v, got: %v", tc.expectUpdate, shouldStartUpdate) + } + + if shouldStartAppend != tc.expectAppend { + t.Errorf("Expected append worker: %v, 
got: %v", tc.expectAppend, shouldStartAppend) + } + + if shouldStartAddress != tc.expectAddress { + t.Errorf("Expected address append worker: %v, got: %v", tc.expectAddress, shouldStartAddress) + } + }) + } +} + +func containsCircuit(circuits []string, circuit string) bool { + for _, c := range circuits { + if c == circuit { + return true + } + } + return false +} + +func TestBatchOperationsAlwaysUseQueue(t *testing.T) { + batchTests := []struct { + circuitType prover.CircuitType + expectedQueue string + }{ + {prover.BatchUpdateCircuitType, "zk_update_queue"}, + {prover.BatchAppendWithProofsCircuitType, "zk_append_queue"}, + {prover.BatchAddressAppendCircuitType, "zk_address_append_queue"}, + } + + for _, test := range batchTests { + t.Run(fmt.Sprintf("BatchOperation_%s", string(test.circuitType)), func(t *testing.T) { + queueName := server.GetQueueNameForCircuit(test.circuitType) + if queueName != test.expectedQueue { + t.Errorf("Expected circuit type %s to route to %s, got %s", + string(test.circuitType), test.expectedQueue, queueName) + } + }) + } + + nonBatchTests := []prover.CircuitType{ + prover.InclusionCircuitType, + prover.NonInclusionCircuitType, + prover.CombinedCircuitType, + } + + for _, circuitType := range nonBatchTests { + t.Run(fmt.Sprintf("NonBatchOperation_%s", string(circuitType)), func(t *testing.T) { + queueName := server.GetQueueNameForCircuit(circuitType) + expectedQueue := "zk_update_queue" + if queueName != expectedQueue { + t.Errorf("Expected circuit type %s to route to %s, got %s", + string(circuitType), expectedQueue, queueName) + } + }) + } +} diff --git a/prover/server/scripts/download_keys_docker.sh b/prover/server/scripts/download_keys_docker.sh new file mode 100755 index 0000000000..ec3eff1c62 --- /dev/null +++ b/prover/server/scripts/download_keys_docker.sh @@ -0,0 +1,110 @@ +#!/usr/bin/env bash + +set -e + +ROOT_DIR="$(git rev-parse --show-toplevel)" +KEYS_DIR="${ROOT_DIR}/prover/server/proving-keys" +BASE_URL="https://light.fra1.digitaloceanspaces.com/proving-keys-06-03-25" +CHECKSUM_URL="${BASE_URL}/CHECKSUM" +MAX_RETRIES=3 +RETRY_DELAY=5 + +mkdir -p "$KEYS_DIR" + +download_file() { + local url="$1" + local output="$2" + local attempt=1 + + while [ $attempt -le $MAX_RETRIES ]; do + echo "Downloading $url (attempt $attempt/$MAX_RETRIES)" + if curl -L \ + --fail \ + -H "Accept: */*" \ + -H "Accept-Encoding: identity" \ + -A "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" \ + --connect-timeout 30 \ + --max-time 300 \ + --output "$output" \ + "$url"; then + return 0 + fi + + echo "Download failed. Retrying in $RETRY_DELAY seconds..." + rm -f "$output" + attempt=$((attempt + 1)) + [ $attempt -le $MAX_RETRIES ] && sleep $RETRY_DELAY + done + return 1 +} + +verify_checksum() { + local file="$1" + local checksum_file="$2" + local expected + local actual + + if command -v sha256sum >/dev/null 2>&1; then + CHECKSUM_CMD="sha256sum" + else + CHECKSUM_CMD="shasum -a 256" + fi + + expected=$(grep "${file##*/}" "$checksum_file" | cut -d' ' -f1) + actual=$($CHECKSUM_CMD "$file" | cut -d' ' -f1) + + echo "Expected checksum: $expected" + echo "Actual checksum: $actual" + + [ "$expected" = "$actual" ] +} + +CHECKSUM_FILE="${KEYS_DIR}/CHECKSUM" +if ! 
download_file "$CHECKSUM_URL" "$CHECKSUM_FILE"; then + echo "Failed to download checksum file" + exit 1 +fi + +echo "Content of CHECKSUM file:" +cat "$CHECKSUM_FILE" + +SUFFIXES=( + "inclusion_32:1 2 3 4 8" + "mainnet_inclusion_26:1 2 3 4 8" + "non-inclusion_26:1 2" + "non-inclusion_40:1 2 3 4 8" + "combined_26:1_1 1_2 2_1 2_2 3_1 3_2 4_1 4_2" + "combined_32_40:1_1 1_2 1_3 1_4 2_1 2_2 2_3 2_4 3_1 3_2 3_3 3_4 4_1 4_2 4_3 4_4" +) + +for group in "${SUFFIXES[@]}"; do + base=${group%:*} + suffixes=${group#*:} + + for suffix in $suffixes; do + for ext in key vkey; do + file="${base}_${suffix}.${ext}" + output="${KEYS_DIR}/${file}" + + if [ -f "$output" ] && verify_checksum "$output" "$CHECKSUM_FILE"; then + echo "Skipping $file (already downloaded and verified)" + continue + fi + + if download_file "${BASE_URL}/${file}" "$output"; then + echo "Verifying checksum for $file..." + if ! verify_checksum "$output" "$CHECKSUM_FILE"; then + echo "Checksum verification failed for $file" + rm -f "$output" + exit 1 + fi + echo "Successfully downloaded and verified $file" + else + echo "Failed to download $file" + exit 1 + fi + done + done +done + +echo "Docker-specific proving keys downloaded and verified successfully" diff --git a/prover/server/server/queue.go b/prover/server/server/queue.go index 63f9bd5e79..834b2e4d19 100644 --- a/prover/server/server/queue.go +++ b/prover/server/server/queue.go @@ -78,7 +78,7 @@ func (rq *RedisQueue) DequeueProof(queueName string, timeout time.Duration) (*Pr func (rq *RedisQueue) GetQueueStats() (map[string]int64, error) { stats := make(map[string]int64) - queues := []string{"zk_proof_queue", "zk_priority_queue", "zk_processing_queue", "zk_failed_queue", "zk_results_queue"} + queues := []string{"zk_update_queue", "zk_append_queue", "zk_address_append_queue", "zk_update_processing_queue", "zk_append_processing_queue", "zk_address_append_processing_queue", "zk_failed_queue", "zk_results_queue"} for _, queue := range queues { length, err := rq.Client.LLen(rq.Ctx, queue).Result() @@ -187,3 +187,74 @@ func (rq *RedisQueue) CleanupOldResults() error { return nil } + +func (rq *RedisQueue) CleanupOldRequests() error { + cutoffTime := time.Now().Add(-30 * time.Minute) + + // Queues to clean up old requests from + queuesToClean := []string{ + "zk_update_queue", + "zk_append_queue", + "zk_address_append_queue", + } + + totalRemoved := int64(0) + + for _, queueName := range queuesToClean { + removed, err := rq.cleanupOldRequestsFromQueue(queueName, cutoffTime) + if err != nil { + logging.Logger().Error(). + Err(err). + Str("queue", queueName). + Msg("Failed to cleanup old requests from queue") + continue + } + totalRemoved += removed + } + + if totalRemoved > 0 { + logging.Logger().Info(). + Int64("removed_items", totalRemoved). + Time("cutoff_time", cutoffTime). + Msg("Cleaned up old proof requests") + } + + return nil +} + +func (rq *RedisQueue) cleanupOldRequestsFromQueue(queueName string, cutoffTime time.Time) (int64, error) { + items, err := rq.Client.LRange(rq.Ctx, queueName, 0, -1).Result() + if err != nil { + return 0, fmt.Errorf("failed to get queue items: %w", err) + } + + var removedCount int64 + + for _, item := range items { + var job ProofJob + if json.Unmarshal([]byte(item), &job) == nil { + if job.CreatedAt.Before(cutoffTime) { + // Remove this old job + count, err := rq.Client.LRem(rq.Ctx, queueName, 1, item).Result() + if err != nil { + logging.Logger().Error(). + Err(err). + Str("job_id", job.ID). + Str("queue", queueName). 
+ Msg("Failed to remove old job") + continue + } + if count > 0 { + removedCount++ + logging.Logger().Debug(). + Str("job_id", job.ID). + Str("queue", queueName). + Time("created_at", job.CreatedAt). + Msg("Removed old proof request") + } + } + } + } + + return removedCount, nil +} diff --git a/prover/server/server/queue_job.go b/prover/server/server/queue_job.go index 1f526f61f8..45ea6de9a7 100644 --- a/prover/server/server/queue_job.go +++ b/prover/server/server/queue_job.go @@ -13,34 +13,80 @@ type ProofJob struct { Type string `json:"type"` Payload json.RawMessage `json:"payload"` CreatedAt time.Time `json:"created_at"` - Priority int `json:"priority,omitempty"` } -type QueueWorker struct { - queue *RedisQueue - provingSystemsV1 []*prover.ProvingSystemV1 - provingSystemsV2 []*prover.ProvingSystemV2 - workerID int - stopChan chan struct{} +type QueueWorker interface { + Start() + Stop() } -func NewQueueWorker(workerID int, redisQueue *RedisQueue, psv1 []*prover.ProvingSystemV1, psv2 []*prover.ProvingSystemV2) *QueueWorker { - return &QueueWorker{ - queue: redisQueue, - provingSystemsV1: psv1, - provingSystemsV2: psv2, - workerID: workerID, - stopChan: make(chan struct{}), +type BaseQueueWorker struct { + queue *RedisQueue + provingSystemsV1 []*prover.ProvingSystemV1 + provingSystemsV2 []*prover.ProvingSystemV2 + stopChan chan struct{} + queueName string + processingQueueName string +} + +type UpdateQueueWorker struct { + *BaseQueueWorker +} + +type AppendQueueWorker struct { + *BaseQueueWorker +} + +type AddressAppendQueueWorker struct { + *BaseQueueWorker +} + +func NewUpdateQueueWorker(redisQueue *RedisQueue, psv1 []*prover.ProvingSystemV1, psv2 []*prover.ProvingSystemV2) *UpdateQueueWorker { + return &UpdateQueueWorker{ + BaseQueueWorker: &BaseQueueWorker{ + queue: redisQueue, + provingSystemsV1: psv1, + provingSystemsV2: psv2, + stopChan: make(chan struct{}), + queueName: "zk_update_queue", + processingQueueName: "zk_update_processing_queue", + }, } } -func (w *QueueWorker) Start() { - logging.Logger().Info().Int("worker_id", w.workerID).Msg("Starting queue worker") +func NewAppendQueueWorker(redisQueue *RedisQueue, psv1 []*prover.ProvingSystemV1, psv2 []*prover.ProvingSystemV2) *AppendQueueWorker { + return &AppendQueueWorker{ + BaseQueueWorker: &BaseQueueWorker{ + queue: redisQueue, + provingSystemsV1: psv1, + provingSystemsV2: psv2, + stopChan: make(chan struct{}), + queueName: "zk_append_queue", + processingQueueName: "zk_append_processing_queue", + }, + } +} + +func NewAddressAppendQueueWorker(redisQueue *RedisQueue, psv1 []*prover.ProvingSystemV1, psv2 []*prover.ProvingSystemV2) *AddressAppendQueueWorker { + return &AddressAppendQueueWorker{ + BaseQueueWorker: &BaseQueueWorker{ + queue: redisQueue, + provingSystemsV1: psv1, + provingSystemsV2: psv2, + stopChan: make(chan struct{}), + queueName: "zk_address_append_queue", + processingQueueName: "zk_address_append_processing_queue", + }, + } +} + +func (w *BaseQueueWorker) Start() { + logging.Logger().Info().Str("queue", w.queueName).Msg("Starting queue worker") for { select { case <-w.stopChan: - logging.Logger().Info().Int("worker_id", w.workerID).Msg("Queue worker stopping") + logging.Logger().Info().Str("queue", w.queueName).Msg("Queue worker stopping") return default: w.processJobs() @@ -48,27 +94,18 @@ func (w *QueueWorker) Start() { } } -func (w *QueueWorker) Stop() { +func (w *BaseQueueWorker) Stop() { close(w.stopChan) } -func (w *QueueWorker) processJobs() { - job, err := w.queue.DequeueProof("zk_priority_queue", 
1*time.Second) +func (w *BaseQueueWorker) processJobs() { + job, err := w.queue.DequeueProof(w.queueName, 5*time.Second) if err != nil { - logging.Logger().Error().Err(err).Msg("Error dequeuing from priority queue") + logging.Logger().Error().Err(err).Str("queue", w.queueName).Msg("Error dequeuing from queue") time.Sleep(2 * time.Second) return } - if job == nil { - job, err = w.queue.DequeueProof("zk_proof_queue", 5*time.Second) - if err != nil { - logging.Logger().Error().Err(err).Msg("Error dequeuing from regular queue") - time.Sleep(2 * time.Second) - return - } - } - if job == nil { time.Sleep(1 * time.Second) return @@ -76,8 +113,8 @@ func (w *QueueWorker) processJobs() { logging.Logger().Info(). Str("job_id", job.ID). - Int("worker_id", w.workerID). Str("job_type", job.Type). + Str("queue", w.queueName). Msg("Processing proof job") processingJob := &ProofJob{ @@ -86,7 +123,7 @@ func (w *QueueWorker) processJobs() { Payload: job.Payload, CreatedAt: time.Now(), } - w.queue.EnqueueProof("zk_processing_queue", processingJob) + w.queue.EnqueueProof(w.processingQueueName, processingJob) err = w.processProofJob(job) w.removeFromProcessingQueue(job.ID) @@ -95,14 +132,38 @@ func (w *QueueWorker) processJobs() { logging.Logger().Error(). Err(err). Str("job_id", job.ID). - Int("worker_id", w.workerID). + Str("queue", w.queueName). Msg("Failed to process proof job") w.addToFailedQueue(job, err) } } -func (w *QueueWorker) processProofJob(job *ProofJob) error { +func (w *UpdateQueueWorker) Start() { + w.BaseQueueWorker.Start() +} + +func (w *UpdateQueueWorker) Stop() { + w.BaseQueueWorker.Stop() +} + +func (w *AppendQueueWorker) Start() { + w.BaseQueueWorker.Start() +} + +func (w *AppendQueueWorker) Stop() { + w.BaseQueueWorker.Stop() +} + +func (w *AddressAppendQueueWorker) Start() { + w.BaseQueueWorker.Start() +} + +func (w *AddressAppendQueueWorker) Stop() { + w.BaseQueueWorker.Stop() +} + +func (w *BaseQueueWorker) processProofJob(job *ProofJob) error { proofRequestMeta, err := prover.ParseProofRequestMeta(job.Payload) if err != nil { return fmt.Errorf("failed to parse proof request: %w", err) @@ -143,7 +204,7 @@ func (w *QueueWorker) processProofJob(job *ProofJob) error { return w.queue.StoreResult(job.ID, proof) } -func (w *QueueWorker) processInclusionProof(payload json.RawMessage, meta prover.ProofRequestMeta) (*prover.Proof, error) { +func (w *BaseQueueWorker) processInclusionProof(payload json.RawMessage, meta prover.ProofRequestMeta) (*prover.Proof, error) { var ps *prover.ProvingSystemV1 for _, provingSystem := range w.provingSystemsV1 { if provingSystem.InclusionNumberOfCompressedAccounts == uint32(meta.NumInputs) && @@ -176,7 +237,7 @@ func (w *QueueWorker) processInclusionProof(payload json.RawMessage, meta prover return nil, fmt.Errorf("unsupported version: %d", meta.Version) } -func (w *QueueWorker) processNonInclusionProof(payload json.RawMessage, meta prover.ProofRequestMeta) (*prover.Proof, error) { +func (w *BaseQueueWorker) processNonInclusionProof(payload json.RawMessage, meta prover.ProofRequestMeta) (*prover.Proof, error) { var ps *prover.ProvingSystemV1 for _, provingSystem := range w.provingSystemsV1 { if provingSystem.NonInclusionNumberOfCompressedAccounts == uint32(meta.NumAddresses) && @@ -208,7 +269,7 @@ func (w *QueueWorker) processNonInclusionProof(payload json.RawMessage, meta pro return nil, fmt.Errorf("unsupported address tree height: %d", meta.AddressTreeHeight) } -func (w *QueueWorker) processCombinedProof(payload json.RawMessage, meta 
prover.ProofRequestMeta) (*prover.Proof, error) { +func (w *BaseQueueWorker) processCombinedProof(payload json.RawMessage, meta prover.ProofRequestMeta) (*prover.Proof, error) { var ps *prover.ProvingSystemV1 for _, provingSystem := range w.provingSystemsV1 { if provingSystem.InclusionNumberOfCompressedAccounts == meta.NumInputs && @@ -241,7 +302,7 @@ func (w *QueueWorker) processCombinedProof(payload json.RawMessage, meta prover. return nil, fmt.Errorf("unsupported address tree height: %d", meta.AddressTreeHeight) } -func (w *QueueWorker) processBatchUpdateProof(payload json.RawMessage) (*prover.Proof, error) { +func (w *BaseQueueWorker) processBatchUpdateProof(payload json.RawMessage) (*prover.Proof, error) { var params prover.BatchUpdateParameters if err := json.Unmarshal(payload, ¶ms); err != nil { return nil, fmt.Errorf("failed to unmarshal batch update parameters: %w", err) @@ -258,7 +319,7 @@ func (w *QueueWorker) processBatchUpdateProof(payload json.RawMessage) (*prover. return nil, fmt.Errorf("no proving system found for batch update with height %d and batch size %d", params.Height, params.BatchSize) } -func (w *QueueWorker) processBatchAppendWithProofsProof(payload json.RawMessage) (*prover.Proof, error) { +func (w *BaseQueueWorker) processBatchAppendWithProofsProof(payload json.RawMessage) (*prover.Proof, error) { var params prover.BatchAppendWithProofsParameters if err := json.Unmarshal(payload, ¶ms); err != nil { return nil, fmt.Errorf("failed to unmarshal batch append parameters: %w", err) @@ -275,7 +336,7 @@ func (w *QueueWorker) processBatchAppendWithProofsProof(payload json.RawMessage) return nil, fmt.Errorf("no proving system found for batch append with height %d and batch size %d", params.Height, params.BatchSize) } -func (w *QueueWorker) processBatchAddressAppendProof(payload json.RawMessage) (*prover.Proof, error) { +func (w *BaseQueueWorker) processBatchAddressAppendProof(payload json.RawMessage) (*prover.Proof, error) { var params prover.BatchAddressAppendParameters if err := json.Unmarshal(payload, ¶ms); err != nil { return nil, fmt.Errorf("failed to unmarshal batch address append parameters: %w", err) @@ -292,29 +353,28 @@ func (w *QueueWorker) processBatchAddressAppendProof(payload json.RawMessage) (* return nil, fmt.Errorf("no proving system found for batch address append with height %d and batch size %d", params.TreeHeight, params.BatchSize) } -func (w *QueueWorker) removeFromProcessingQueue(jobID string) { - processingQueueLength, _ := w.queue.Client.LLen(w.queue.Ctx, "zk_processing_queue").Result() +func (w *BaseQueueWorker) removeFromProcessingQueue(jobID string) { + processingQueueLength, _ := w.queue.Client.LLen(w.queue.Ctx, w.processingQueueName).Result() for i := int64(0); i < processingQueueLength; i++ { - item, err := w.queue.Client.LIndex(w.queue.Ctx, "zk_processing_queue", i).Result() + item, err := w.queue.Client.LIndex(w.queue.Ctx, w.processingQueueName, i).Result() if err != nil { continue } var job ProofJob if json.Unmarshal([]byte(item), &job) == nil && job.ID == jobID+"_processing" { - w.queue.Client.LRem(w.queue.Ctx, "zk_processing_queue", 1, item) + w.queue.Client.LRem(w.queue.Ctx, w.processingQueueName, 1, item) break } } } -func (w *QueueWorker) addToFailedQueue(job *ProofJob, err error) { +func (w *BaseQueueWorker) addToFailedQueue(job *ProofJob, err error) { failedJob := map[string]interface{}{ "original_job": job, "error": err.Error(), "failed_at": time.Now(), - "worker_id": w.workerID, } failedData, _ := json.Marshal(failedJob) diff 
--git a/prover/server/server/server.go b/prover/server/server/server.go index 3730b0b4e0..461bfbf7eb 100644 --- a/prover/server/server/server.go +++ b/prover/server/server/server.go @@ -98,20 +98,48 @@ func (handler proofStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Reque Msg("Job found but not completed") response := map[string]interface{}{ - "job_id": jobID, - "status": jobStatus, - "message": getStatusMessage(jobStatus), - } - - if jobInfo != nil { - if createdAt, ok := jobInfo["created_at"]; ok { - response["created_at"] = createdAt - } - if circuitType, ok := jobInfo["circuit_type"]; ok { - response["circuit_type"] = circuitType + "job_id": jobID, + "status": jobStatus, + } + + // Handle failed jobs specially - extract actual error details + if jobStatus == "failed" && jobInfo != nil { + if payloadRaw, ok := jobInfo["payload"]; ok { + if payloadStr, ok := payloadRaw.(string); ok { + var failureDetails map[string]interface{} + if err := json.Unmarshal([]byte(payloadStr), &failureDetails); err == nil { + if errorMsg, ok := failureDetails["error"].(string); ok { + response["message"] = fmt.Sprintf("Job processing failed: %s", errorMsg) + response["error"] = errorMsg + } + if failedAt, ok := failureDetails["failed_at"]; ok { + response["failed_at"] = failedAt + } + if originalJob, ok := failureDetails["original_job"].(map[string]interface{}); ok { + if circuitType, ok := originalJob["circuit_type"]; ok { + response["circuit_type"] = circuitType + } + } + } else { + response["message"] = "Job processing failed. Unable to parse failure details." + } + } else { + response["message"] = "Job processing failed. Unable to access failure details." + } + } else { + response["message"] = "Job processing failed. No failure details available." } - if priority, ok := jobInfo["priority"]; ok { - response["priority"] = priority + } else { + // Use generic message for non-failed jobs + response["message"] = getStatusMessage(jobStatus) + + if jobInfo != nil { + if createdAt, ok := jobInfo["created_at"]; ok { + response["created_at"] = createdAt + } + if circuitType, ok := jobInfo["circuit_type"]; ok { + response["circuit_type"] = circuitType + } } } @@ -141,15 +169,27 @@ func getStatusMessage(status string) string { } func (handler proofStatusHandler) checkJobExistsDetailed(jobID string) (bool, string, map[string]interface{}) { - if job, found := handler.findJobInQueue("zk_proof_queue", jobID); found { + if job, found := handler.findJobInQueue("zk_update_queue", jobID); found { + return true, "queued", job + } + + if job, found := handler.findJobInQueue("zk_append_queue", jobID); found { return true, "queued", job } - if job, found := handler.findJobInQueue("zk_priority_queue", jobID); found { + if job, found := handler.findJobInQueue("zk_address_append_queue", jobID); found { return true, "queued", job } - if job, found := handler.findJobInQueue("zk_processing_queue", jobID); found { + if job, found := handler.findJobInQueue("zk_update_processing_queue", jobID); found { + return true, "processing", job + } + + if job, found := handler.findJobInQueue("zk_append_processing_queue", jobID); found { + return true, "processing", job + } + + if job, found := handler.findJobInQueue("zk_address_append_processing_queue", jobID); found { return true, "processing", job } @@ -180,10 +220,12 @@ func (handler proofStatusHandler) findJobInQueue(queueName, jobID string) (map[s jobInfo := map[string]interface{}{ "created_at": job.CreatedAt, - "priority": job.Priority, } + // Include payload for all jobs, 
especially important for failed jobs if len(job.Payload) > 0 { + jobInfo["payload"] = string(job.Payload) + var meta map[string]interface{} if json.Unmarshal(job.Payload, &meta) == nil { if circuitType, ok := meta["circuit_type"]; ok { @@ -247,49 +289,47 @@ func (handler proveHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { forceAsync := r.Header.Get("X-Async") == "true" || r.URL.Query().Get("async") == "true" forceSync := r.Header.Get("X-Sync") == "true" || r.URL.Query().Get("sync") == "true" - priority := r.Header.Get("X-Priority") == "true" || r.URL.Query().Get("priority") == "true" - shouldUseQueue := handler.shouldUseQueueForCircuit(proofRequestMeta.CircuitType, forceAsync, forceSync, priority) + shouldUseQueue := handler.shouldUseQueueForCircuit(proofRequestMeta.CircuitType, forceAsync, forceSync) logging.Logger().Info(). Str("circuit_type", string(proofRequestMeta.CircuitType)). Bool("force_async", forceAsync). Bool("force_sync", forceSync). - Bool("priority", priority). Bool("use_queue", shouldUseQueue). Bool("queue_available", handler.enableQueue && handler.redisQueue != nil). Msg("Processing prove request") if shouldUseQueue && handler.enableQueue && handler.redisQueue != nil { - handler.handleAsyncProof(w, r, buf, priority, proofRequestMeta) + handler.handleAsyncProof(w, r, buf, proofRequestMeta) } else { handler.handleSyncProof(w, r, buf, proofRequestMeta) } } -func (handler proveHandler) shouldUseQueueForCircuit(circuitType prover.CircuitType, forceAsync, forceSync, priority bool) bool { - if forceAsync { - return true - } - if forceSync { +func (handler proveHandler) shouldUseQueueForCircuit(circuitType prover.CircuitType, forceAsync, forceSync bool) bool { + if !handler.enableQueue || handler.redisQueue == nil { return false } - if priority { + // Always use queue for batch operations when queue is available + // This prevents cross-contamination in clustered deployments + if circuitType == prover.BatchUpdateCircuitType || + circuitType == prover.BatchAppendWithProofsCircuitType || + circuitType == prover.BatchAddressAppendCircuitType { return true } - if !handler.enableQueue || handler.redisQueue == nil { - return false + // For non-batch operations, respect sync/async preferences + if forceAsync { + return true } - - if circuitType == prover.InclusionCircuitType || - circuitType == prover.NonInclusionCircuitType || - circuitType == prover.CombinedCircuitType { + if forceSync { return false } - return true + // Non-batch operations default to local processing + return false } type queueStatsHandler struct { @@ -310,8 +350,8 @@ func (handler queueStatsHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques response := map[string]interface{}{ "queues": stats, - "total_pending": stats["zk_proof_queue"] + stats["zk_priority_queue"], - "total_active": stats["zk_processing_queue"], + "total_pending": stats["zk_update_queue"] + stats["zk_append_queue"] + stats["zk_address_append_queue"], + "total_active": stats["zk_update_processing_queue"] + stats["zk_append_processing_queue"] + stats["zk_address_append_processing_queue"], "total_failed": stats["zk_failed_queue"], "timestamp": time.Now().Unix(), } @@ -372,24 +412,15 @@ func RunEnhanced(config *EnhancedConfig, redisQueue *RedisQueue, circuits []stri } jobID := uuid.New().String() - priority := r.Header.Get("X-Priority") == "true" job := &ProofJob{ ID: jobID, Type: "zk_proof", Payload: json.RawMessage(buf), CreatedAt: time.Now(), - Priority: 0, - } - - if priority { - job.Priority = 1 } - queueName := "zk_proof_queue" 
- if priority { - queueName = "zk_priority_queue" - } + queueName := GetQueueNameForCircuit(proofRequestMeta.CircuitType) err = redisQueue.EnqueueProof(queueName, job) if err != nil { @@ -402,7 +433,6 @@ func RunEnhanced(config *EnhancedConfig, redisQueue *RedisQueue, circuits []stri "status": "queued", "queue": queueName, "circuit_type": string(proofRequestMeta.CircuitType), - "priority": priority, "message": fmt.Sprintf("Job queued in %s", queueName), } @@ -419,7 +449,6 @@ func RunEnhanced(config *EnhancedConfig, redisQueue *RedisQueue, circuits []stri "Authorization", "X-Async", "X-Sync", - "X-Priority", }), handlers.AllowedOrigins([]string{"*"}), handlers.AllowedMethods([]string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}), @@ -510,7 +539,7 @@ func spawnServerJob(server *http.Server, label string) RunningJob { type healthHandler struct { } -func (handler proveHandler) handleAsyncProof(w http.ResponseWriter, r *http.Request, buf []byte, priority bool, meta prover.ProofRequestMeta) { +func (handler proveHandler) handleAsyncProof(w http.ResponseWriter, r *http.Request, buf []byte, meta prover.ProofRequestMeta) { jobID := uuid.New().String() job := &ProofJob{ @@ -518,17 +547,9 @@ func (handler proveHandler) handleAsyncProof(w http.ResponseWriter, r *http.Requ Type: "zk_proof", Payload: json.RawMessage(buf), CreatedAt: time.Now(), - Priority: 0, - } - - if priority { - job.Priority = 1 } - queueName := "zk_proof_queue" - if priority { - queueName = "zk_priority_queue" - } + queueName := GetQueueNameForCircuit(meta.CircuitType) err := handler.redisQueue.EnqueueProof(queueName, job) if err != nil { @@ -555,7 +576,6 @@ func (handler proveHandler) handleAsyncProof(w http.ResponseWriter, r *http.Requ "job_id": jobID, "status": "queued", "circuit_type": string(meta.CircuitType), - "priority": priority, "queue": queueName, "estimated_time": estimatedTime, "status_url": fmt.Sprintf("/prove/status?job_id=%s", jobID), @@ -653,6 +673,19 @@ func (handler proveHandler) isBatchOperation(circuitType prover.CircuitType) boo } } +func GetQueueNameForCircuit(circuitType prover.CircuitType) string { + switch circuitType { + case prover.BatchUpdateCircuitType: + return "zk_update_queue" + case prover.BatchAppendWithProofsCircuitType: + return "zk_append_queue" + case prover.BatchAddressAppendCircuitType: + return "zk_address_append_queue" + default: + return "zk_update_queue" + } +} + func (handler proveHandler) getEstimatedTime(circuitType prover.CircuitType) string { switch circuitType { case prover.InclusionCircuitType: