package service

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"gis/internal/domain"

	"github.com/google/uuid"
)

// maxParseBytes caps how much of a file is read into memory for parsing.
const maxParseBytes = 256 << 20 // 256 MiB

// DatasetRepository is the persistence behaviour DatasetService needs.
type DatasetRepository interface {
	// Dataset rows and listings. The list/count pair takes an optional
	// category filter; a nil categoryID means "no filter".
	Create(ctx context.Context, d domain.Dataset) (domain.Dataset, error)
	GetByID(ctx context.Context, id uuid.UUID) (domain.Dataset, error)
	ListSummaries(ctx context.Context, categoryID *uuid.UUID, limit, offset int) ([]domain.DatasetSummary, error)
	Count(ctx context.Context, categoryID *uuid.UUID) (int, error)
	Delete(ctx context.Context, id uuid.UUID) error

	// Processing outcomes recorded as a dataset moves through its pipeline:
	// detected columns, a failure reason, readiness, the converted COG key and
	// footprint, or the extracted attribute table.
	MarkParsed(ctx context.Context, id uuid.UUID, cols []domain.AttributeColumn) error
	MarkParseFailed(ctx context.Context, id uuid.UUID, reason string) error
	MarkReady(ctx context.Context, id uuid.UUID) error
	MarkConverted(ctx context.Context, id uuid.UUID, cogKey string, footprint []byte) error
	SetProperties(ctx context.Context, id uuid.UUID, properties []byte) error

	// SaveMapping stores the chosen KATO column and year columns and returns
	// the updated dataset.
	SaveMapping(ctx context.Context, id uuid.UUID, katoColumn string, years []domain.YearColumn) (domain.Dataset, error)

	// Observations belonging to a dataset. The list/count pair takes an
	// optional KATO code filter; a nil katoCode means "no filter".
	ReplaceObservations(ctx context.Context, datasetID uuid.UUID, obs []domain.Observation) error
	ListObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string, limit, offset int) ([]domain.Observation, error)
	CountObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string) (int, error)
}

// Pagination defaults for dataset listings. DefaultPageSize replaces a
// requested page size below 1; MaxPageSize caps larger requests.
const (
	DefaultPageSize = 20
	MaxPageSize     = 100
)

// DatasetPage is a page of dataset summaries with pagination metadata.
type DatasetPage struct {
	Items    []domain.DatasetSummary // summaries on this page
	Page     int                     // 1-based page number after clamping
	PageSize int                     // page size after clamping
	Total    int                     // number of datasets matching the filter
}

// ObjectStore is the object-storage behaviour DatasetService needs.
type ObjectStore interface {
	// Put stores the content read from r under key, with the given declared
	// size and content type.
	Put(ctx context.Context, key string, r io.Reader, size int64, contentType string) error
	// Get opens the object stored under key; the caller closes the reader.
	Get(ctx context.Context, key string) (io.ReadCloser, error)
	// Remove deletes the object stored under key.
	Remove(ctx context.Context, key string) error
}

// categoryReader lets the dataset service verify a category exists before upload
// and resolve a category code to its id for list filtering.
type categoryReader interface {
	GetByID(ctx context.Context, id uuid.UUID) (domain.Category, error)
	GetByCode(ctx context.Context, code string) (domain.Category, error)
}

// JobEnqueuer schedules asynchronous dataset jobs.
type JobEnqueuer interface {
	// EnqueueParse schedules column detection for a vector_with_kato dataset.
	EnqueueParse(ctx context.Context, datasetID uuid.UUID) error
	// EnqueueProperties schedules attribute-table extraction for a plain
	// vector dataset.
	EnqueueProperties(ctx context.Context, datasetID uuid.UUID) error
	// EnqueueExtract schedules observation extraction once a mapping is saved.
	EnqueueExtract(ctx context.Context, datasetID uuid.UUID) error
	// EnqueueConvert schedules COG conversion for a raster dataset.
	EnqueueConvert(ctx context.Context, datasetID uuid.UUID) error
}

// ColumnParser detects attribute columns from a file's raw bytes.
type ColumnParser func(filename string, data []byte) ([]domain.AttributeColumn, error)

