diff --git a/internal/messaging/rabbitmq/parse_consumer.go b/internal/messaging/rabbitmq/parse_consumer.go
index 640318a..138fe56 100644
--- a/internal/messaging/rabbitmq/parse_consumer.go
+++ b/internal/messaging/rabbitmq/parse_consumer.go
@@ -79,12 +79,16 @@ type DatasetProcessor interface {
 	ExtractProperties(ctx context.Context, datasetID uuid.UUID) error
 	Extract(ctx context.Context, datasetID uuid.UUID) error
 	ConvertToCOG(ctx context.Context, datasetID uuid.UUID) error
+	// MarkFailed records a terminal processing failure on the dataset so it does
+	// not remain stuck in an in-progress status.
+	MarkFailed(ctx context.Context, datasetID uuid.UUID, reason string) error
 }
 
 // jobHandler dispatches a dataset job to one processor function.
 type jobHandler struct {
 	name string
 	fn   func(ctx context.Context, id uuid.UUID) error
+	proc DatasetProcessor
 	log  *slog.Logger
 }
 
@@ -94,25 +98,44 @@ func (h jobHandler) Handle(ctx context.Context, d amqp.Delivery) error {
 		return fmt.Errorf("decode %s job: %w", h.name, err)
 	}
 	h.log.Info("processing dataset "+h.name, "dataset_id", job.DatasetID)
-	return h.fn(ctx, job.DatasetID)
+
+	err := h.fn(ctx, job.DatasetID)
+	if err != nil {
+		// The delivery is nacked without requeue (see Consumer.dispatch), so the
+		// job will not be retried. Record the failure on the dataset itself;
+		// otherwise it stays stuck in an in-progress status forever (e.g. when a
+		// PostGIS insert returns an error mid-job). Best-effort: log if marking
+		// fails, but still return the original error so the delivery is nacked.
+		//
+		// If the job failed because ctx was cancelled or hit its deadline, reusing
+		// ctx would make MarkFailed fail too and leave the dataset stuck. Detach
+		// from cancellation (context values are preserved) before marking.
+		// NOTE(review): markCtx has no deadline; bound it with a timeout if the
+		// repository call can block indefinitely.
+		markCtx := context.WithoutCancel(ctx)
+		if markErr := h.proc.MarkFailed(markCtx, job.DatasetID, err.Error()); markErr != nil {
+			h.log.Error("mark dataset failed", "dataset_id", job.DatasetID, "job", h.name, "error", markErr)
+		}
+	}
+	return err
 }
 
 // NewParseHandler returns a handler that parses datasets.
func NewParseHandler(p DatasetProcessor, log *slog.Logger) Handler { - return jobHandler{name: "parse", fn: p.Parse, log: log} + return jobHandler{name: "parse", fn: p.Parse, proc: p, log: log} } // NewPropertiesHandler returns a handler that extracts a plain vector's table. func NewPropertiesHandler(p DatasetProcessor, log *slog.Logger) Handler { - return jobHandler{name: "properties", fn: p.ExtractProperties, log: log} + return jobHandler{name: "properties", fn: p.ExtractProperties, proc: p, log: log} } // NewExtractHandler returns a handler that extracts (unpivots) datasets. func NewExtractHandler(p DatasetProcessor, log *slog.Logger) Handler { - return jobHandler{name: "extract", fn: p.Extract, log: log} + return jobHandler{name: "extract", fn: p.Extract, proc: p, log: log} } // NewConvertHandler returns a handler that converts rasters to COGs. func NewConvertHandler(p DatasetProcessor, log *slog.Logger) Handler { - return jobHandler{name: "convert", fn: p.ConvertToCOG, log: log} + return jobHandler{name: "convert", fn: p.ConvertToCOG, proc: p, log: log} } diff --git a/internal/messaging/rabbitmq/parse_consumer_test.go b/internal/messaging/rabbitmq/parse_consumer_test.go new file mode 100644 index 0000000..2024847 --- /dev/null +++ b/internal/messaging/rabbitmq/parse_consumer_test.go @@ -0,0 +1,78 @@ +package rabbitmq + +import ( + "context" + "encoding/json" + "errors" + "io" + "log/slog" + "testing" + + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" +) + +// fakeProcessor records MarkFailed calls and returns a preset error from jobs. 
+type fakeProcessor struct { + jobErr error + failedID uuid.UUID + failedWith string + failedCnt int +} + +func (f *fakeProcessor) Parse(context.Context, uuid.UUID) error { return f.jobErr } +func (f *fakeProcessor) ExtractProperties(context.Context, uuid.UUID) error { return f.jobErr } +func (f *fakeProcessor) Extract(context.Context, uuid.UUID) error { return f.jobErr } +func (f *fakeProcessor) ConvertToCOG(context.Context, uuid.UUID) error { return f.jobErr } + +func (f *fakeProcessor) MarkFailed(_ context.Context, id uuid.UUID, reason string) error { + f.failedCnt++ + f.failedID = id + f.failedWith = reason + return nil +} + +func discardLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func delivery(t *testing.T, id uuid.UUID) amqp.Delivery { + t.Helper() + body, err := json.Marshal(DatasetJob{DatasetID: id}) + if err != nil { + t.Fatalf("marshal job: %v", err) + } + return amqp.Delivery{Body: body} +} + +func TestJobHandler_MarksDatasetFailedOnJobError(t *testing.T) { + id := uuid.New() + fp := &fakeProcessor{jobErr: errors.New("insert into postgis failed")} + h := NewPropertiesHandler(fp, discardLogger()) + + err := h.Handle(context.Background(), delivery(t, id)) + if err == nil { + t.Fatal("expected the job error to be returned so the delivery is nacked") + } + if fp.failedCnt != 1 { + t.Fatalf("expected MarkFailed to be called once, got %d", fp.failedCnt) + } + if fp.failedID != id { + t.Fatalf("MarkFailed called with wrong id: got %s want %s", fp.failedID, id) + } + if fp.failedWith != "insert into postgis failed" { + t.Fatalf("MarkFailed called with wrong reason: %q", fp.failedWith) + } +} + +func TestJobHandler_DoesNotMarkFailedOnSuccess(t *testing.T) { + fp := &fakeProcessor{jobErr: nil} + h := NewExtractHandler(fp, discardLogger()) + + if err := h.Handle(context.Background(), delivery(t, uuid.New())); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if fp.failedCnt != 0 { + t.Fatalf("expected MarkFailed 
not to be called on success, got %d", fp.failedCnt) + } +} diff --git a/internal/service/dataset.go b/internal/service/dataset.go index a0f4795..29db6a0 100644 --- a/internal/service/dataset.go +++ b/internal/service/dataset.go @@ -457,6 +457,17 @@ func (s *DatasetService) Parse(ctx context.Context, id uuid.UUID) error { return s.repo.MarkParsed(ctx, id, cols) } +// MarkFailed records a terminal processing failure on a dataset, moving it out +// of any in-progress status. The worker calls this when a job returns an error +// (e.g. a PostGIS insert failure) so the dataset is not left stuck. Marking a +// dataset that no longer exists is not treated as an error. +func (s *DatasetService) MarkFailed(ctx context.Context, id uuid.UUID, reason string) error { + if err := s.repo.MarkParseFailed(ctx, id, reason); err != nil && !errors.Is(err, domain.ErrNotFound) { + return err + } + return nil +} + func (s *DatasetService) fetchObject(ctx context.Context, key string) ([]byte, error) { obj, err := s.store.Get(ctx, key) if err != nil {