feat: reprocess all command
This commit is contained in:
parent
1e71b94fdb
commit
30f7ae1e07
@ -178,6 +178,13 @@ func (a *App) ReprocessDataset(ctx context.Context, id uuid.UUID) (domain.Datase
|
||||
return a.datasets.Reprocess(ctx, id)
|
||||
}
|
||||
|
||||
// ReprocessAllDatasets re-enqueues the processing job for every dataset,
|
||||
// restarting each one's asynchronous pipeline. It returns how many jobs were
|
||||
// enqueued and a per-dataset map of any failures.
|
||||
func (a *App) ReprocessAllDatasets(ctx context.Context) (int, map[uuid.UUID]error, error) {
|
||||
return a.datasets.ReprocessAll(ctx)
|
||||
}
|
||||
|
||||
// Close releases all resources in reverse order of acquisition.
|
||||
func (a *App) Close() {
|
||||
if a.rabbit != nil {
|
||||
|
||||
@ -9,19 +9,23 @@ import (
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var reprocessAll bool
|
||||
|
||||
var reprocessCmd = &cobra.Command{
|
||||
Use: "reprocess <dataset-id>",
|
||||
Short: "Re-enqueue the processing job for an uploaded dataset",
|
||||
Use: "reprocess [dataset-id]",
|
||||
Short: "Re-enqueue the processing job for uploaded datasets",
|
||||
Long: "Re-publish the RabbitMQ message that drives an uploaded dataset's\n" +
|
||||
"asynchronous processing, selecting the right step from its file type\n" +
|
||||
"(vector_with_kato -> parse, vector -> properties, raster -> cog).\n\n" +
|
||||
"Example:\n" +
|
||||
" gis reprocess 06818b2b-1fc5-47d9-a764-db2d4cb3df75",
|
||||
Args: cobra.ExactArgs(1),
|
||||
"Pass a dataset id to reprocess a single dataset, or --all to reprocess\n" +
|
||||
"every dataset.\n\n" +
|
||||
"Examples:\n" +
|
||||
" gis reprocess 06818b2b-1fc5-47d9-a764-db2d4cb3df75\n" +
|
||||
" gis reprocess --all",
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
id, err := uuid.Parse(args[0])
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid dataset id %q: %w", args[0], err)
|
||||
if reprocessAll == (len(args) == 1) {
|
||||
return fmt.Errorf("provide exactly one of <dataset-id> or --all")
|
||||
}
|
||||
|
||||
ctx, cancel := signalContext()
|
||||
@ -33,6 +37,28 @@ var reprocessCmd = &cobra.Command{
|
||||
}
|
||||
defer application.Close()
|
||||
|
||||
if reprocessAll {
|
||||
enqueued, failures, err := application.ReprocessAllDatasets(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for id, ferr := range failures {
|
||||
application.Log.Error("failed to re-enqueue dataset processing",
|
||||
"dataset_id", id, "error", ferr)
|
||||
}
|
||||
application.Log.Info("re-enqueued all dataset processing",
|
||||
"enqueued", enqueued, "failed", len(failures))
|
||||
if len(failures) > 0 {
|
||||
return fmt.Errorf("%d of %d datasets failed to re-enqueue", len(failures), enqueued+len(failures))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
id, err := uuid.Parse(args[0])
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid dataset id %q: %w", args[0], err)
|
||||
}
|
||||
|
||||
dataset, err := application.ReprocessDataset(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -42,3 +68,7 @@ var reprocessCmd = &cobra.Command{
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
reprocessCmd.Flags().BoolVar(&reprocessAll, "all", false, "reprocess every dataset")
|
||||
}
|
||||
|
||||
@ -253,6 +253,35 @@ func (s *DatasetService) Reprocess(ctx context.Context, id uuid.UUID) (domain.Da
|
||||
return dataset, nil
|
||||
}
|
||||
|
||||
// ReprocessAll re-enqueues the processing job for every dataset, restarting each
|
||||
// one's asynchronous pipeline from the step appropriate to its file type. It
|
||||
// paginates through all datasets and continues past individual failures,
|
||||
// returning how many jobs were enqueued and a per-dataset map of any failures.
|
||||
func (s *DatasetService) ReprocessAll(ctx context.Context) (enqueued int, failures map[uuid.UUID]error, err error) {
|
||||
failures = make(map[uuid.UUID]error)
|
||||
for offset := 0; ; {
|
||||
summaries, err := s.repo.ListSummaries(ctx, nil, MaxPageSize, offset)
|
||||
if err != nil {
|
||||
return enqueued, failures, err
|
||||
}
|
||||
if len(summaries) == 0 {
|
||||
break
|
||||
}
|
||||
for _, sum := range summaries {
|
||||
if e := s.enqueueProcessing(ctx, domain.Dataset{ID: sum.ID, FileType: sum.FileType}); e != nil {
|
||||
failures[sum.ID] = e
|
||||
continue
|
||||
}
|
||||
enqueued++
|
||||
}
|
||||
if len(summaries) < MaxPageSize {
|
||||
break
|
||||
}
|
||||
offset += len(summaries)
|
||||
}
|
||||
return enqueued, failures, nil
|
||||
}
|
||||
|
||||
// ExtractProperties reads a plain vector dataset's attribute table and spatial
|
||||
// geometry and stores them (the attribute table as a JSON array of row objects
|
||||
// in the properties column, the dissolved feature geometry in the geometry
|
||||
|
||||
Loading…
Reference in New Issue
Block a user