653 lines
21 KiB
Go
653 lines
21 KiB
Go
package service
|
|
|
|
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"gis/internal/domain"

	"github.com/google/uuid"
)
|
|
|
|
// maxParseBytes is the upper bound, in bytes, on how much of a stored file is
// read into memory for parsing.
const maxParseBytes = 256 * 1024 * 1024 // 256 MiB
|
|
|
|
// DatasetRepository is the persistence behaviour DatasetService needs.
|
|
type DatasetRepository interface {
|
|
Create(ctx context.Context, d domain.Dataset) (domain.Dataset, error)
|
|
GetByID(ctx context.Context, id uuid.UUID) (domain.Dataset, error)
|
|
ListSummaries(ctx context.Context, categoryID *uuid.UUID, limit, offset int) ([]domain.DatasetSummary, error)
|
|
Count(ctx context.Context, categoryID *uuid.UUID) (int, error)
|
|
Delete(ctx context.Context, id uuid.UUID) error
|
|
MarkParsed(ctx context.Context, id uuid.UUID, cols []domain.AttributeColumn) error
|
|
MarkParseFailed(ctx context.Context, id uuid.UUID, reason string) error
|
|
MarkReady(ctx context.Context, id uuid.UUID) error
|
|
MarkConverted(ctx context.Context, id uuid.UUID, cogKey string, footprint []byte) error
|
|
SetProperties(ctx context.Context, id uuid.UUID, properties []byte) error
|
|
SaveMapping(ctx context.Context, id uuid.UUID, katoColumn string, years []domain.YearColumn) (domain.Dataset, error)
|
|
ReplaceObservations(ctx context.Context, datasetID uuid.UUID, obs []domain.Observation) error
|
|
ListObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string, limit, offset int) ([]domain.Observation, error)
|
|
CountObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string) (int, error)
|
|
}
|
|
|
|
// DefaultPageSize is the page size used by listings when the caller asks for
// fewer than one item per page.
const DefaultPageSize = 20

