diff --git a/internal/app/app.go b/internal/app/app.go index a2e2b2b..c335f4a 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -178,6 +178,13 @@ func (a *App) ReprocessDataset(ctx context.Context, id uuid.UUID) (domain.Datase return a.datasets.Reprocess(ctx, id) } +// ReprocessAllDatasets re-enqueues the processing job for every dataset, +// restarting each one's asynchronous pipeline. It returns how many jobs were +// enqueued and a per-dataset map of any failures. +func (a *App) ReprocessAllDatasets(ctx context.Context) (int, map[uuid.UUID]error, error) { + return a.datasets.ReprocessAll(ctx) +} + // Close releases all resources in reverse order of acquisition. func (a *App) Close() { if a.rabbit != nil { diff --git a/internal/cli/reprocess.go b/internal/cli/reprocess.go index ce0b18b..8213e30 100644 --- a/internal/cli/reprocess.go +++ b/internal/cli/reprocess.go @@ -9,19 +9,23 @@ import ( "github.com/spf13/cobra" ) +var reprocessAll bool + var reprocessCmd = &cobra.Command{ - Use: "reprocess <dataset-id>", - Short: "Re-enqueue the processing job for an uploaded dataset", + Use: "reprocess [dataset-id]", + Short: "Re-enqueue the processing job for uploaded datasets", Long: "Re-publish the RabbitMQ message that drives an uploaded dataset's\n" + "asynchronous processing, selecting the right step from its file type\n" + "(vector_with_kato -> parse, vector -> properties, raster -> cog).\n\n" + - "Example:\n" + - " gis reprocess 06818b2b-1fc5-47d9-a764-db2d4cb3df75", - Args: cobra.ExactArgs(1), + "Pass a dataset id to reprocess a single dataset, or --all to reprocess\n" + + "every dataset.\n\n" + + "Examples:\n" + + " gis reprocess 06818b2b-1fc5-47d9-a764-db2d4cb3df75\n" + + " gis reprocess --all", + Args: cobra.MaximumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - id, err := uuid.Parse(args[0]) - if err != nil { - return fmt.Errorf("invalid dataset id %q: %w", args[0], err) + if reprocessAll == (len(args) == 1) { + return fmt.Errorf("provide 
exactly one of <dataset-id> or --all") } ctx, cancel := signalContext() @@ -33,6 +37,28 @@ var reprocessCmd = &cobra.Command{ } defer application.Close() + if reprocessAll { + enqueued, failures, err := application.ReprocessAllDatasets(ctx) + if err != nil { + return err + } + for id, ferr := range failures { + application.Log.Error("failed to re-enqueue dataset processing", + "dataset_id", id, "error", ferr) + } + application.Log.Info("re-enqueued all dataset processing", + "enqueued", enqueued, "failed", len(failures)) + if len(failures) > 0 { + return fmt.Errorf("%d of %d datasets failed to re-enqueue", len(failures), enqueued+len(failures)) + } + return nil + } + + id, err := uuid.Parse(args[0]) + if err != nil { + return fmt.Errorf("invalid dataset id %q: %w", args[0], err) + } + dataset, err := application.ReprocessDataset(ctx, id) if err != nil { return err @@ -42,3 +68,7 @@ var reprocessCmd = &cobra.Command{ return nil }, } + +func init() { + reprocessCmd.Flags().BoolVar(&reprocessAll, "all", false, "reprocess every dataset") +} diff --git a/internal/service/dataset.go b/internal/service/dataset.go index dc65273..a6ee1fb 100644 --- a/internal/service/dataset.go +++ b/internal/service/dataset.go @@ -253,6 +253,35 @@ func (s *DatasetService) Reprocess(ctx context.Context, id uuid.UUID) (domain.Da return dataset, nil } +// ReprocessAll re-enqueues the processing job for every dataset, restarting each +// one's asynchronous pipeline from the step appropriate to its file type. It +// paginates through all datasets and continues past individual failures, +// returning how many jobs were enqueued and a per-dataset map of any failures. 
+func (s *DatasetService) ReprocessAll(ctx context.Context) (enqueued int, failures map[uuid.UUID]error, err error) { + failures = make(map[uuid.UUID]error) + for offset := 0; ; { + summaries, err := s.repo.ListSummaries(ctx, nil, MaxPageSize, offset) + if err != nil { + return enqueued, failures, err + } + if len(summaries) == 0 { + break + } + for _, sum := range summaries { + if e := s.enqueueProcessing(ctx, domain.Dataset{ID: sum.ID, FileType: sum.FileType}); e != nil { + failures[sum.ID] = e + continue + } + enqueued++ + } + if len(summaries) < MaxPageSize { + break + } + offset += len(summaries) + } + return enqueued, failures, nil +} + // ExtractProperties reads a plain vector dataset's attribute table and spatial // geometry and stores them (the attribute table as a JSON array of row objects // in the properties column, the dissolved feature geometry in the geometry