diff --git a/internal/app/app.go b/internal/app/app.go
index 04f3bb0..a2e2b2b 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -22,6 +22,7 @@ import (
 	transporthttp "gis/internal/transport/http"
 
 	"github.com/go-playground/validator/v10"
+	"github.com/google/uuid"
 	"github.com/jackc/pgx/v5/pgxpool"
 )
 
@@ -171,6 +172,12 @@ func (a *App) ExampleConsumer() *rabbitmq.Consumer {
 // Publisher returns the RabbitMQ publisher.
 func (a *App) Publisher() *rabbitmq.Publisher { return a.publisher }
 
+// ReprocessDataset re-enqueues the processing job for the dataset with the given
+// id, restarting its asynchronous pipeline.
+func (a *App) ReprocessDataset(ctx context.Context, id uuid.UUID) (domain.Dataset, error) {
+	return a.datasets.Reprocess(ctx, id)
+}
+
 // Close releases all resources in reverse order of acquisition.
 func (a *App) Close() {
 	if a.rabbit != nil {
diff --git a/internal/cli/reprocess.go b/internal/cli/reprocess.go
new file mode 100644
index 0000000..ce0b18b
--- /dev/null
+++ b/internal/cli/reprocess.go
@@ -0,0 +1,46 @@
+package cli
+
+import (
+	"fmt"
+
+	"gis/internal/app"
+
+	"github.com/google/uuid"
+	"github.com/spf13/cobra"
+)
+
+// reprocessCmd parses a dataset UUID from its single argument, builds the
+// application, and asks it to re-enqueue that dataset's processing job.
+var reprocessCmd = &cobra.Command{
+	Use:   "reprocess DATASET_ID",
+	Short: "Re-enqueue the processing job for an uploaded dataset",
+	Long: "Re-publish the RabbitMQ message that drives an uploaded dataset's\n" +
+		"asynchronous processing, selecting the right step from its file type\n" +
+		"(vector_with_kato -> parse, vector -> properties, raster -> cog).\n\n" +
+		"Example:\n" +
+		" gis reprocess 06818b2b-1fc5-47d9-a764-db2d4cb3df75",
+	Args: cobra.ExactArgs(1),
+	RunE: func(cmd *cobra.Command, args []string) error {
+		id, err := uuid.Parse(args[0])
+		if err != nil {
+			return fmt.Errorf("invalid dataset id %q: %w", args[0], err)
+		}
+
+		ctx, cancel := signalContext()
+		defer cancel()
+
+		application, err := app.New(ctx)
+		if err != nil {
+			return err
+		}
+		defer application.Close()
+
+		dataset, err := application.ReprocessDataset(ctx, id)
+		if err != nil {
+			return err
+		}
+		application.Log.Info("re-enqueued dataset processing",
+			"dataset_id", dataset.ID, "file_type", dataset.FileType, "status", dataset.Status)
+		return nil
+	},
+}
diff --git a/internal/cli/root.go b/internal/cli/root.go
index 13793a9..32a1de3 100644
--- a/internal/cli/root.go
+++ b/internal/cli/root.go
@@ -28,7 +28,7 @@ func Execute() {
 }
 
 func init() {
-	rootCmd.AddCommand(serveCmd, workerCmd, migrateCmd)
+	rootCmd.AddCommand(serveCmd, workerCmd, migrateCmd, reprocessCmd)
 }
 
 // signalContext returns a context cancelled on SIGINT or SIGTERM.
diff --git a/internal/service/dataset.go b/internal/service/dataset.go
index a3377cd..e0d38f2 100644
--- a/internal/service/dataset.go
+++ b/internal/service/dataset.go
@@ -211,18 +211,40 @@ func (s *DatasetService) Upload(ctx context.Context, in UploadInput) (domain.Dat
 
 	// Kick off the appropriate async job per file type. If enqueueing fails the
 	// row exists, so record the failure rather than leaving it stuck.
-	var enqueueErr error
-	switch in.FileType {
-	case domain.FileTypeVectorWithKato:
-		enqueueErr = s.jobs.EnqueueParse(ctx, dataset.ID)
-	case domain.FileTypeVector:
-		enqueueErr = s.jobs.EnqueueProperties(ctx, dataset.ID)
-	case domain.FileTypeRaster:
-		enqueueErr = s.jobs.EnqueueConvert(ctx, dataset.ID)
+	if err := s.enqueueProcessing(ctx, dataset); err != nil {
+		_ = s.repo.MarkParseFailed(ctx, dataset.ID, "failed to enqueue processing: "+err.Error()) // best-effort: the enqueue error below is what the caller sees
+		return domain.Dataset{}, fmt.Errorf("enqueue processing: %w", err)
 	}
-	if enqueueErr != nil {
-		_ = s.repo.MarkParseFailed(ctx, dataset.ID, "failed to enqueue processing: "+enqueueErr.Error())
-		return domain.Dataset{}, fmt.Errorf("enqueue processing: %w", enqueueErr)
+	return dataset, nil
+}
+
+// enqueueProcessing schedules the async job that matches the dataset's file
+// type: vector_with_kato is parsed for column selection, plain vector has its
+// attribute table extracted into properties, and raster is converted to a COG.
+// Any other file type yields an error wrapping domain.ErrValidation.
+func (s *DatasetService) enqueueProcessing(ctx context.Context, d domain.Dataset) error {
+	switch d.FileType {
+	case domain.FileTypeVectorWithKato:
+		return s.jobs.EnqueueParse(ctx, d.ID)
+	case domain.FileTypeVector:
+		return s.jobs.EnqueueProperties(ctx, d.ID)
+	case domain.FileTypeRaster:
+		return s.jobs.EnqueueConvert(ctx, d.ID)
+	default:
+		return fmt.Errorf("%w: unknown file_type %q", domain.ErrValidation, d.FileType)
+	}
+}
+
+// Reprocess looks up an existing dataset and re-enqueues the processing job for
+// its file type, restarting its asynchronous pipeline from that step. Useful for
+// retrying after a transient failure or when a worker lagged behind the schema.
+func (s *DatasetService) Reprocess(ctx context.Context, id uuid.UUID) (domain.Dataset, error) {
+	dataset, err := s.repo.GetByID(ctx, id)
+	if err != nil {
+		return domain.Dataset{}, err // lookup error is passed through unwrapped
+	}
+	if err := s.enqueueProcessing(ctx, dataset); err != nil {
+		return domain.Dataset{}, fmt.Errorf("enqueue processing: %w", err) // NOTE(review): unlike Upload, the dataset is not marked failed here
 	}
 	return dataset, nil
 }