// RowParser reads every attribute row from a file's raw bytes as name->value maps.
type RowParser func(filename string, data []byte) ([]map[string]string, error)

// RasterConverter converts a raster file to a Cloud-Optimized GeoTIFF and reads
// its footprint. It operates on local file paths.
type RasterConverter interface {
	// ToCOG writes a COG built from srcPath to dstPath.
	ToCOG(ctx context.Context, srcPath, dstPath string) error
	// Footprint returns the footprint geometry of the raster at srcPath as
	// raw bytes.
	Footprint(ctx context.Context, srcPath string) ([]byte, error)
}

// UploadInput carries everything needed to store a new dataset.
type UploadInput struct {
	CategoryID  uuid.UUID       // parent category; must exist
	Name        string          // display name; Upload falls back to Filename when empty
	Description *string         // optional
	Unit        *string         // optional
	Meta        json.RawMessage // stored on the dataset as given
	Automated   bool            // stored on the dataset as given
	Filename    string          // original file name; its extension is validated against FileType
	FileType    domain.FileType // selects the allowed extensions and the async job to run
	ContentType string          // passed to the object store and recorded on the dataset
	Size        int64           // declared size; passed to the object store and recorded on the dataset
	Reader      io.Reader       // file content; Upload sniffs the first 512 bytes before storing
}

