diff --git a/forester/tests/e2e_v1_test.rs b/forester/tests/e2e_v1_test.rs index f2149f5457..a00202b550 100644 --- a/forester/tests/e2e_v1_test.rs +++ b/forester/tests/e2e_v1_test.rs @@ -234,7 +234,7 @@ async fn test_e2e_v1() { work_report_sender2, )); - const EXPECTED_EPOCHS: u64 = 3; // We expect to process 2 epochs (0 and 1) + const EXPECTED_EPOCHS: u64 = 2; // We expect to process 2 epochs (0 and 1) let mut processed_epochs = HashSet::new(); let mut total_processed = 0; diff --git a/forester/tests/e2e_v2_test.rs b/forester/tests/e2e_v2_test.rs index f60205f0ca..76bd4dd0b3 100644 --- a/forester/tests/e2e_v2_test.rs +++ b/forester/tests/e2e_v2_test.rs @@ -64,7 +64,7 @@ use crate::test_utils::{get_registration_phase_start_slot, init, wait_for_slot}; mod test_utils; const MINT_TO_NUM: u64 = 5; -const DEFAULT_TIMEOUT_SECONDS: u64 = 60 * 15; +const DEFAULT_TIMEOUT_SECONDS: u64 = 60 * 5; const COMPUTE_BUDGET_LIMIT: u32 = 1_000_000; #[derive(Debug, Clone, Copy, PartialEq)] @@ -741,7 +741,7 @@ async fn execute_test_transactions address_v1_counter: &mut u64, address_v2_counter: &mut u64, ) { - let mut iterations = 10; + let mut iterations = 4; if is_v2_state_test_enabled() { let batch_size = get_state_v2_batch_size(rpc, &env.v2_state_trees[0].merkle_tree).await as usize; diff --git a/prover/server/main.go b/prover/server/main.go index 4aee2071a8..886b2900bb 100644 --- a/prover/server/main.go +++ b/prover/server/main.go @@ -970,6 +970,22 @@ func startCleanupRoutines(redisQueue *server.RedisQueue) { logging.Logger().Info().Msg("Startup cleanup of old proof requests completed") } + if err := redisQueue.CleanupStuckProcessingJobs(); err != nil { + logging.Logger().Error(). + Err(err). + Msg("Failed to cleanup stuck processing jobs on startup") + } else { + logging.Logger().Info().Msg("Startup cleanup of stuck processing jobs completed") + } + + if err := redisQueue.CleanupOldFailedJobs(); err != nil { + logging.Logger().Error(). + Err(err). + Msg("Failed to cleanup old failed jobs on startup") + } else { + logging.Logger().Info().Msg("Startup cleanup of old failed jobs completed") + } + if err := redisQueue.CleanupOldResults(); err != nil { logging.Logger().Error(). Err(err). @@ -978,6 +994,31 @@ func startCleanupRoutines(redisQueue *server.RedisQueue) { logging.Logger().Info().Msg("Startup cleanup of old results completed") } + if err := redisQueue.CleanupOldResultKeys(); err != nil { + logging.Logger().Error(). + Err(err). + Msg("Failed to cleanup old result keys on startup") + } else { + logging.Logger().Info().Msg("Startup cleanup of old result keys completed") + } + + go func() { + processingTicker := time.NewTicker(10 * time.Second) + defer processingTicker.Stop() + + logging.Logger().Info().Msg("Started stuck processing jobs cleanup routine (every 10 seconds)") + + for range processingTicker.C { + if err := redisQueue.CleanupStuckProcessingJobs(); err != nil { + logging.Logger().Error(). + Err(err). 
+ Msg("Failed to cleanup stuck processing jobs") + } else { + logging.Logger().Debug().Msg("Stuck processing jobs cleanup completed") + } + } + }() + // Start cleanup for old proof requests (every 10 minutes) go func() { requestTicker := time.NewTicker(10 * time.Minute) @@ -996,12 +1037,30 @@ func startCleanupRoutines(redisQueue *server.RedisQueue) { } }() - // Start less frequent cleanup for old results (every 1 hour) + // Start cleanup for old failed jobs (every 30 minutes) + go func() { + failedTicker := time.NewTicker(30 * time.Minute) + defer failedTicker.Stop() + + logging.Logger().Info().Msg("Started old failed jobs cleanup routine (every 30 minutes)") + + for range failedTicker.C { + if err := redisQueue.CleanupOldFailedJobs(); err != nil { + logging.Logger().Error(). + Err(err). + Msg("Failed to cleanup old failed jobs") + } else { + logging.Logger().Debug().Msg("Old failed jobs cleanup completed") + } + } + }() + + // Start cleanup for old results (every 30 minutes) go func() { - resultTicker := time.NewTicker(1 * time.Hour) + resultTicker := time.NewTicker(30 * time.Minute) defer resultTicker.Stop() - logging.Logger().Info().Msg("Started old results cleanup routine (every 1 hour)") + logging.Logger().Info().Msg("Started old results cleanup routine (every 30 minutes)") for range resultTicker.C { if err := redisQueue.CleanupOldResults(); err != nil { @@ -1011,6 +1070,14 @@ func startCleanupRoutines(redisQueue *server.RedisQueue) { } else { logging.Logger().Debug().Msg("Old results cleanup completed") } + + if err := redisQueue.CleanupOldResultKeys(); err != nil { + logging.Logger().Error(). + Err(err). + Msg("Failed to cleanup old result keys") + } else { + logging.Logger().Debug().Msg("Old result keys cleanup completed") + } } }() } diff --git a/prover/server/server/queue.go b/prover/server/server/queue.go index 834b2e4d19..b27361c935 100644 --- a/prover/server/server/queue.go +++ b/prover/server/server/queue.go @@ -92,6 +92,65 @@ func (rq *RedisQueue) GetQueueStats() (map[string]int64, error) { return stats, nil } +func (rq *RedisQueue) GetQueueHealth() (map[string]interface{}, error) { + stats, err := rq.GetQueueStats() + if err != nil { + return nil, err + } + + health := make(map[string]interface{}) + health["queue_lengths"] = stats + health["timestamp"] = time.Now().Unix() + + health["total_pending"] = stats["zk_update_queue"] + stats["zk_append_queue"] + stats["zk_address_append_queue"] + health["total_processing"] = stats["zk_update_processing_queue"] + stats["zk_append_processing_queue"] + stats["zk_address_append_processing_queue"] + health["total_failed"] = stats["zk_failed_queue"] + health["total_results"] = stats["zk_results_queue"] + + stuckJobs := rq.countStuckJobs() + health["stuck_jobs"] = stuckJobs + + healthStatus := "healthy" + if stuckJobs > 0 { + healthStatus = "degraded" + } + if health["total_failed"].(int64) > 50 { + healthStatus = "unhealthy" + } + health["status"] = healthStatus + + return health, nil +} + +func (rq *RedisQueue) countStuckJobs() int64 { + stuckTimeout := time.Now().Add(-2 * time.Minute) + processingQueues := []string{ + "zk_update_processing_queue", + "zk_append_processing_queue", + "zk_address_append_processing_queue", + } + + var totalStuck int64 + + for _, queueName := range processingQueues { + items, err := rq.Client.LRange(rq.Ctx, queueName, 0, -1).Result() + if err != nil { + continue + } + + for _, item := range items { + var job ProofJob + if json.Unmarshal([]byte(item), &job) == nil { + if job.CreatedAt.Before(stuckTimeout) { + 
totalStuck++ + } + } + } + } + + return totalStuck +} + func (rq *RedisQueue) GetResult(jobID string) (interface{}, error) { key := fmt.Sprintf("zk_result_%s", jobID) result, err := rq.Client.Get(rq.Ctx, key).Result() @@ -153,7 +212,7 @@ func (rq *RedisQueue) StoreResult(jobID string, result interface{}) error { } key := fmt.Sprintf("zk_result_%s", jobID) - err = rq.Client.Set(rq.Ctx, key, resultData, 24*time.Hour).Err() + err = rq.Client.Set(rq.Ctx, key, resultData, 1*time.Hour).Err() if err != nil { return fmt.Errorf("failed to store result: %w", err) } @@ -167,8 +226,24 @@ func (rq *RedisQueue) StoreResult(jobID string, result interface{}) error { } func (rq *RedisQueue) CleanupOldResults() error { - ctx := context.Background() + // Remove successful results older than 1 hour + cutoffTime := time.Now().Add(-1 * time.Hour) + removed, err := rq.cleanupOldRequestsFromQueue("zk_results_queue", cutoffTime) + if err != nil { + logging.Logger().Error(). + Err(err). + Msg("Failed to cleanup old results by time") + } + + if removed > 0 { + logging.Logger().Info(). + Int64("removed_results", removed). + Time("cutoff_time", cutoffTime). + Msg("Cleaned up old results by time") + } + + ctx := context.Background() length, err := rq.Client.LLen(ctx, "zk_results_queue").Result() if err != nil { return err @@ -182,7 +257,7 @@ func (rq *RedisQueue) CleanupOldResults() error { logging.Logger().Info(). Int64("removed_items", toRemove). - Msg("Cleaned up old results from queue") + Msg("Cleaned up old results from queue (length-based safety)") } return nil @@ -190,16 +265,16 @@ func (rq *RedisQueue) CleanupOldResults() error { func (rq *RedisQueue) CleanupOldRequests() error { cutoffTime := time.Now().Add(-30 * time.Minute) - + // Queues to clean up old requests from queuesToClean := []string{ "zk_update_queue", - "zk_append_queue", + "zk_append_queue", "zk_address_append_queue", } totalRemoved := int64(0) - + for _, queueName := range queuesToClean { removed, err := rq.cleanupOldRequestsFromQueue(queueName, cutoffTime) if err != nil { @@ -222,6 +297,220 @@ func (rq *RedisQueue) CleanupOldRequests() error { return nil } +func (rq *RedisQueue) CleanupOldResultKeys() error { + ctx := context.Background() + + keys, err := rq.Client.Keys(ctx, "zk_result_*").Result() + if err != nil { + return fmt.Errorf("failed to get result keys: %w", err) + } + + var removedCount int64 + + for _, key := range keys { + ttl, err := rq.Client.TTL(ctx, key).Result() + if err != nil { + continue + } + + if ttl == -1 || ttl < -time.Hour { + err = rq.Client.Del(ctx, key).Err() + if err != nil { + logging.Logger().Error(). + Err(err). + Str("key", key). + Msg("Failed to delete old result key") + } else { + removedCount++ + logging.Logger().Debug(). + Str("key", key). + Dur("ttl", ttl). + Msg("Removed old result key") + } + } + } + + if removedCount > 0 { + logging.Logger().Info(). + Int64("removed_keys", removedCount). 
+ Msg("Cleaned up old result keys") + } + + return nil +} + +func (rq *RedisQueue) CleanupStuckProcessingJobs() error { + // Jobs stuck in processing for more than 2 minutes are considered stuck + processingTimeout := time.Now().Add(-2 * time.Minute) + + processingQueues := []string{ + "zk_update_processing_queue", + "zk_append_processing_queue", + "zk_address_append_processing_queue", + } + + totalRecovered := int64(0) + totalFailed := int64(0) + + for _, queueName := range processingQueues { + recovered, failed, err := rq.recoverStuckJobsFromQueue(queueName, processingTimeout) + if err != nil { + logging.Logger().Error(). + Err(err). + Str("queue", queueName). + Msg("Failed to recover stuck jobs from processing queue") + continue + } + totalRecovered += recovered + totalFailed += failed + } + + if totalRecovered > 0 || totalFailed > 0 { + logging.Logger().Info(). + Int64("recovered_jobs", totalRecovered). + Int64("failed_jobs", totalFailed). + Time("timeout_cutoff", processingTimeout). + Msg("Processed stuck jobs from processing queues") + } + + return nil +} + +func (rq *RedisQueue) CleanupOldFailedJobs() error { + cutoffTime := time.Now().Add(-1 * time.Hour) + + removed, err := rq.cleanupOldRequestsFromQueue("zk_failed_queue", cutoffTime) + if err != nil { + logging.Logger().Error(). + Err(err). + Msg("Failed to cleanup old failed jobs") + return err + } + + if removed > 0 { + logging.Logger().Info(). + Int64("removed_failed_jobs", removed). + Time("cutoff_time", cutoffTime). + Msg("Cleaned up old failed jobs") + } + + return nil +} + +func (rq *RedisQueue) recoverStuckJobsFromQueue(queueName string, timeoutCutoff time.Time) (int64, int64, error) { + items, err := rq.Client.LRange(rq.Ctx, queueName, 0, -1).Result() + if err != nil { + return 0, 0, fmt.Errorf("failed to get processing queue items: %w", err) + } + + var recoveredCount int64 + var failedCount int64 + + for _, item := range items { + var job ProofJob + if json.Unmarshal([]byte(item), &job) == nil { + if job.CreatedAt.Before(timeoutCutoff) { + count, err := rq.Client.LRem(rq.Ctx, queueName, 1, item).Result() + if err != nil { + logging.Logger().Error(). + Err(err). + Str("job_id", job.ID). + Str("queue", queueName). + Msg("Failed to remove stuck job from processing queue") + continue + } + + if count > 0 { + originalJobID := job.ID + if len(job.ID) > 11 && job.ID[len(job.ID)-11:] == "_processing" { + originalJobID = job.ID[:len(job.ID)-11] + } + + fiveMinutesAgo := time.Now().Add(-5 * time.Minute) + if job.CreatedAt.Before(fiveMinutesAgo) { + failureDetails := map[string]interface{}{ + "original_job": map[string]interface{}{ + "id": originalJobID, + "type": "zk_proof", + "payload": job.Payload, + "created_at": job.CreatedAt, + }, + "error": "Job timed out in processing queue (stuck for >5 minutes)", + "failed_at": time.Now(), + "timeout": true, + } + + failedData, _ := json.Marshal(failureDetails) + failedJob := &ProofJob{ + ID: originalJobID + "_failed", + Type: "failed", + Payload: json.RawMessage(failedData), + CreatedAt: time.Now(), + } + + err = rq.EnqueueProof("zk_failed_queue", failedJob) + if err != nil { + logging.Logger().Error(). + Err(err). + Str("job_id", originalJobID). + Msg("Failed to move timed out job to failed queue") + } else { + failedCount++ + logging.Logger().Warn(). + Str("job_id", originalJobID). + Str("processing_queue", queueName). + Time("stuck_since", job.CreatedAt). 
+ Msg("Moved timed out job to failed queue (processing timeout >5min)") + } + } else { + originalQueue := getOriginalQueueFromProcessing(queueName) + if originalQueue != "" { + originalJob := &ProofJob{ + ID: originalJobID, + Type: "zk_proof", + Payload: job.Payload, + CreatedAt: job.CreatedAt, + } + + err = rq.EnqueueProof(originalQueue, originalJob) + if err != nil { + logging.Logger().Error(). + Err(err). + Str("job_id", originalJobID). + Str("target_queue", originalQueue). + Msg("Failed to recover stuck job") + } else { + recoveredCount++ + logging.Logger().Info(). + Str("job_id", originalJobID). + Str("from_queue", queueName). + Str("to_queue", originalQueue). + Time("stuck_since", job.CreatedAt). + Msg("Recovered stuck job") + } + } + } + } + } + } + } + + return recoveredCount, failedCount, nil +} + +func getOriginalQueueFromProcessing(processingQueueName string) string { + switch processingQueueName { + case "zk_update_processing_queue": + return "zk_update_queue" + case "zk_append_processing_queue": + return "zk_append_queue" + case "zk_address_append_processing_queue": + return "zk_address_append_queue" + default: + return "" + } +} + func (rq *RedisQueue) cleanupOldRequestsFromQueue(queueName string, cutoffTime time.Time) (int64, error) { items, err := rq.Client.LRange(rq.Ctx, queueName, 0, -1).Result() if err != nil { @@ -229,7 +518,7 @@ func (rq *RedisQueue) cleanupOldRequestsFromQueue(queueName string, cutoffTime t } var removedCount int64 - + for _, item := range items { var job ProofJob if json.Unmarshal([]byte(item), &job) == nil { diff --git a/prover/server/server/server.go b/prover/server/server/server.go index 7fa41b8ee4..b12ebdc6dd 100644 --- a/prover/server/server/server.go +++ b/prover/server/server/server.go @@ -360,6 +360,88 @@ func (handler queueStatsHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques json.NewEncoder(w).Encode(response) } +type queueHealthHandler struct { + redisQueue *RedisQueue +} + +func (handler queueHealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + health, err := handler.redisQueue.GetQueueHealth() + if err != nil { + unexpectedError(err).send(w) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(health) +} + +type queueCleanupHandler struct { + redisQueue *RedisQueue +} + +func (handler queueCleanupHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + results := make(map[string]interface{}) + + if err := handler.redisQueue.CleanupOldRequests(); err != nil { + results["old_requests_cleanup"] = map[string]interface{}{ + "success": false, + "error": err.Error(), + } + } else { + results["old_requests_cleanup"] = map[string]interface{}{ + "success": true, + } + } + + if err := handler.redisQueue.CleanupStuckProcessingJobs(); err != nil { + results["stuck_jobs_cleanup"] = map[string]interface{}{ + "success": false, + "error": err.Error(), + } + } else { + results["stuck_jobs_cleanup"] = map[string]interface{}{ + "success": true, + } + } + + if err := handler.redisQueue.CleanupOldFailedJobs(); err != nil { + results["old_failed_cleanup"] = map[string]interface{}{ + "success": false, + "error": err.Error(), + } + } else { + results["old_failed_cleanup"] = map[string]interface{}{ + "success": true, + } + } + + if err := handler.redisQueue.CleanupOldResultKeys(); err != nil { + 
results["old_result_keys_cleanup"] = map[string]interface{}{ + "success": false, + "error": err.Error(), + } + } else { + results["old_result_keys_cleanup"] = map[string]interface{}{ + "success": true, + } + } + + results["timestamp"] = time.Now().Unix() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(results) +} + func RunWithQueue(config *Config, redisQueue *RedisQueue, circuits []string, runMode prover.RunMode, provingSystemsV1 []*prover.ProvingSystemV1, provingSystemsV2 []*prover.ProvingSystemV2) RunningJob { return RunEnhanced(&EnhancedConfig{ ProverAddress: config.ProverAddress, @@ -392,6 +474,8 @@ func RunEnhanced(config *EnhancedConfig, redisQueue *RedisQueue, circuits []stri if redisQueue != nil { proverMux.Handle("/prove/status", proofStatusHandler{redisQueue: redisQueue}) proverMux.Handle("/queue/stats", queueStatsHandler{redisQueue: redisQueue}) + proverMux.Handle("/queue/health", queueHealthHandler{redisQueue: redisQueue}) + proverMux.Handle("/queue/cleanup", queueCleanupHandler{redisQueue: redisQueue}) proverMux.HandleFunc("/queue/add", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost {