956 lines
33 KiB
Go
956 lines
33 KiB
Go
package service
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"gis/internal/domain"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// maxParseBytes caps how many bytes of a stored object fetchObject reads into
// memory before the data is handed to a parser.
const maxParseBytes = 256 << 20 // 256 MiB
|
|
|
|
// DatasetRepository is the persistence behaviour DatasetService needs. The
// per-method notes describe how this file calls each method; the storage
// semantics themselves are defined by the implementation.
type DatasetRepository interface {
	// Create persists a new dataset and returns the stored value; Upload uses the
	// returned dataset (including its ID) to enqueue processing.
	Create(ctx context.Context, d domain.Dataset) (domain.Dataset, error)
	// GetByID loads a single dataset by id.
	GetByID(ctx context.Context, id uuid.UUID) (domain.Dataset, error)
	// ListSummaries returns dataset summaries for filter, paged by limit/offset.
	ListSummaries(ctx context.Context, filter domain.DatasetFilter, limit, offset int) ([]domain.DatasetSummary, error)
	// Count presumably returns the number of datasets matching filter (not called
	// in the visible part of this file).
	Count(ctx context.Context, filter domain.DatasetFilter) (int, error)
	// Delete presumably removes the dataset with the given id (not called in the
	// visible part of this file).
	Delete(ctx context.Context, id uuid.UUID) error
	// MarkParsed is called by Parse with the detected attribute columns.
	MarkParsed(ctx context.Context, id uuid.UUID, cols []domain.AttributeColumn) error
	// MarkParseFailed records a failure reason on the dataset. It is called for
	// parse and conversion failures and when a job could not be enqueued.
	MarkParseFailed(ctx context.Context, id uuid.UUID, reason string) error
	// MarkReady is called by Extract once observations are stored; geometry may
	// be nil when geometry extraction failed.
	MarkReady(ctx context.Context, id uuid.UUID, geometry []byte) error
	// MarkConverted is called by ConvertToCOG with the COG's object key and the
	// raster footprint (which may be nil).
	MarkConverted(ctx context.Context, id uuid.UUID, cogKey string, footprint []byte) error
	// SetProperties is called by ExtractProperties; properties and geometry may
	// each be nil.
	SetProperties(ctx context.Context, id uuid.UUID, properties, geometry []byte) error
	// SaveMapping stores the KATO column and year-column mapping and returns the
	// updated dataset.
	SaveMapping(ctx context.Context, id uuid.UUID, katoColumn string, years []domain.YearColumn) (domain.Dataset, error)
	// ReplaceObservations is called by Extract with the complete set of
	// observations built from the dataset's file.
	ReplaceObservations(ctx context.Context, datasetID uuid.UUID, obs []domain.Observation) error
	// ListObservations returns a page of a dataset's observations; katoCode is an
	// optional filter.
	ListObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string, limit, offset int) ([]domain.Observation, error)
	// CountObservations returns the total used as pagination metadata alongside
	// ListObservations.
	CountObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string) (int, error)
	// ListAllObservations returns every observation of a dataset; the GeoJSON
	// builders use it.
	ListAllObservations(ctx context.Context, datasetID uuid.UUID) ([]domain.Observation, error)
	// DistrictGeometriesByKato returns districts keyed by KATO code. Callers treat
	// a code missing from the map as "no district boundary".
	DistrictGeometriesByKato(ctx context.Context, katos []string) (map[string]domain.District, error)
}
|
|
|
|
// Pagination bounds for listings.
const (
	// DefaultPageSize is used when the requested page size is below 1.
	DefaultPageSize = 20
	// MaxPageSize is the upper clamp for a requested page size; ReprocessAll also
	// uses it as its batch size.
	MaxPageSize = 100
)
|
|
|
|
// DatasetPage is a page of dataset summaries with pagination metadata.
type DatasetPage struct {
	Items    []domain.DatasetSummary // summaries on this page
	Page     int                     // page number
	PageSize int                     // page size
	Total    int                     // total count reported with the page
}
|
|
|
|
// ObjectStore is the object-storage behaviour DatasetService needs.
type ObjectStore interface {
	// Put stores the content read from r under key, with the declared size and
	// content type.
	Put(ctx context.Context, key string, r io.Reader, size int64, contentType string) error
	// Get opens the object stored under key. The caller must close the returned
	// reader.
	Get(ctx context.Context, key string) (io.ReadCloser, error)
	// Remove deletes the object stored under key; Upload uses it to compensate
	// when the database write fails after the object was stored.
	Remove(ctx context.Context, key string) error
}
|
|
|
|
// categoryReader lets the dataset service verify a category exists before upload
// and resolve a category code to its id for list filtering.
type categoryReader interface {
	// GetByID loads a category by id. Upload treats domain.ErrNotFound from this
	// call as a validation failure.
	GetByID(ctx context.Context, id uuid.UUID) (domain.Category, error)
	// GetByCode loads a category by its code.
	GetByCode(ctx context.Context, code string) (domain.Category, error)
}
|
|
|
|
// JobEnqueuer schedules asynchronous dataset jobs.
type JobEnqueuer interface {
	// EnqueueParse is used for vector_with_kato datasets (see enqueueProcessing).
	EnqueueParse(ctx context.Context, datasetID uuid.UUID) error
	// EnqueueProperties is used for plain vector datasets (see enqueueProcessing).
	EnqueueProperties(ctx context.Context, datasetID uuid.UUID) error
	// EnqueueExtract is used by SaveMapping once a mapping has been stored.
	EnqueueExtract(ctx context.Context, datasetID uuid.UUID) error
	// EnqueueConvert is used for raster datasets (see enqueueProcessing).
	EnqueueConvert(ctx context.Context, datasetID uuid.UUID) error
}
|
|
|
|
// ColumnParser detects attribute columns from a file's raw bytes. It receives
// the dataset's filename together with the bytes; Parse records a returned
// error on the dataset as a permanent parse failure.
type ColumnParser func(filename string, data []byte) ([]domain.AttributeColumn, error)
|
|
|
|
// RowParser reads every attribute row from a file's raw bytes as name->value
// maps. It receives the dataset's filename together with the bytes; callers in
// this file record a returned error on the dataset as a permanent parse failure.
type RowParser func(filename string, data []byte) ([]map[string]string, error)
|
|
|
|
// RasterConverter converts a raster file to a Cloud-Optimized GeoTIFF, reads its
// footprint, and dissolves a vector file's features into a single geometry. It
// operates on local file paths.
type RasterConverter interface {
	// ToCOG converts the raster at srcPath and writes the result to dstPath.
	ToCOG(ctx context.Context, srcPath, dstPath string) error
	// Footprint returns the footprint geometry of the raster at srcPath.
	// ConvertToCOG treats this call as best-effort and ignores its error.
	Footprint(ctx context.Context, srcPath string) ([]byte, error)
	// VectorGeometry returns the combined geometry of a vector file's features as
	// GeoJSON in EPSG:4326 (nil if the file has no features).
	VectorGeometry(ctx context.Context, srcPath string) ([]byte, error)
}
|
|
|
|
// UploadInput carries everything needed to store a new dataset.
type UploadInput struct {
	CategoryID  uuid.UUID       // must reference an existing category
	Name        string          // display name; Upload falls back to Filename when empty
	Description *string         // optional, stored as given
	Unit        *string         // optional, stored as given
	Meta        json.RawMessage // stored as given
	Automated   bool            // stored as given
	Filename    string          // original filename; its extension is validated against FileType
	FileType    domain.FileType // selects validation rules and the async job
	ContentType string          // passed to the object store and stored on the dataset
	Size        int64           // declared content size, passed to the object store
	Reader      io.Reader       // file content
}
|
|
|
|
// DatasetService implements dataset business rules and object storage handling.
type DatasetService struct {
	repo         DatasetRepository // dataset, observation and district persistence
	store        ObjectStore       // uploaded files and derived COGs
	categories   categoryReader    // category existence checks
	jobs         JobEnqueuer       // asynchronous job scheduling
	parseColumns ColumnParser      // attribute column detection (Parse)
	parseRows    RowParser         // attribute row reading (ExtractProperties, Extract)
	converter    RasterConverter   // COG conversion, footprint and vector geometry
}
|
|
|
|
// NewDatasetService wires the dataset repository, object store, category reader
|
|
// (for parent validation), the job enqueuer, the column/row parsers, and the
|
|
// raster converter.
|
|
func NewDatasetService(
|
|
repo DatasetRepository,
|
|
store ObjectStore,
|
|
categories categoryReader,
|
|
jobs JobEnqueuer,
|
|
parseColumns ColumnParser,
|
|
parseRows RowParser,
|
|
converter RasterConverter,
|
|
) *DatasetService {
|
|
return &DatasetService{
|
|
repo: repo,
|
|
store: store,
|
|
categories: categories,
|
|
jobs: jobs,
|
|
parseColumns: parseColumns,
|
|
parseRows: parseRows,
|
|
converter: converter,
|
|
}
|
|
}
|
|
|
|
// Upload validates input, stores the object, and persists the dataset. If the
|
|
// database write fails after upload, the stored object is removed.
|
|
func (s *DatasetService) Upload(ctx context.Context, in UploadInput) (domain.Dataset, error) {
|
|
if !in.FileType.Valid() {
|
|
return domain.Dataset{}, fmt.Errorf("%w: unknown file_type %q", domain.ErrValidation, in.FileType)
|
|
}
|
|
|
|
ext := strings.ToLower(filepath.Ext(in.Filename))
|
|
if !domain.ExtensionAllowedFor(in.FileType, ext) {
|
|
return domain.Dataset{}, fmt.Errorf("%w: extension %q is not allowed for file_type %q (allowed: %s)",
|
|
domain.ErrValidation, ext, in.FileType, strings.Join(domain.AllowedExtensions(in.FileType), ", "))
|
|
}
|
|
|
|
// Sniff the file's leading bytes to reject mislabeled uploads up front, then
|
|
// reconstruct the full stream for storage.
|
|
head := make([]byte, 512)
|
|
n, err := io.ReadFull(in.Reader, head)
|
|
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
|
return domain.Dataset{}, fmt.Errorf("read upload: %w", err)
|
|
}
|
|
head = head[:n]
|
|
if err := domain.ValidateFileContent(ext, head); err != nil {
|
|
return domain.Dataset{}, fmt.Errorf("%w: %s", domain.ErrValidation, err)
|
|
}
|
|
content := io.MultiReader(bytes.NewReader(head), in.Reader)
|
|
|
|
if _, err := s.categories.GetByID(ctx, in.CategoryID); err != nil {
|
|
if errors.Is(err, domain.ErrNotFound) {
|
|
return domain.Dataset{}, fmt.Errorf("%w: category does not exist", domain.ErrValidation)
|
|
}
|
|
return domain.Dataset{}, err
|
|
}
|
|
|
|
name := in.Name
|
|
if name == "" {
|
|
name = in.Filename
|
|
}
|
|
|
|
// Every uploaded file is processed asynchronously: vector_with_kato is parsed
|
|
// for column selection; plain vector has its attribute table extracted into
|
|
// properties; raster is converted to a COG.
|
|
status := domain.DatasetStatusProcessing
|
|
if in.FileType == domain.FileTypeVectorWithKato {
|
|
status = domain.DatasetStatusParsing
|
|
}
|
|
|
|
storageKey := fmt.Sprintf("%s/%s", uuid.New().String(), in.Filename)
|
|
if err := s.store.Put(ctx, storageKey, content, in.Size, in.ContentType); err != nil {
|
|
return domain.Dataset{}, err
|
|
}
|
|
|
|
dataset, err := s.repo.Create(ctx, domain.Dataset{
|
|
CategoryID: in.CategoryID,
|
|
Name: name,
|
|
Description: in.Description,
|
|
Unit: in.Unit,
|
|
Meta: in.Meta,
|
|
Automated: in.Automated,
|
|
Status: status,
|
|
Filename: in.Filename,
|
|
StorageKey: storageKey,
|
|
FileType: in.FileType,
|
|
SizeBytes: in.Size,
|
|
ContentType: in.ContentType,
|
|
})
|
|
if err != nil {
|
|
// Compensate: the row was not written, so the object would be orphaned.
|
|
_ = s.store.Remove(ctx, storageKey)
|
|
return domain.Dataset{}, err
|
|
}
|
|
|
|
// Kick off the appropriate async job per file type. If enqueueing fails the
|
|
// row exists, so record the failure rather than leaving it stuck.
|
|
if err := s.enqueueProcessing(ctx, dataset); err != nil {
|
|
_ = s.repo.MarkParseFailed(ctx, dataset.ID, "failed to enqueue processing: "+err.Error())
|
|
return domain.Dataset{}, fmt.Errorf("enqueue processing: %w", err)
|
|
}
|
|
return dataset, nil
|
|
}
|
|
|
|
// enqueueProcessing schedules the appropriate async job for a dataset based on
|
|
// its file type: vector_with_kato is parsed for column selection, plain vector
|
|
// has its attribute table extracted into properties, and raster is converted to
|
|
// a COG.
|
|
func (s *DatasetService) enqueueProcessing(ctx context.Context, d domain.Dataset) error {
|
|
switch d.FileType {
|
|
case domain.FileTypeVectorWithKato:
|
|
return s.jobs.EnqueueParse(ctx, d.ID)
|
|
case domain.FileTypeVector:
|
|
return s.jobs.EnqueueProperties(ctx, d.ID)
|
|
case domain.FileTypeRaster:
|
|
return s.jobs.EnqueueConvert(ctx, d.ID)
|
|
default:
|
|
return fmt.Errorf("%w: unknown file_type %q", domain.ErrValidation, d.FileType)
|
|
}
|
|
}
|
|
|
|
// Reprocess re-enqueues the processing job for an existing dataset, restarting
|
|
// its asynchronous pipeline from the step appropriate to its file type. Useful
|
|
// for retrying after a transient failure or a worker that was behind the schema.
|
|
func (s *DatasetService) Reprocess(ctx context.Context, id uuid.UUID) (domain.Dataset, error) {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return domain.Dataset{}, err
|
|
}
|
|
if err := s.enqueueProcessing(ctx, dataset); err != nil {
|
|
return domain.Dataset{}, fmt.Errorf("enqueue processing: %w", err)
|
|
}
|
|
return dataset, nil
|
|
}
|
|
|
|
// ReprocessAll re-enqueues the processing job for every dataset, restarting each
|
|
// one's asynchronous pipeline from the step appropriate to its file type. It
|
|
// paginates through all datasets and continues past individual failures,
|
|
// returning how many jobs were enqueued and a per-dataset map of any failures.
|
|
func (s *DatasetService) ReprocessAll(ctx context.Context) (enqueued int, failures map[uuid.UUID]error, err error) {
|
|
failures = make(map[uuid.UUID]error)
|
|
for offset := 0; ; {
|
|
summaries, err := s.repo.ListSummaries(ctx, domain.DatasetFilter{}, MaxPageSize, offset)
|
|
if err != nil {
|
|
return enqueued, failures, err
|
|
}
|
|
if len(summaries) == 0 {
|
|
break
|
|
}
|
|
for _, sum := range summaries {
|
|
if e := s.enqueueProcessing(ctx, domain.Dataset{ID: sum.ID, FileType: sum.FileType}); e != nil {
|
|
failures[sum.ID] = e
|
|
continue
|
|
}
|
|
enqueued++
|
|
}
|
|
if len(summaries) < MaxPageSize {
|
|
break
|
|
}
|
|
offset += len(summaries)
|
|
}
|
|
return enqueued, failures, nil
|
|
}
|
|
|
|
// ExtractProperties reads a plain vector dataset's attribute table and spatial
|
|
// geometry and stores them (the attribute table as a JSON array of row objects
|
|
// in the properties column, the dissolved feature geometry in the geometry
|
|
// column), then marks the dataset ready. Invoked by the worker. Parse failures
|
|
// are recorded; storage failures are returned for retry. Geometry extraction is
|
|
// best-effort: a failure leaves geometry unset rather than failing the job.
|
|
func (s *DatasetService) ExtractProperties(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dataset.FileType != domain.FileTypeVector {
|
|
return nil // only plain vector populates properties
|
|
}
|
|
|
|
data, err := s.fetchObject(ctx, dataset.StorageKey)
|
|
if err != nil {
|
|
return fmt.Errorf("read dataset %s: %w", id, err) // transient
|
|
}
|
|
|
|
rows, err := s.parseRows(dataset.Filename, data)
|
|
if err != nil {
|
|
return s.repo.MarkParseFailed(ctx, id, err.Error()) // permanent
|
|
}
|
|
|
|
var properties []byte
|
|
if hasAttributeData(rows) {
|
|
if properties, err = json.Marshal(rows); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
geometry := s.vectorGeometry(ctx, dataset.Filename, data)
|
|
return s.repo.SetProperties(ctx, id, properties, geometry)
|
|
}
|
|
|
|
// vectorGeometry dissolves a vector file's features into a single GeoJSON
|
|
// geometry via the converter. It writes the in-memory bytes to a temp file
|
|
// (preserving the extension so the converter detects the format) because the
|
|
// converter operates on file paths. Best-effort: any failure yields nil.
|
|
func (s *DatasetService) vectorGeometry(ctx context.Context, filename string, data []byte) []byte {
|
|
ext := strings.ToLower(filepath.Ext(filename))
|
|
f, err := os.CreateTemp("", "gis-vec-*"+ext)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer os.Remove(f.Name())
|
|
if _, err := f.Write(data); err != nil {
|
|
f.Close()
|
|
return nil
|
|
}
|
|
if err := f.Close(); err != nil {
|
|
return nil
|
|
}
|
|
geom, _ := s.converter.VectorGeometry(ctx, f.Name())
|
|
return geom
|
|
}
|
|
|
|
// hasAttributeData reports whether at least one row has one or more attributes.
// A nil or empty slice, and a slice of only nil/empty rows, yield false.
func hasAttributeData(rows []map[string]string) bool {
	for i := 0; i < len(rows); i++ {
		if len(rows[i]) != 0 {
			return true
		}
	}
	return false
}
|
|
|
|
// ConvertToCOG converts a raster dataset to a Cloud-Optimized GeoTIFF, stores it
|
|
// under a new key, records the footprint geometry, and marks the dataset ready.
|
|
// Invoked by the worker. Conversion failures are recorded; storage failures are
|
|
// returned for retry.
|
|
func (s *DatasetService) ConvertToCOG(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dataset.FileType != domain.FileTypeRaster {
|
|
return nil // nothing to convert
|
|
}
|
|
|
|
srcPath, cleanupSrc, err := s.downloadToTemp(ctx, dataset.StorageKey, "gis-src-*.tif")
|
|
if err != nil {
|
|
return fmt.Errorf("download raster %s: %w", id, err) // transient
|
|
}
|
|
defer cleanupSrc()
|
|
|
|
dstPath := srcPath + ".cog.tif"
|
|
defer os.Remove(dstPath)
|
|
|
|
footprint, _ := s.converter.Footprint(ctx, srcPath) // best-effort
|
|
|
|
if err := s.converter.ToCOG(ctx, srcPath, dstPath); err != nil {
|
|
return s.repo.MarkParseFailed(ctx, id, err.Error()) // permanent
|
|
}
|
|
|
|
cogKey := deriveCOGKey(dataset.StorageKey)
|
|
if err := s.uploadFile(ctx, cogKey, dstPath, "image/tiff"); err != nil {
|
|
return fmt.Errorf("upload cog %s: %w", id, err) // transient
|
|
}
|
|
return s.repo.MarkConverted(ctx, id, cogKey, footprint)
|
|
}
|
|
|
|
// downloadToTemp streams an object to a temp file and returns its path and a
|
|
// cleanup func.
|
|
func (s *DatasetService) downloadToTemp(ctx context.Context, key, pattern string) (string, func(), error) {
|
|
obj, err := s.store.Get(ctx, key)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
defer obj.Close()
|
|
|
|
f, err := os.CreateTemp("", pattern)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
if _, err := io.Copy(f, obj); err != nil {
|
|
f.Close()
|
|
os.Remove(f.Name())
|
|
return "", nil, err
|
|
}
|
|
if err := f.Close(); err != nil {
|
|
os.Remove(f.Name())
|
|
return "", nil, err
|
|
}
|
|
return f.Name(), func() { os.Remove(f.Name()) }, nil
|
|
}
|
|
|
|
// uploadFile streams a local file to the object store.
|
|
func (s *DatasetService) uploadFile(ctx context.Context, key, filePath, contentType string) error {
|
|
f, err := os.Open(filePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
info, err := f.Stat()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.store.Put(ctx, key, f, info.Size(), contentType)
|
|
}
|
|
|
|
// deriveCOGKey returns the object key for a dataset's COG: the original key
// with a "cog" path element inserted before its final element, e.g.
// "<id>/scene.tif" becomes "<id>/cog/scene.tif".
func deriveCOGKey(storageKey string) string {
	dir, file := path.Dir(storageKey), path.Base(storageKey)
	return path.Join(dir, "cog", file)
}
|
|
|
|
// Parse reads a vector_with_kato dataset's file, detects its attribute columns,
|
|
// and moves it to awaiting_mapping. It is invoked by the worker. Permanent
|
|
// parse failures are recorded on the dataset (and not retried); transient
|
|
// failures are returned to the caller.
|
|
func (s *DatasetService) Parse(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dataset.FileType != domain.FileTypeVectorWithKato {
|
|
return nil // nothing to parse
|
|
}
|
|
|
|
data, err := s.fetchObject(ctx, dataset.StorageKey)
|
|
if err != nil {
|
|
return fmt.Errorf("read dataset %s: %w", id, err) // transient; allow retry
|
|
}
|
|
|
|
cols, err := s.parseColumns(dataset.Filename, data)
|
|
if err != nil {
|
|
// Permanent: the file could not be parsed. Record and stop.
|
|
return s.repo.MarkParseFailed(ctx, id, err.Error())
|
|
}
|
|
return s.repo.MarkParsed(ctx, id, cols)
|
|
}
|
|
|
|
func (s *DatasetService) fetchObject(ctx context.Context, key string) ([]byte, error) {
|
|
obj, err := s.store.Get(ctx, key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer obj.Close()
|
|
return io.ReadAll(io.LimitReader(obj, maxParseBytes))
|
|
}
|
|
|
|
// MappingInput carries the user's KATO column choice and year-column mapping.
type MappingInput struct {
	// KatoColumn names the attribute column holding the KATO code; SaveMapping
	// requires it to be one of the dataset's detected columns.
	KatoColumn string
	// YearColumns maps attribute columns to dates. SaveMapping requires at least
	// one entry, each naming a detected column and a YYYY-MM-DD date.
	YearColumns []domain.YearColumn
}
|
|
|
|
// SaveMapping validates the KATO column and year mapping against the dataset's
|
|
// detected columns and marks the dataset ready.
|
|
func (s *DatasetService) SaveMapping(ctx context.Context, id uuid.UUID, in MappingInput) (domain.Dataset, error) {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return domain.Dataset{}, err
|
|
}
|
|
if dataset.FileType != domain.FileTypeVectorWithKato {
|
|
return domain.Dataset{}, fmt.Errorf("%w: mapping only applies to vector_with_kato datasets", domain.ErrValidation)
|
|
}
|
|
if dataset.Status != domain.DatasetStatusAwaitingMapping && dataset.Status != domain.DatasetStatusReady {
|
|
return domain.Dataset{}, fmt.Errorf("%w: dataset is not ready for mapping (status %q)", domain.ErrConflict, dataset.Status)
|
|
}
|
|
|
|
known := make(map[string]struct{}, len(dataset.AttributeColumns))
|
|
for _, c := range dataset.AttributeColumns {
|
|
known[c.Name] = struct{}{}
|
|
}
|
|
if _, ok := known[in.KatoColumn]; !ok {
|
|
return domain.Dataset{}, fmt.Errorf("%w: kato_column %q is not among the detected columns", domain.ErrValidation, in.KatoColumn)
|
|
}
|
|
if len(in.YearColumns) == 0 {
|
|
return domain.Dataset{}, fmt.Errorf("%w: at least one year column mapping is required", domain.ErrValidation)
|
|
}
|
|
for _, yc := range in.YearColumns {
|
|
if _, ok := known[yc.Column]; !ok {
|
|
return domain.Dataset{}, fmt.Errorf("%w: year column %q is not among the detected columns", domain.ErrValidation, yc.Column)
|
|
}
|
|
if _, err := time.Parse("2006-01-02", yc.Date); err != nil {
|
|
return domain.Dataset{}, fmt.Errorf("%w: invalid date %q for column %q (want YYYY-MM-DD)", domain.ErrValidation, yc.Date, yc.Column)
|
|
}
|
|
}
|
|
|
|
dataset, err = s.repo.SaveMapping(ctx, id, in.KatoColumn, in.YearColumns)
|
|
if err != nil {
|
|
return domain.Dataset{}, err
|
|
}
|
|
if err := s.jobs.EnqueueExtract(ctx, id); err != nil {
|
|
_ = s.repo.MarkParseFailed(ctx, id, "failed to enqueue extraction: "+err.Error())
|
|
return domain.Dataset{}, fmt.Errorf("enqueue extract: %w", err)
|
|
}
|
|
return dataset, nil
|
|
}
|
|
|
|
// Extract reads a mapped dataset's file, unpivots its attribute table into
|
|
// observations keyed by KATO code and date, records the dissolved feature
|
|
// geometry, and marks the dataset ready. It is invoked by the worker. Permanent
|
|
// failures (unparsable file) are recorded; transient failures (storage/DB) are
|
|
// returned for retry. Geometry extraction is best-effort: a failure leaves
|
|
// geometry unset rather than failing the job.
|
|
func (s *DatasetService) Extract(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dataset.KatoColumn == nil || len(dataset.YearColumns) == 0 {
|
|
return fmt.Errorf("dataset %s has no mapping to extract", id)
|
|
}
|
|
|
|
data, err := s.fetchObject(ctx, dataset.StorageKey)
|
|
if err != nil {
|
|
return fmt.Errorf("read dataset %s: %w", id, err) // transient
|
|
}
|
|
|
|
rows, err := s.parseRows(dataset.Filename, data)
|
|
if err != nil {
|
|
return s.repo.MarkParseFailed(ctx, id, err.Error()) // permanent
|
|
}
|
|
|
|
obs := buildObservations(id, *dataset.KatoColumn, dataset.YearColumns, rows)
|
|
if err := s.repo.ReplaceObservations(ctx, id, obs); err != nil {
|
|
return err // transient
|
|
}
|
|
|
|
geometry := s.vectorGeometry(ctx, dataset.Filename, data)
|
|
return s.repo.MarkReady(ctx, id, geometry)
|
|
}
|
|
|
|
// buildObservations unpivots rows into observations. Rows without a KATO code
|
|
// are skipped; duplicate (kato, date) pairs keep the last value. Numeric cells
|
|
// populate Value, others ValueText.
|
|
func buildObservations(datasetID uuid.UUID, katoColumn string, years []domain.YearColumn, rows []map[string]string) []domain.Observation {
|
|
obs := make([]domain.Observation, 0, len(rows)*len(years))
|
|
index := make(map[string]int)
|
|
|
|
for _, row := range rows {
|
|
kato := strings.TrimSpace(row[katoColumn])
|
|
if kato == "" {
|
|
continue
|
|
}
|
|
for _, yc := range years {
|
|
o := domain.Observation{DatasetID: datasetID, KatoCode: kato, Date: yc.Date}
|
|
if raw := strings.TrimSpace(row[yc.Column]); raw != "" {
|
|
if f, err := strconv.ParseFloat(raw, 64); err == nil {
|
|
o.Value = &f
|
|
} else {
|
|
o.ValueText = &raw
|
|
}
|
|
}
|
|
key := kato + "\x00" + yc.Date
|
|
if i, ok := index[key]; ok {
|
|
obs[i] = o
|
|
} else {
|
|
index[key] = len(obs)
|
|
obs = append(obs, o)
|
|
}
|
|
}
|
|
}
|
|
return obs
|
|
}
|
|
|
|
// ObservationPage is a page of observations with pagination metadata.
type ObservationPage struct {
	Items    []domain.Observation // observations on this page
	Page     int                  // 1-based page number after normalisation
	PageSize int                  // page size after clamping
	Total    int                  // total number of observations matching the filter
}
|
|
|
|
// ListObservations returns a page of a dataset's observations, optionally
|
|
// filtered by KATO code.
|
|
func (s *DatasetService) ListObservations(ctx context.Context, id uuid.UUID, katoCode *string, page, pageSize int) (ObservationPage, error) {
|
|
if _, err := s.repo.GetByID(ctx, id); err != nil {
|
|
return ObservationPage{}, err
|
|
}
|
|
if page < 1 {
|
|
page = 1
|
|
}
|
|
if pageSize < 1 {
|
|
pageSize = DefaultPageSize
|
|
}
|
|
if pageSize > MaxPageSize {
|
|
pageSize = MaxPageSize
|
|
}
|
|
|
|
items, err := s.repo.ListObservations(ctx, id, katoCode, pageSize, (page-1)*pageSize)
|
|
if err != nil {
|
|
return ObservationPage{}, err
|
|
}
|
|
total, err := s.repo.CountObservations(ctx, id, katoCode)
|
|
if err != nil {
|
|
return ObservationPage{}, err
|
|
}
|
|
return ObservationPage{Items: items, Page: page, PageSize: pageSize, Total: total}, nil
|
|
}
|
|
|
|
// GeoJSON assembles a GeoJSON FeatureCollection (RFC 7946) for a vector or
|
|
// vector_with_kato dataset.
|
|
//
|
|
// A plain vector dataset has no KATO mapping or observations, so the result is a
|
|
// single geometry-only Feature wrapping the dataset's own (dissolved) geometry,
|
|
// with empty properties (or an empty collection when the dataset has no
|
|
// geometry).
|
|
//
|
|
// A vector_with_kato dataset is built from its observations: when it carries its
|
|
// own (dissolved) geometry the observations are taken to describe that whole
|
|
// geometry, yielding a single Feature whose properties hold only the
|
|
// observations keyed by KATO code (each KATO mapping to its date->value pairs);
|
|
// otherwise one Feature is emitted per KATO, its boundary taken from the
|
|
// districts table and its observation values placed into the Feature's
|
|
// properties keyed by date. KATO codes with no matching district are skipped.
|
|
//
|
|
// Only ready datasets are served; a dataset still being processed yields a
|
|
// conflict.
|
|
func (s *DatasetService) GeoJSON(ctx context.Context, id uuid.UUID) (domain.FeatureCollection, error) {
|
|
dataset, err := s.loadGeoJSONDataset(ctx, id, true)
|
|
if err != nil {
|
|
return domain.FeatureCollection{}, err
|
|
}
|
|
|
|
// Plain vector: no KATO mapping or observations. Return the dataset's own
|
|
// geometry as a single Feature, exposing the extracted attribute table (e.g.
|
|
// a GeoPackage's table data) as the Feature's top-level properties.
|
|
if dataset.FileType == domain.FileTypeVector {
|
|
fc := domain.FeatureCollection{Type: domain.GeoJSONFeatureCollection, Features: []domain.Feature{}}
|
|
if hasGeometry(dataset.Geometry) {
|
|
fc.Features = append(fc.Features, domain.Feature{
|
|
Type: domain.GeoJSONFeature,
|
|
Geometry: dataset.Geometry,
|
|
Properties: vectorFeatureProperties(dataset.Properties),
|
|
})
|
|
}
|
|
return fc, nil
|
|
}
|
|
|
|
obs, err := s.repo.ListAllObservations(ctx, id)
|
|
if err != nil {
|
|
return domain.FeatureCollection{}, err
|
|
}
|
|
grouped, order := groupObservationsByKato(obs)
|
|
|
|
// The dataset has its own geometry (the dissolved union of all features): the
|
|
// observations describe that whole geometry, so emit a single Feature wrapping
|
|
// it whose properties hold only the observations, keyed by KATO code.
|
|
if hasGeometry(dataset.Geometry) {
|
|
props := make(map[string]any, len(order))
|
|
for _, kato := range order {
|
|
props[kato] = grouped[kato]
|
|
}
|
|
return domain.FeatureCollection{
|
|
Type: domain.GeoJSONFeatureCollection,
|
|
Features: []domain.Feature{{
|
|
Type: domain.GeoJSONFeature,
|
|
Geometry: dataset.Geometry,
|
|
Properties: props,
|
|
}},
|
|
}, nil
|
|
}
|
|
|
|
// No geometry: build one Feature per KATO from the districts table.
|
|
features, err := s.districtFeatures(ctx, grouped, order)
|
|
if err != nil {
|
|
return domain.FeatureCollection{}, err
|
|
}
|
|
return domain.FeatureCollection{Type: domain.GeoJSONFeatureCollection, Features: features}, nil
|
|
}
|
|
|
|
// KatoGeoJSON assembles a GeoJSON FeatureCollection (RFC 7946) for a
|
|
// vector_with_kato dataset by always joining the districts table on KATO code,
|
|
// ignoring any geometry the dataset carries. One Feature is emitted per KATO,
|
|
// its boundary taken from the districts table and its observation values placed
|
|
// into the Feature's properties keyed by date (alongside `kato` and `name`).
|
|
// KATO codes with no matching district are skipped. Plain vector datasets are
|
|
// not supported (they have no KATO observations). Only ready datasets are
|
|
// served; a dataset still being processed yields a conflict.
|
|
func (s *DatasetService) KatoGeoJSON(ctx context.Context, id uuid.UUID) (domain.FeatureCollection, error) {
|
|
if _, err := s.loadGeoJSONDataset(ctx, id, false); err != nil {
|
|
return domain.FeatureCollection{}, err
|
|
}
|
|
obs, err := s.repo.ListAllObservations(ctx, id)
|
|
if err != nil {
|
|
return domain.FeatureCollection{}, err
|
|
}
|
|
grouped, order := groupObservationsByKato(obs)
|
|
features, err := s.districtFeatures(ctx, grouped, order)
|
|
if err != nil {
|
|
return domain.FeatureCollection{}, err
|
|
}
|
|
return domain.FeatureCollection{Type: domain.GeoJSONFeatureCollection, Features: features}, nil
|
|
}
|
|
|
|
// loadGeoJSONDataset fetches a dataset for a GeoJSON endpoint and validates that
|
|
// it is ready and of a supported file type. vector_with_kato is always
|
|
// accepted; plain vector is accepted only when allowVector is true (the
|
|
// .kato.geojson endpoint requires KATO observations, which plain vector lacks).
|
|
func (s *DatasetService) loadGeoJSONDataset(ctx context.Context, id uuid.UUID, allowVector bool) (domain.Dataset, error) {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return domain.Dataset{}, err
|
|
}
|
|
|
|
supported := dataset.FileType == domain.FileTypeVectorWithKato ||
|
|
(allowVector && dataset.FileType == domain.FileTypeVector)
|
|
if !supported {
|
|
allowed := "vector_with_kato"
|
|
if allowVector {
|
|
allowed = "vector and vector_with_kato"
|
|
}
|
|
return domain.Dataset{}, fmt.Errorf("%w: geojson is only available for %s datasets", domain.ErrValidation, allowed)
|
|
}
|
|
if dataset.Status != domain.DatasetStatusReady {
|
|
return domain.Dataset{}, fmt.Errorf("%w: dataset is not ready (status %q)", domain.ErrConflict, dataset.Status)
|
|
}
|
|
return dataset, nil
|
|
}
|
|
|
|
// districtFeatures builds one Feature per KATO from the districts table, placing
|
|
// the grouped observation values into each Feature's properties keyed by date
|
|
// (alongside `kato` and `name`). KATO codes with no matching district are
|
|
// skipped. order drives the deterministic feature order.
|
|
func (s *DatasetService) districtFeatures(ctx context.Context, grouped map[string]map[string]any, order []string) ([]domain.Feature, error) {
|
|
districts, err := s.repo.DistrictGeometriesByKato(ctx, order)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
features := make([]domain.Feature, 0, len(order))
|
|
for _, kato := range order {
|
|
dist, ok := districts[kato]
|
|
if !ok {
|
|
continue // skip KATO codes with no district boundary
|
|
}
|
|
props := map[string]any{"kato": kato, "name": dist.Name}
|
|
for date, value := range grouped[kato] {
|
|
props[date] = value
|
|
}
|
|
features = append(features, domain.Feature{
|
|
Type: domain.GeoJSONFeature,
|
|
Geometry: dist.Geometry,
|
|
Properties: props,
|
|
})
|
|
}
|
|
return features, nil
|
|
}
|
|
|
|
// hasGeometry reports whether geom holds an actual GeoJSON geometry. After
// trimming surrounding whitespace, empty content and the JSON literal null
// both count as "no geometry"; anything else counts as a geometry.
func hasGeometry(geom json.RawMessage) bool {
	trimmed := bytes.TrimSpace(geom)
	if len(trimmed) == 0 {
		return false
	}
	return string(trimmed) != "null"
}
|
|
|
|
// vectorFeatureProperties turns a plain vector dataset's stored attribute table
// into a Feature's properties object. The table is persisted as a JSON array of
// row objects: a single row becomes the top-level properties directly, multiple
// rows are kept under a "rows" key (so no data is lost while the value stays a
// valid GeoJSON properties object), and an empty/absent table yields {}.
//
// The result is never nil: a lone null row, JSON null, and input that is
// neither an array of objects nor an object all yield {} as well.
func vectorFeatureProperties(raw json.RawMessage) map[string]any {
	if len(bytes.TrimSpace(raw)) == 0 {
		return map[string]any{}
	}

	var rows []map[string]any
	if err := json.Unmarshal(raw, &rows); err == nil {
		switch len(rows) {
		case 0:
			return map[string]any{}
		case 1:
			// "[null]" decodes to one nil row; keep the result an object.
			if rows[0] == nil {
				return map[string]any{}
			}
			return rows[0]
		default:
			return map[string]any{"rows": rows}
		}
	}

	// Fallback: the column already holds a plain object.
	var obj map[string]any
	if err := json.Unmarshal(raw, &obj); err == nil && obj != nil {
		return obj
	}
	return map[string]any{}
}
|
|
|
|
// groupObservationsByKato groups observations by KATO code into date->value
|
|
// maps. The value is the numeric Value when present, otherwise ValueText,
|
|
// otherwise nil (an empty cell). order lists KATO codes in first-seen order so
|
|
// the resulting feature order is deterministic.
|
|
func groupObservationsByKato(obs []domain.Observation) (map[string]map[string]any, []string) {
|
|
grouped := make(map[string]map[string]any)
|
|
order := make([]string, 0)
|
|
for _, o := range obs {
|
|
values, ok := grouped[o.KatoCode]
|
|
if !ok {
|
|
values = make(map[string]any)
|
|
grouped[o.KatoCode] = values
|
|
order = append(order, o.KatoCode)
|
|
}
|
|
values[o.Date] = observationValue(o)
|
|
}
|
|
return grouped, order
|
|
}
|
|
|
|
// observationValue returns the typed cell value: numeric Value, else ValueText,
|
|
// else nil.
|
|
func observationValue(o domain.Observation) any {
|
|
switch {
|
|
case o.Value != nil:
|
|
return *o.Value
|
|
case o.ValueText != nil:
|
|
return *o.ValueText
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Get returns a dataset by id.
|
|
func (s *DatasetService) Get(ctx context.Context, id uuid.UUID) (domain.Dataset, error) {
|
|
return s.repo.GetByID(ctx, id)
|
|
}
|
|
|
|
// Long-polling parameters used by WaitForStatus.
const (
	// DefaultStatusWait is the wait applied when the caller passes a
	// non-positive duration.
	DefaultStatusWait = 25 * time.Second

	// MaxStatusWait is the upper bound any requested wait is clamped to.
	MaxStatusWait = time.Minute

	// statusPollInterval is the pause between two consecutive status reads.
	statusPollInterval = time.Second
)
|
|
|
|
// DatasetStatusInfo is the minimal status view returned by long polling.
|
|
type DatasetStatusInfo struct {
|
|
ID uuid.UUID `json:"id"`
|
|
Status string `json:"status"`
|
|
ParseError *string `json:"parse_error"`
|
|
}
|
|
|
|
// WaitForStatus implements long polling. If current is empty or already differs
|
|
// from the dataset's status it returns immediately; otherwise it waits (up to
|
|
// wait, clamped to MaxStatusWait) for the status to change, returning the latest
|
|
// status on change or on timeout.
|
|
func (s *DatasetService) WaitForStatus(ctx context.Context, id uuid.UUID, current string, wait time.Duration) (DatasetStatusInfo, error) {
|
|
if wait <= 0 {
|
|
wait = DefaultStatusWait
|
|
}
|
|
if wait > MaxStatusWait {
|
|
wait = MaxStatusWait
|
|
}
|
|
deadline := time.Now().Add(wait)
|
|
|
|
for {
|
|
d, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return DatasetStatusInfo{}, err
|
|
}
|
|
if current == "" || d.Status != current || !time.Now().Before(deadline) {
|
|
return DatasetStatusInfo{ID: d.ID, Status: d.Status, ParseError: d.ParseError}, nil
|
|
}
|
|
|
|
sleep := statusPollInterval
|
|
if rem := time.Until(deadline); rem < sleep {
|
|
sleep = rem
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return DatasetStatusInfo{}, ctx.Err()
|
|
case <-time.After(sleep):
|
|
}
|
|
}
|
|
}
|
|
|
|
// ListSummaries returns a page of dataset summaries matching filter, always
|
|
// ordered by created_at descending. page is 1-based; page and pageSize are
|
|
// clamped to sane bounds. When categoryCode is set it is resolved to its
|
|
// category id (overriding filter.CategoryID); an unknown code yields an empty
|
|
// page.
|
|
func (s *DatasetService) ListSummaries(ctx context.Context, filter domain.DatasetFilter, categoryCode *string, page, pageSize int) (DatasetPage, error) {
|
|
if page < 1 {
|
|
page = 1
|
|
}
|
|
if pageSize < 1 {
|
|
pageSize = DefaultPageSize
|
|
}
|
|
if pageSize > MaxPageSize {
|
|
pageSize = MaxPageSize
|
|
}
|
|
|
|
if categoryCode != nil {
|
|
category, err := s.categories.GetByCode(ctx, *categoryCode)
|
|
if err != nil {
|
|
if errors.Is(err, domain.ErrNotFound) {
|
|
return DatasetPage{Items: []domain.DatasetSummary{}, Page: page, PageSize: pageSize}, nil
|
|
}
|
|
return DatasetPage{}, err
|
|
}
|
|
filter.CategoryID = &category.ID
|
|
}
|
|
|
|
items, err := s.repo.ListSummaries(ctx, filter, pageSize, (page-1)*pageSize)
|
|
if err != nil {
|
|
return DatasetPage{}, err
|
|
}
|
|
total, err := s.repo.Count(ctx, filter)
|
|
if err != nil {
|
|
return DatasetPage{}, err
|
|
}
|
|
return DatasetPage{Items: items, Page: page, PageSize: pageSize, Total: total}, nil
|
|
}
|
|
|
|
// Download returns the dataset metadata and a reader for its stored object. The
|
|
// caller must close the reader.
|
|
func (s *DatasetService) Download(ctx context.Context, id uuid.UUID) (domain.Dataset, io.ReadCloser, error) {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return domain.Dataset{}, nil, err
|
|
}
|
|
obj, err := s.store.Get(ctx, dataset.StorageKey)
|
|
if err != nil {
|
|
return domain.Dataset{}, nil, err
|
|
}
|
|
return dataset, obj, nil
|
|
}
|
|
|
|
// Delete removes the dataset row and its stored object.
|
|
func (s *DatasetService) Delete(ctx context.Context, id uuid.UUID) error {
|
|
dataset, err := s.repo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := s.repo.Delete(ctx, id); err != nil {
|
|
return err
|
|
}
|
|
if err := s.store.Remove(ctx, dataset.StorageKey); err != nil {
|
|
// The row is already gone; surface the object-store failure to the caller.
|
|
return err
|
|
}
|
|
return nil
|
|
}
|