// DatasetService implements dataset business rules and object storage handling.
type DatasetService struct { repo DatasetRepository store ObjectStore categories categoryReader jobs JobEnqueuer parseColumns ColumnParser parseRows RowParser converter RasterConverter } // NewDatasetService wires the dataset repository, object store, category reader // (for parent validation), the job enqueuer, the column/row parsers, and the // raster converter. func NewDatasetService( repo DatasetRepository, store ObjectStore, categories categoryReader, jobs JobEnqueuer, parseColumns ColumnParser, parseRows RowParser, converter RasterConverter, ) *DatasetService { return &DatasetService{ repo: repo, store: store, categories: categories, jobs: jobs, parseColumns: parseColumns, parseRows: parseRows, converter: converter, } } // Upload validates input, stores the object, and persists the dataset. If the // database write fails after upload, the stored object is removed. func (s *DatasetService) Upload(ctx context.Context, in UploadInput) (domain.Dataset, error) { if !in.FileType.Valid() { return domain.Dataset{}, fmt.Errorf("%w: unknown file_type %q", domain.ErrValidation, in.FileType) } ext := strings.ToLower(filepath.Ext(in.Filename)) if !domain.ExtensionAllowedFor(in.FileType, ext) { return domain.Dataset{}, fmt.Errorf("%w: extension %q is not allowed for file_type %q (allowed: %s)", domain.ErrValidation, ext, in.FileType, strings.Join(domain.AllowedExtensions(in.FileType), ", ")) } // Sniff the file's leading bytes to reject mislabeled uploads up front, then // reconstruct the full stream for storage. 
head := make([]byte, 512) n, err := io.ReadFull(in.Reader, head) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { return domain.Dataset{}, fmt.Errorf("read upload: %w", err) } head = head[:n] if err := domain.ValidateFileContent(ext, head); err != nil { return domain.Dataset{}, fmt.Errorf("%w: %s", domain.ErrValidation, err) } content := io.MultiReader(bytes.NewReader(head), in.Reader) if _, err := s.categories.GetByID(ctx, in.CategoryID); err != nil { if errors.Is(err, domain.ErrNotFound) { return domain.Dataset{}, fmt.Errorf("%w: category does not exist", domain.ErrValidation) } return domain.Dataset{}, err } name := in.Name if name == "" { name = in.Filename } // Every uploaded file is processed asynchronously: vector_with_kato is parsed // for column selection; plain vector has its attribute table extracted into // properties; raster is converted to a COG. status := domain.DatasetStatusProcessing if in.FileType == domain.FileTypeVectorWithKato { status = domain.DatasetStatusParsing } storageKey := fmt.Sprintf("%s/%s", uuid.New().String(), in.Filename) if err := s.store.Put(ctx, storageKey, content, in.Size, in.ContentType); err != nil { return domain.Dataset{}, err } dataset, err := s.repo.Create(ctx, domain.Dataset{ CategoryID: in.CategoryID, Name: name, Description: in.Description, Unit: in.Unit, Meta: in.Meta, Automated: in.Automated, Status: status, Filename: in.Filename, StorageKey: storageKey, FileType: in.FileType, SizeBytes: in.Size, ContentType: in.ContentType, }) if err != nil { // Compensate: the row was not written, so the object would be orphaned. _ = s.store.Remove(ctx, storageKey) return domain.Dataset{}, err } // Kick off the appropriate async job per file type. If enqueueing fails the // row exists, so record the failure rather than leaving it stuck. 
if err := s.enqueueProcessing(ctx, dataset); err != nil { _ = s.repo.MarkParseFailed(ctx, dataset.ID, "failed to enqueue processing: "+err.Error()) return domain.Dataset{}, fmt.Errorf("enqueue processing: %w", err) } return dataset, nil } // enqueueProcessing schedules the appropriate async job for a dataset based on // its file type: vector_with_kato is parsed for column selection, plain vector // has its attribute table extracted into properties, and raster is converted to // a COG. func (s *DatasetService) enqueueProcessing(ctx context.Context, d domain.Dataset) error { switch d.FileType { case domain.FileTypeVectorWithKato: return s.jobs.EnqueueParse(ctx, d.ID) case domain.FileTypeVector: return s.jobs.EnqueueProperties(ctx, d.ID) case domain.FileTypeRaster: return s.jobs.EnqueueConvert(ctx, d.ID) default: return fmt.Errorf("%w: unknown file_type %q", domain.ErrValidation, d.FileType) } } // Reprocess re-enqueues the processing job for an existing dataset, restarting // its asynchronous pipeline from the step appropriate to its file type. Useful // for retrying after a transient failure or a worker that was behind the schema. func (s *DatasetService) Reprocess(ctx context.Context, id uuid.UUID) (domain.Dataset, error) { dataset, err := s.repo.GetByID(ctx, id) if err != nil { return domain.Dataset{}, err } if err := s.enqueueProcessing(ctx, dataset); err != nil { return domain.Dataset{}, fmt.Errorf("enqueue processing: %w", err) } return dataset, nil } // ExtractProperties reads a plain vector dataset's attribute table and stores it // (as a JSON array of row objects) in the properties column, then marks the // dataset ready. Invoked by the worker. Parse failures are recorded; storage // failures are returned for retry. 
func (s *DatasetService) ExtractProperties(ctx context.Context, id uuid.UUID) error { dataset, err := s.repo.GetByID(ctx, id) if err != nil { return err } if dataset.FileType != domain.FileTypeVector { return nil // only plain vector populates properties } data, err := s.fetchObject(ctx, dataset.StorageKey) if err != nil { return fmt.Errorf("read dataset %s: %w", id, err) // transient } rows, err := s.parseRows(dataset.Filename, data) if err != nil { return s.repo.MarkParseFailed(ctx, id, err.Error()) // permanent } var properties []byte if hasAttributeData(rows) { if properties, err = json.Marshal(rows); err != nil { return err } } return s.repo.SetProperties(ctx, id, properties) } // hasAttributeData reports whether any row carries at least one attribute. func hasAttributeData(rows []map[string]string) bool { for _, row := range rows { if len(row) > 0 { return true } } return false } // ConvertToCOG converts a raster dataset to a Cloud-Optimized GeoTIFF, stores it // under a new key, records the footprint geometry, and marks the dataset ready. // Invoked by the worker. Conversion failures are recorded; storage failures are // returned for retry. 
func (s *DatasetService) ConvertToCOG(ctx context.Context, id uuid.UUID) error { dataset, err := s.repo.GetByID(ctx, id) if err != nil { return err } if dataset.FileType != domain.FileTypeRaster { return nil // nothing to convert } srcPath, cleanupSrc, err := s.downloadToTemp(ctx, dataset.StorageKey, "gis-src-*.tif") if err != nil { return fmt.Errorf("download raster %s: %w", id, err) // transient } defer cleanupSrc() dstPath := srcPath + ".cog.tif" defer os.Remove(dstPath) footprint, _ := s.converter.Footprint(ctx, srcPath) // best-effort if err := s.converter.ToCOG(ctx, srcPath, dstPath); err != nil { return s.repo.MarkParseFailed(ctx, id, err.Error()) // permanent } cogKey := deriveCOGKey(dataset.StorageKey) if err := s.uploadFile(ctx, cogKey, dstPath, "image/tiff"); err != nil { return fmt.Errorf("upload cog %s: %w", id, err) // transient } return s.repo.MarkConverted(ctx, id, cogKey, footprint) } // downloadToTemp streams an object to a temp file and returns its path and a // cleanup func. func (s *DatasetService) downloadToTemp(ctx context.Context, key, pattern string) (string, func(), error) { obj, err := s.store.Get(ctx, key) if err != nil { return "", nil, err } defer obj.Close() f, err := os.CreateTemp("", pattern) if err != nil { return "", nil, err } if _, err := io.Copy(f, obj); err != nil { f.Close() os.Remove(f.Name()) return "", nil, err } if err := f.Close(); err != nil { os.Remove(f.Name()) return "", nil, err } return f.Name(), func() { os.Remove(f.Name()) }, nil } // uploadFile streams a local file to the object store. func (s *DatasetService) uploadFile(ctx context.Context, key, filePath, contentType string) error { f, err := os.Open(filePath) if err != nil { return err } defer f.Close() info, err := f.Stat() if err != nil { return err } return s.store.Put(ctx, key, f, info.Size(), contentType) } // deriveCOGKey places the COG alongside the original under a cog/ prefix. 
func deriveCOGKey(storageKey string) string { return path.Join(path.Dir(storageKey), "cog", path.Base(storageKey)) } // Parse reads a vector_with_kato dataset's file, detects its attribute columns, // and moves it to awaiting_mapping. It is invoked by the worker. Permanent // parse failures are recorded on the dataset (and not retried); transient // failures are returned to the caller. func (s *DatasetService) Parse(ctx context.Context, id uuid.UUID) error { dataset, err := s.repo.GetByID(ctx, id) if err != nil { return err } if dataset.FileType != domain.FileTypeVectorWithKato { return nil // nothing to parse } data, err := s.fetchObject(ctx, dataset.StorageKey) if err != nil { return fmt.Errorf("read dataset %s: %w", id, err) // transient; allow retry } cols, err := s.parseColumns(dataset.Filename, data) if err != nil { // Permanent: the file could not be parsed. Record and stop. return s.repo.MarkParseFailed(ctx, id, err.Error()) } return s.repo.MarkParsed(ctx, id, cols) } func (s *DatasetService) fetchObject(ctx context.Context, key string) ([]byte, error) { obj, err := s.store.Get(ctx, key) if err != nil { return nil, err } defer obj.Close() return io.ReadAll(io.LimitReader(obj, maxParseBytes)) } // MappingInput carries the user's KATO column choice and year-column mapping. type MappingInput struct { KatoColumn string YearColumns []domain.YearColumn } // SaveMapping validates the KATO column and year mapping against the dataset's // detected columns and marks the dataset ready. 
func (s *DatasetService) SaveMapping(ctx context.Context, id uuid.UUID, in MappingInput) (domain.Dataset, error) { dataset, err := s.repo.GetByID(ctx, id) if err != nil { return domain.Dataset{}, err } if dataset.FileType != domain.FileTypeVectorWithKato { return domain.Dataset{}, fmt.Errorf("%w: mapping only applies to vector_with_kato datasets", domain.ErrValidation) } if dataset.Status != domain.DatasetStatusAwaitingMapping && dataset.Status != domain.DatasetStatusReady { return domain.Dataset{}, fmt.Errorf("%w: dataset is not ready for mapping (status %q)", domain.ErrConflict, dataset.Status) } known := make(map[string]struct{}, len(dataset.AttributeColumns)) for _, c := range dataset.AttributeColumns { known[c.Name] = struct{}{} } if _, ok := known[in.KatoColumn]; !ok { return domain.Dataset{}, fmt.Errorf("%w: kato_column %q is not among the detected columns", domain.ErrValidation, in.KatoColumn) } if len(in.YearColumns) == 0 { return domain.Dataset{}, fmt.Errorf("%w: at least one year column mapping is required", domain.ErrValidation) } for _, yc := range in.YearColumns { if _, ok := known[yc.Column]; !ok { return domain.Dataset{}, fmt.Errorf("%w: year column %q is not among the detected columns", domain.ErrValidation, yc.Column) } if _, err := time.Parse("2006-01-02", yc.Date); err != nil { return domain.Dataset{}, fmt.Errorf("%w: invalid date %q for column %q (want YYYY-MM-DD)", domain.ErrValidation, yc.Date, yc.Column) } } dataset, err = s.repo.SaveMapping(ctx, id, in.KatoColumn, in.YearColumns) if err != nil { return domain.Dataset{}, err } if err := s.jobs.EnqueueExtract(ctx, id); err != nil { _ = s.repo.MarkParseFailed(ctx, id, "failed to enqueue extraction: "+err.Error()) return domain.Dataset{}, fmt.Errorf("enqueue extract: %w", err) } return dataset, nil } // Extract reads a mapped dataset's file, unpivots its attribute table into // observations keyed by KATO code and date, and marks the dataset ready. It is // invoked by the worker. 
Permanent failures (unparsable file) are recorded; // transient failures (storage/DB) are returned for retry. func (s *DatasetService) Extract(ctx context.Context, id uuid.UUID) error { dataset, err := s.repo.GetByID(ctx, id) if err != nil { return err } if dataset.KatoColumn == nil || len(dataset.YearColumns) == 0 { return fmt.Errorf("dataset %s has no mapping to extract", id) } data, err := s.fetchObject(ctx, dataset.StorageKey) if err != nil { return fmt.Errorf("read dataset %s: %w", id, err) // transient } rows, err := s.parseRows(dataset.Filename, data) if err != nil { return s.repo.MarkParseFailed(ctx, id, err.Error()) // permanent } obs := buildObservations(id, *dataset.KatoColumn, dataset.YearColumns, rows) if err := s.repo.ReplaceObservations(ctx, id, obs); err != nil { return err // transient } return s.repo.MarkReady(ctx, id) } // buildObservations unpivots rows into observations. Rows without a KATO code // are skipped; duplicate (kato, date) pairs keep the last value. Numeric cells // populate Value, others ValueText. func buildObservations(datasetID uuid.UUID, katoColumn string, years []domain.YearColumn, rows []map[string]string) []domain.Observation { obs := make([]domain.Observation, 0, len(rows)*len(years)) index := make(map[string]int) for _, row := range rows { kato := strings.TrimSpace(row[katoColumn]) if kato == "" { continue } for _, yc := range years { o := domain.Observation{DatasetID: datasetID, KatoCode: kato, Date: yc.Date} if raw := strings.TrimSpace(row[yc.Column]); raw != "" { if f, err := strconv.ParseFloat(raw, 64); err == nil { o.Value = &f } else { o.ValueText = &raw } } key := kato + "\x00" + yc.Date if i, ok := index[key]; ok { obs[i] = o } else { index[key] = len(obs) obs = append(obs, o) } } } return obs } // ObservationPage is a page of observations with pagination metadata. 
type ObservationPage struct { Items []domain.Observation Page int PageSize int Total int } // ListObservations returns a page of a dataset's observations, optionally // filtered by KATO code. func (s *DatasetService) ListObservations(ctx context.Context, id uuid.UUID, katoCode *string, page, pageSize int) (ObservationPage, error) { if _, err := s.repo.GetByID(ctx, id); err != nil { return ObservationPage{}, err } if page < 1 { page = 1 } if pageSize < 1 { pageSize = DefaultPageSize } if pageSize > MaxPageSize { pageSize = MaxPageSize } items, err := s.repo.ListObservations(ctx, id, katoCode, pageSize, (page-1)*pageSize) if err != nil { return ObservationPage{}, err } total, err := s.repo.CountObservations(ctx, id, katoCode) if err != nil { return ObservationPage{}, err } return ObservationPage{Items: items, Page: page, PageSize: pageSize, Total: total}, nil } // Get returns a dataset by id. func (s *DatasetService) Get(ctx context.Context, id uuid.UUID) (domain.Dataset, error) { return s.repo.GetByID(ctx, id) } // Status-wait bounds and polling cadence for long polling. const ( DefaultStatusWait = 25 * time.Second MaxStatusWait = 60 * time.Second statusPollInterval = 1 * time.Second ) // DatasetStatusInfo is the minimal status view returned by long polling. type DatasetStatusInfo struct { ID uuid.UUID `json:"id"` Status string `json:"status"` ParseError *string `json:"parse_error"` } // WaitForStatus implements long polling. If current is empty or already differs // from the dataset's status it returns immediately; otherwise it waits (up to // wait, clamped to MaxStatusWait) for the status to change, returning the latest // status on change or on timeout. 
func (s *DatasetService) WaitForStatus(ctx context.Context, id uuid.UUID, current string, wait time.Duration) (DatasetStatusInfo, error) { if wait <= 0 { wait = DefaultStatusWait } if wait > MaxStatusWait { wait = MaxStatusWait } deadline := time.Now().Add(wait) for { d, err := s.repo.GetByID(ctx, id) if err != nil { return DatasetStatusInfo{}, err } if current == "" || d.Status != current || !time.Now().Before(deadline) { return DatasetStatusInfo{ID: d.ID, Status: d.Status, ParseError: d.ParseError}, nil } sleep := statusPollInterval if rem := time.Until(deadline); rem < sleep { sleep = rem } select { case <-ctx.Done(): return DatasetStatusInfo{}, ctx.Err() case <-time.After(sleep): } } } // ListSummaries returns a page of dataset summaries, optionally filtered to a // category by id and/or by code. page is 1-based; page and pageSize are clamped // to sane bounds. When categoryCode is set it is resolved to its category id; an // unknown code yields an empty page. func (s *DatasetService) ListSummaries(ctx context.Context, categoryID *uuid.UUID, categoryCode *string, page, pageSize int) (DatasetPage, error) { if page < 1 { page = 1 } if pageSize < 1 { pageSize = DefaultPageSize } if pageSize > MaxPageSize { pageSize = MaxPageSize } if categoryCode != nil { category, err := s.categories.GetByCode(ctx, *categoryCode) if err != nil { if errors.Is(err, domain.ErrNotFound) { return DatasetPage{Items: []domain.DatasetSummary{}, Page: page, PageSize: pageSize}, nil } return DatasetPage{}, err } categoryID = &category.ID } items, err := s.repo.ListSummaries(ctx, categoryID, pageSize, (page-1)*pageSize) if err != nil { return DatasetPage{}, err } total, err := s.repo.Count(ctx, categoryID) if err != nil { return DatasetPage{}, err } return DatasetPage{Items: items, Page: page, PageSize: pageSize, Total: total}, nil } // Download returns the dataset metadata and a reader for its stored object. The // caller must close the reader. 
func (s *DatasetService) Download(ctx context.Context, id uuid.UUID) (domain.Dataset, io.ReadCloser, error) { dataset, err := s.repo.GetByID(ctx, id) if err != nil { return domain.Dataset{}, nil, err } obj, err := s.store.Get(ctx, dataset.StorageKey) if err != nil { return domain.Dataset{}, nil, err } return dataset, obj, nil } // Delete removes the dataset row and its stored object. func (s *DatasetService) Delete(ctx context.Context, id uuid.UUID) error { dataset, err := s.repo.GetByID(ctx, id) if err != nil { return err } if err := s.repo.Delete(ctx, id); err != nil { return err } if err := s.store.Remove(ctx, dataset.StorageKey); err != nil { // The row is already gone; surface the object-store failure to the caller. return err } return nil }