// MaxPageSize is the largest page size a listing honours; larger requests are
// clamped to it.
const MaxPageSize = 100
|
|
|
|
// DatasetPage is a page of dataset summaries with pagination metadata.
|
|
type DatasetPage struct {
|
|
Items []domain.DatasetSummary
|
|
Page int
|
|
PageSize int
|
|
Total int
|
|
}
|
|
|
|
// ObjectStore is the object-storage behaviour DatasetService needs.
type ObjectStore interface {
	// Get opens the object stored under key. DatasetService closes the
	// returned reader itself, except in Download where the caller must.
	Get(ctx context.Context, key string) (io.ReadCloser, error)

	// Put stores the bytes read from r under key; size and contentType are
	// passed through as given by the caller.
	Put(ctx context.Context, key string, r io.Reader, size int64, contentType string) error

	// Remove deletes the object stored under key.
	Remove(ctx context.Context, key string) error
}
|
|
|
|
// categoryReader lets the dataset service verify a category exists before upload
|
|
// and resolve a category code to its id for list filtering.
|
|
type categoryReader interface {
|
|
GetByID(ctx context.Context, id uuid.UUID) (domain.Category, error)
|
|
GetByCode(ctx context.Context, code string) (domain.Category, error)
|
|
}
|
|
|
|
// JobEnqueuer schedules asynchronous dataset jobs.
|
|
type JobEnqueuer interface {
|
|
EnqueueParse(ctx context.Context, datasetID uuid.UUID) error
|
|
EnqueueProperties(ctx context.Context, datasetID uuid.UUID) error
|
|
EnqueueExtract(ctx context.Context, datasetID uuid.UUID) error
|
|
EnqueueConvert(ctx context.Context, datasetID uuid.UUID) error
|
|
}
|
|
|
|
// ColumnParser detects attribute columns from a file's raw bytes.
|
|
type ColumnParser func(filename string, data []byte) ([]domain.AttributeColumn, error)
|
|
|
|
// RowParser reads every attribute row of a file. It receives the dataset's
// filename and the file's raw bytes and returns one name->value map per row.
type RowParser func(filename string, data []byte) (rows []map[string]string, err error)
|
|
|
|
// RasterConverter converts a raster file to a Cloud-Optimized GeoTIFF and
// reads its footprint. Both methods operate on local file paths.
type RasterConverter interface {
	// Footprint returns the footprint geometry of the raster at srcPath as
	// raw bytes.
	Footprint(ctx context.Context, srcPath string) ([]byte, error)

	// ToCOG converts the raster at srcPath and writes the result to dstPath.
	ToCOG(ctx context.Context, srcPath, dstPath string) error
}
|
|
|
|
// UploadInput carries everything needed to store a new dataset.
|
|
type UploadInput struct {
|
|
CategoryID uuid.UUID
|
|
Name string
|
|
Description *string
|
|
Unit *string
|
|
Meta json.RawMessage
|
|
Automated bool
|
|
Filename string
|
|
FileType domain.FileType
|
|
ContentType string
|
|
Size int64
|
|
Reader io.Reader
|
|
}
|
|
|
|
// DatasetService implements dataset business rules and object storage handling.
|
|
type DatasetService struct {
|
|
repo DatasetRepository
|
|
store ObjectStore
|
|
categories categoryReader
|
|
jobs JobEnqueuer
|
|
parseColumns ColumnParser
|
|
parseRows RowParser
|
|
converter RasterConverter
|
|
}
|
|
|
|
// NewDatasetService wires the dataset repository, object store, category reader
|
|
// (for parent validation), the job enqueuer, the column/row parsers, and the
|
|
// raster converter.
|
|
func NewDatasetService(
|
|
repo DatasetRepository,
|
|
store ObjectStore,
|
|
categories categoryReader,
|
|
jobs JobEnqueuer,
|
|
parseColumns ColumnParser,
|
|
parseRows RowParser,
|
|
converter RasterConverter,
|
|
) *DatasetService {
|
|
return &DatasetService{
|
|
repo: repo,
|
|
store: store,
|
|
categories: categories,
|
|
jobs: jobs,
|
|
parseColumns: parseColumns,
|
|
parseRows: parseRows,
|
|
converter: converter,
|
|
}
|
|
}
|
|
|
|
// Upload validates input, stores the object, and persists the dataset. If the
|
|
// database write fails after upload, the stored object is removed.
|
|
func (s *DatasetService) Upload(ctx context.Context, in UploadInput) (domain.Dataset, error) {
|
|
if !in.FileType.Valid() {
|
|
return domain.Dataset{}, fmt.Errorf("%w: unknown file_type %q", domain.ErrValidation, in.FileType)
|
|
}
|
|
|
|
ext := strings.ToLower(filepath.Ext(in.Filename))
|
|
if !domain.ExtensionAllowedFor(in.FileType, ext) {
|
|
return domain.Dataset{}, fmt.Errorf("%w: extension %q is not allowed for file_type %q (allowed: %s)",
|
|
domain.ErrValidation, ext, in.FileType, strings.Join(domain.AllowedExtensions(in.FileType), ", "))
|
|
}
|
|
|
|
// Sniff the file's leading bytes to reject mislabeled uploads up front, then
|
|
// reconstruct the full stream for storage.
|
|
head := make([]byte, 512)
|
|
n, err := io.ReadFull(in.Reader, head)
|
|
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
|
return domain.Dataset{}, fmt.Errorf("read upload: %w", err)
|
|
}
|
|
head = head[:n]
|
|
if err := domain.ValidateFileContent(ext, head); err != nil {
|
|
return domain.Dataset{}, fmt.Errorf("%w: %s", domain.ErrValidation, err)
|
|
}
|
|
content := io.MultiReader(bytes.NewReader(head), in.Reader)
|
|
|
|
if _, err := s.categories.GetByID(ctx, in.CategoryID); err != nil {
|
|
if errors.Is(err, domain.ErrNotFound) {
|
|
return domain.Dataset{}, fmt.Errorf("%w: category does not exist", domain.ErrValidation)
|
|
}
|
|
return domain.Dataset{}, err
|
|
}
|
|
|
|
name := in.Name
|
|
if name == "" {
|
|
name = in.Filename
|
|
}
|
|
|
|
// Every uploaded file is processed asynchronously: vector_with_kato is parsed
|
|
// for column selection; plain vector has its attribute table extracted into
|
|
// properties; raster is converted to a COG.
|
|
status := domain.DatasetStatusProcessing
|
|
if in.FileType == domain.FileTypeVectorWithKato {
|
|
status = domain.DatasetStatusParsing
|
|
}
|
|
|
|
storageKey := fmt.Sprintf("%s/%s", uuid.New().String(), in.Filename)
|
|
if err := s.store.Put(ctx, storageKey, content, in.Size, in.ContentType); err != nil {
|
|
return domain.Dataset{}, err
|
|
}
|
|
|
|
dataset, err := s.repo.Create(ctx, domain.Dataset{
|
|
CategoryID: in.CategoryID,
|
|
Name: name,
|
|
Description: in.Description,
|
|
Unit: in.Unit,
|
|
Meta: in.Meta,
|
|
Automated: in.Automated,
|
|
Status: status,
|
|
Filename: in.Filename,
|
|
StorageKey: storageKey,
|
|
FileType: in.FileType,
|
|
SizeBytes: in.Size,
|
|
ContentType: in.ContentType,
|
|
})
|
|
if err != nil {
|
|
// Compensate: the row was not written, so the object would be orphaned.
|
|
_ = s.store.Remove(ctx, storageKey)
|
|
return domain.Dataset{}, err
|
|
}
|
|
|
|
// Kick off the appropriate async job per file type. If enqueueing fails the
|
|
// row exists, so record the failure rather than leaving it stuck.
|
|
var enqueueErr error
|
|
switch in.FileType {
|
|
case domain.FileTypeVectorWithKato:
|
|
enqueueErr = s.jobs.EnqueueParse(ctx, dataset.ID)
|
|
case domain.FileTypeVector:
|
|
enqueueErr = s.jobs.EnqueueProperties(ctx, dataset.ID)
|
|
case domain.FileTypeRaster:
|
|
enqueueErr = s.jobs.EnqueueConvert(ctx, dataset.ID)
|
|
}
|
|
if enqueueErr != nil {
|
|
_ = s.repo.MarkParseFailed(ctx, dataset.ID, "failed to enqueue processing: "+enqueueErr.Error())
|
|
return domain.Dataset{}, fmt.Errorf("enqueue processing: %w", enqueueErr)
|
|
}
|
|
return dataset, nil
|
|
}
|
|
|
|
// ExtractProperties reads a plain vector dataset's attribute table and stores it
|
|
// (as a JSON array of row objects) in the properties column, then marks the
|
|
// dataset ready. Invoked by the worker. Parse failures are recorded; storage
|
|
// failures are returned for retry.
|
|
func (s *DatasetService) ExtractProperties(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dataset.FileType != domain.FileTypeVector {
|
|
return nil // only plain vector populates properties
|
|
}
|
|
|
|
data, err := s.fetchObject(ctx, dataset.StorageKey)
|
|
if err != nil {
|
|
return fmt.Errorf("read dataset %s: %w", id, err) // transient
|
|
}
|
|
|
|
rows, err := s.parseRows(dataset.Filename, data)
|
|
if err != nil {
|
|
return s.repo.MarkParseFailed(ctx, id, err.Error()) // permanent
|
|
}
|
|
|
|
var properties []byte
|
|
if hasAttributeData(rows) {
|
|
if properties, err = json.Marshal(rows); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return s.repo.SetProperties(ctx, id, properties)
|
|
}
|
|
|
|
// hasAttributeData reports whether at least one row has one or more
// attributes. It returns false for a nil or empty slice and for a slice made
// up solely of empty or nil maps.
func hasAttributeData(rows []map[string]string) bool {
	for i := range rows {
		if len(rows[i]) != 0 {
			return true
		}
	}
	return false
}
|
|
|
|
// ConvertToCOG converts a raster dataset to a Cloud-Optimized GeoTIFF, stores it
|
|
// under a new key, records the footprint geometry, and marks the dataset ready.
|
|
// Invoked by the worker. Conversion failures are recorded; storage failures are
|
|
// returned for retry.
|
|
func (s *DatasetService) ConvertToCOG(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dataset.FileType != domain.FileTypeRaster {
|
|
return nil // nothing to convert
|
|
}
|
|
|
|
srcPath, cleanupSrc, err := s.downloadToTemp(ctx, dataset.StorageKey, "gis-src-*.tif")
|
|
if err != nil {
|
|
return fmt.Errorf("download raster %s: %w", id, err) // transient
|
|
}
|
|
defer cleanupSrc()
|
|
|
|
dstPath := srcPath + ".cog.tif"
|
|
defer os.Remove(dstPath)
|
|
|
|
footprint, _ := s.converter.Footprint(ctx, srcPath) // best-effort
|
|
|
|
if err := s.converter.ToCOG(ctx, srcPath, dstPath); err != nil {
|
|
return s.repo.MarkParseFailed(ctx, id, err.Error()) // permanent
|
|
}
|
|
|
|
cogKey := deriveCOGKey(dataset.StorageKey)
|
|
if err := s.uploadFile(ctx, cogKey, dstPath, "image/tiff"); err != nil {
|
|
return fmt.Errorf("upload cog %s: %w", id, err) // transient
|
|
}
|
|
return s.repo.MarkConverted(ctx, id, cogKey, footprint)
|
|
}
|
|
|
|
// downloadToTemp streams an object to a temp file and returns its path and a
|
|
// cleanup func.
|
|
func (s *DatasetService) downloadToTemp(ctx context.Context, key, pattern string) (string, func(), error) {
|
|
obj, err := s.store.Get(ctx, key)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
defer obj.Close()
|
|
|
|
f, err := os.CreateTemp("", pattern)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
if _, err := io.Copy(f, obj); err != nil {
|
|
f.Close()
|
|
os.Remove(f.Name())
|
|
return "", nil, err
|
|
}
|
|
if err := f.Close(); err != nil {
|
|
os.Remove(f.Name())
|
|
return "", nil, err
|
|
}
|
|
return f.Name(), func() { os.Remove(f.Name()) }, nil
|
|
}
|
|
|
|
// uploadFile streams a local file to the object store.
|
|
func (s *DatasetService) uploadFile(ctx context.Context, key, filePath, contentType string) error {
|
|
f, err := os.Open(filePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
info, err := f.Stat()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.store.Put(ctx, key, f, info.Size(), contentType)
|
|
}
|
|
|
|
// deriveCOGKey returns the key for a dataset's COG: the original key with a
// "cog" element inserted before its final element, e.g. "id/a.tif" becomes
// "id/cog/a.tif".
func deriveCOGKey(storageKey string) string {
	dir, name := path.Dir(storageKey), path.Base(storageKey)
	return path.Join(dir, "cog", name)
}
|
|
|
|
// Parse reads a vector_with_kato dataset's file, detects its attribute columns,
|
|
// and moves it to awaiting_mapping. It is invoked by the worker. Permanent
|
|
// parse failures are recorded on the dataset (and not retried); transient
|
|
// failures are returned to the caller.
|
|
func (s *DatasetService) Parse(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dataset.FileType != domain.FileTypeVectorWithKato {
|
|
return nil // nothing to parse
|
|
}
|
|
|
|
data, err := s.fetchObject(ctx, dataset.StorageKey)
|
|
if err != nil {
|
|
return fmt.Errorf("read dataset %s: %w", id, err) // transient; allow retry
|
|
}
|
|
|
|
cols, err := s.parseColumns(dataset.Filename, data)
|
|
if err != nil {
|
|
// Permanent: the file could not be parsed. Record and stop.
|
|
return s.repo.MarkParseFailed(ctx, id, err.Error())
|
|
}
|
|
return s.repo.MarkParsed(ctx, id, cols)
|
|
}
|
|
|
|
func (s *DatasetService) fetchObject(ctx context.Context, key string) ([]byte, error) {
|
|
obj, err := s.store.Get(ctx, key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer obj.Close()
|
|
return io.ReadAll(io.LimitReader(obj, maxParseBytes))
|
|
}
|
|
|
|
// MappingInput carries the user's KATO column choice and year-column mapping.
|
|
type MappingInput struct {
|
|
KatoColumn string
|
|
YearColumns []domain.YearColumn
|
|
}
|
|
|
|
// SaveMapping validates the KATO column and year mapping against the dataset's
|
|
// detected columns and marks the dataset ready.
|
|
func (s *DatasetService) SaveMapping(ctx context.Context, id uuid.UUID, in MappingInput) (domain.Dataset, error) {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return domain.Dataset{}, err
|
|
}
|
|
if dataset.FileType != domain.FileTypeVectorWithKato {
|
|
return domain.Dataset{}, fmt.Errorf("%w: mapping only applies to vector_with_kato datasets", domain.ErrValidation)
|
|
}
|
|
if dataset.Status != domain.DatasetStatusAwaitingMapping && dataset.Status != domain.DatasetStatusReady {
|
|
return domain.Dataset{}, fmt.Errorf("%w: dataset is not ready for mapping (status %q)", domain.ErrConflict, dataset.Status)
|
|
}
|
|
|
|
known := make(map[string]struct{}, len(dataset.AttributeColumns))
|
|
for _, c := range dataset.AttributeColumns {
|
|
known[c.Name] = struct{}{}
|
|
}
|
|
if _, ok := known[in.KatoColumn]; !ok {
|
|
return domain.Dataset{}, fmt.Errorf("%w: kato_column %q is not among the detected columns", domain.ErrValidation, in.KatoColumn)
|
|
}
|
|
if len(in.YearColumns) == 0 {
|
|
return domain.Dataset{}, fmt.Errorf("%w: at least one year column mapping is required", domain.ErrValidation)
|
|
}
|
|
for _, yc := range in.YearColumns {
|
|
if _, ok := known[yc.Column]; !ok {
|
|
return domain.Dataset{}, fmt.Errorf("%w: year column %q is not among the detected columns", domain.ErrValidation, yc.Column)
|
|
}
|
|
if _, err := time.Parse("2006-01-02", yc.Date); err != nil {
|
|
return domain.Dataset{}, fmt.Errorf("%w: invalid date %q for column %q (want YYYY-MM-DD)", domain.ErrValidation, yc.Date, yc.Column)
|
|
}
|
|
}
|
|
|
|
dataset, err = s.repo.SaveMapping(ctx, id, in.KatoColumn, in.YearColumns)
|
|
if err != nil {
|
|
return domain.Dataset{}, err
|
|
}
|
|
if err := s.jobs.EnqueueExtract(ctx, id); err != nil {
|
|
_ = s.repo.MarkParseFailed(ctx, id, "failed to enqueue extraction: "+err.Error())
|
|
return domain.Dataset{}, fmt.Errorf("enqueue extract: %w", err)
|
|
}
|
|
return dataset, nil
|
|
}
|
|
|
|
// Extract reads a mapped dataset's file, unpivots its attribute table into
|
|
// observations keyed by KATO code and date, and marks the dataset ready. It is
|
|
// invoked by the worker. Permanent failures (unparsable file) are recorded;
|
|
// transient failures (storage/DB) are returned for retry.
|
|
func (s *DatasetService) Extract(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dataset.KatoColumn == nil || len(dataset.YearColumns) == 0 {
|
|
return fmt.Errorf("dataset %s has no mapping to extract", id)
|
|
}
|
|
|
|
data, err := s.fetchObject(ctx, dataset.StorageKey)
|
|
if err != nil {
|
|
return fmt.Errorf("read dataset %s: %w", id, err) // transient
|
|
}
|
|
|
|
rows, err := s.parseRows(dataset.Filename, data)
|
|
if err != nil {
|
|
return s.repo.MarkParseFailed(ctx, id, err.Error()) // permanent
|
|
}
|
|
|
|
obs := buildObservations(id, *dataset.KatoColumn, dataset.YearColumns, rows)
|
|
if err := s.repo.ReplaceObservations(ctx, id, obs); err != nil {
|
|
return err // transient
|
|
}
|
|
return s.repo.MarkReady(ctx, id)
|
|
}
|
|
|
|
// buildObservations unpivots rows into observations. Rows without a KATO code
|
|
// are skipped; duplicate (kato, date) pairs keep the last value. Numeric cells
|
|
// populate Value, others ValueText.
|
|
func buildObservations(datasetID uuid.UUID, katoColumn string, years []domain.YearColumn, rows []map[string]string) []domain.Observation {
|
|
obs := make([]domain.Observation, 0, len(rows)*len(years))
|
|
index := make(map[string]int)
|
|
|
|
for _, row := range rows {
|
|
kato := strings.TrimSpace(row[katoColumn])
|
|
if kato == "" {
|
|
continue
|
|
}
|
|
for _, yc := range years {
|
|
o := domain.Observation{DatasetID: datasetID, KatoCode: kato, Date: yc.Date}
|
|
if raw := strings.TrimSpace(row[yc.Column]); raw != "" {
|
|
if f, err := strconv.ParseFloat(raw, 64); err == nil {
|
|
o.Value = &f
|
|
} else {
|
|
o.ValueText = &raw
|
|
}
|
|
}
|
|
key := kato + "\x00" + yc.Date
|
|
if i, ok := index[key]; ok {
|
|
obs[i] = o
|
|
} else {
|
|
index[key] = len(obs)
|
|
obs = append(obs, o)
|
|
}
|
|
}
|
|
}
|
|
return obs
|
|
}
|
|
|
|
// ObservationPage is a page of observations with pagination metadata.
|
|
type ObservationPage struct {
|
|
Items []domain.Observation
|
|
Page int
|
|
PageSize int
|
|
Total int
|
|
}
|
|
|
|
// ListObservations returns a page of a dataset's observations, optionally
|
|
// filtered by KATO code.
|
|
func (s *DatasetService) ListObservations(ctx context.Context, id uuid.UUID, katoCode *string, page, pageSize int) (ObservationPage, error) {
|
|
if _, err := s.repo.GetByID(ctx, id); err != nil {
|
|
return ObservationPage{}, err
|
|
}
|
|
if page < 1 {
|
|
page = 1
|
|
}
|
|
if pageSize < 1 {
|
|
pageSize = DefaultPageSize
|
|
}
|
|
if pageSize > MaxPageSize {
|
|
pageSize = MaxPageSize
|
|
}
|
|
|
|
items, err := s.repo.ListObservations(ctx, id, katoCode, pageSize, (page-1)*pageSize)
|
|
if err != nil {
|
|
return ObservationPage{}, err
|
|
}
|
|
total, err := s.repo.CountObservations(ctx, id, katoCode)
|
|
if err != nil {
|
|
return ObservationPage{}, err
|
|
}
|
|
return ObservationPage{Items: items, Page: page, PageSize: pageSize, Total: total}, nil
|
|
}
|
|
|
|
// Get returns a dataset by id.
|
|
func (s *DatasetService) Get(ctx context.Context, id uuid.UUID) (domain.Dataset, error) {
|
|
return s.repo.GetByID(ctx, id)
|
|
}
|
|
|
|
// Bounds and polling cadence for long polling in WaitForStatus.
const (
	// DefaultStatusWait is used when the caller passes a non-positive wait.
	DefaultStatusWait = 25 * time.Second

	// MaxStatusWait is the longest wait WaitForStatus honours.
	MaxStatusWait = time.Minute

	// statusPollInterval is the pause between status reads while waiting.
	statusPollInterval = time.Second
)
|
|
|
|
// DatasetStatusInfo is the minimal status view returned by long polling.
|
|
type DatasetStatusInfo struct {
|
|
ID uuid.UUID `json:"id"`
|
|
Status string `json:"status"`
|
|
ParseError *string `json:"parse_error"`
|
|
}
|
|
|
|
// WaitForStatus implements long polling. If current is empty or already differs
|
|
// from the dataset's status it returns immediately; otherwise it waits (up to
|
|
// wait, clamped to MaxStatusWait) for the status to change, returning the latest
|
|
// status on change or on timeout.
|
|
func (s *DatasetService) WaitForStatus(ctx context.Context, id uuid.UUID, current string, wait time.Duration) (DatasetStatusInfo, error) {
|
|
if wait <= 0 {
|
|
wait = DefaultStatusWait
|
|
}
|
|
if wait > MaxStatusWait {
|
|
wait = MaxStatusWait
|
|
}
|
|
deadline := time.Now().Add(wait)
|
|
|
|
for {
|
|
d, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return DatasetStatusInfo{}, err
|
|
}
|
|
if current == "" || d.Status != current || !time.Now().Before(deadline) {
|
|
return DatasetStatusInfo{ID: d.ID, Status: d.Status, ParseError: d.ParseError}, nil
|
|
}
|
|
|
|
sleep := statusPollInterval
|
|
if rem := time.Until(deadline); rem < sleep {
|
|
sleep = rem
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return DatasetStatusInfo{}, ctx.Err()
|
|
case <-time.After(sleep):
|
|
}
|
|
}
|
|
}
|
|
|
|
// ListSummaries returns a page of dataset summaries, optionally filtered to a
|
|
// category by id and/or by code. page is 1-based; page and pageSize are clamped
|
|
// to sane bounds. When categoryCode is set it is resolved to its category id; an
|
|
// unknown code yields an empty page.
|
|
func (s *DatasetService) ListSummaries(ctx context.Context, categoryID *uuid.UUID, categoryCode *string, page, pageSize int) (DatasetPage, error) {
|
|
if page < 1 {
|
|
page = 1
|
|
}
|
|
if pageSize < 1 {
|
|
pageSize = DefaultPageSize
|
|
}
|
|
if pageSize > MaxPageSize {
|
|
pageSize = MaxPageSize
|
|
}
|
|
|
|
if categoryCode != nil {
|
|
category, err := s.categories.GetByCode(ctx, *categoryCode)
|
|
if err != nil {
|
|
if errors.Is(err, domain.ErrNotFound) {
|
|
return DatasetPage{Items: []domain.DatasetSummary{}, Page: page, PageSize: pageSize}, nil
|
|
}
|
|
return DatasetPage{}, err
|
|
}
|
|
categoryID = &category.ID
|
|
}
|
|
|
|
items, err := s.repo.ListSummaries(ctx, categoryID, pageSize, (page-1)*pageSize)
|
|
if err != nil {
|
|
return DatasetPage{}, err
|
|
}
|
|
total, err := s.repo.Count(ctx, categoryID)
|
|
if err != nil {
|
|
return DatasetPage{}, err
|
|
}
|
|
return DatasetPage{Items: items, Page: page, PageSize: pageSize, Total: total}, nil
|
|
}
|
|
|
|
// Download returns the dataset metadata and a reader for its stored object. The
|
|
// caller must close the reader.
|
|
func (s *DatasetService) Download(ctx context.Context, id uuid.UUID) (domain.Dataset, io.ReadCloser, error) {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return domain.Dataset{}, nil, err
|
|
}
|
|
obj, err := s.store.Get(ctx, dataset.StorageKey)
|
|
if err != nil {
|
|
return domain.Dataset{}, nil, err
|
|
}
|
|
return dataset, obj, nil
|
|
}
|
|
|
|
// Delete removes the dataset row and its stored object.
|
|
func (s *DatasetService) Delete(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := s.repo.Delete(ctx, id); err != nil {
|
|
return err
|
|
}
|
|
if err := s.store.Remove(ctx, dataset.StorageKey); err != nil {
|
|
// The row is already gone; surface the object-store failure to the caller.
|
|
return err
|
|
}
|
|
return nil
|
|
}
|