package postgres import ( "context" "encoding/json" "fmt" "strings" "time" "gis/internal/domain" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) // DatasetRepository persists datasets in Postgres. type DatasetRepository struct { pool *pgxpool.Pool } // NewDatasetRepository returns a DatasetRepository backed by the given pool. func NewDatasetRepository(pool *pgxpool.Pool) *DatasetRepository { return &DatasetRepository{pool: pool} } // datasetColumns lists the dataset columns for SELECT and RETURNING. The // geometry is exposed as GeoJSON (jsonb) rather than its raw EWKB form, and a // bounding box array is derived for raster datasets only. const datasetColumns = `id, category_id, name, description, unit, filename, storage_key, cog_storage_key, file_type, size_bytes, content_type, properties, meta, automated, status, attribute_columns, kato_column, year_columns, parse_error, ST_AsGeoJSON(geometry)::jsonb AS geometry, CASE WHEN file_type = 'raster' AND geometry IS NOT NULL THEN ARRAY[ST_XMin(geometry), ST_YMin(geometry), ST_XMax(geometry), ST_YMax(geometry)] ELSE NULL END AS bbox, geojson, created_at, updated_at` func scanDataset(row pgx.Row) (domain.Dataset, error) { var d domain.Dataset err := row.Scan( &d.ID, &d.CategoryID, &d.Name, &d.Description, &d.Unit, &d.Filename, &d.StorageKey, &d.CogStorageKey, &d.FileType, &d.SizeBytes, &d.ContentType, &d.Properties, &d.Meta, &d.Automated, &d.Status, &d.AttributeColumns, &d.KatoColumn, &d.YearColumns, &d.ParseError, &d.Geometry, &d.BBox, &d.GeoJSON, &d.CreatedAt, &d.UpdatedAt, ) return d, err } // nullableJSON returns nil for empty JSON so the column is stored as SQL NULL // rather than an empty/invalid value. func nullableJSON(raw json.RawMessage) any { if len(raw) == 0 { return nil } return raw } // Create inserts a new dataset and returns the stored row. func (r *DatasetRepository) Create(ctx context.Context, d domain.Dataset) (domain.Dataset, error) { row := r.pool.QueryRow(ctx, `INSERT INTO datasets (category_id, name, description, unit, filename, storage_key, file_type, size_bytes, content_type, properties, meta, automated, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING `+datasetColumns, d.CategoryID, d.Name, d.Description, d.Unit, d.Filename, d.StorageKey, d.FileType, d.SizeBytes, d.ContentType, nullableJSON(d.Properties), nullableJSON(d.Meta), d.Automated, d.Status, ) out, err := scanDataset(row) return out, mapError(err) } // MarkParsed stores the detected attribute columns and moves the dataset to // awaiting_mapping, clearing any previous parse error. func (r *DatasetRepository) MarkParsed(ctx context.Context, id uuid.UUID, cols []domain.AttributeColumn) error { tag, err := r.pool.Exec(ctx, `UPDATE datasets SET attribute_columns = $2, status = $3, parse_error = NULL, updated_at = now() WHERE id = $1`, id, cols, domain.DatasetStatusAwaitingMapping, ) if err != nil { return mapError(err) } if tag.RowsAffected() == 0 { return domain.ErrNotFound } return nil } // MarkParseFailed records a parse failure reason and sets the failed status. func (r *DatasetRepository) MarkParseFailed(ctx context.Context, id uuid.UUID, reason string) error { tag, err := r.pool.Exec(ctx, `UPDATE datasets SET status = $2, parse_error = $3, updated_at = now() WHERE id = $1`, id, domain.DatasetStatusFailed, reason, ) if err != nil { return mapError(err) } if tag.RowsAffected() == 0 { return domain.ErrNotFound } return nil } // SaveMapping stores the KATO column and year mapping, moves the dataset to // extracting, and returns the updated row. func (r *DatasetRepository) SaveMapping(ctx context.Context, id uuid.UUID, katoColumn string, years []domain.YearColumn) (domain.Dataset, error) { row := r.pool.QueryRow(ctx, `UPDATE datasets SET kato_column = $2, year_columns = $3, status = $4, parse_error = NULL, updated_at = now() WHERE id = $1 RETURNING `+datasetColumns, id, katoColumn, years, domain.DatasetStatusExtracting, ) out, err := scanDataset(row) return out, mapError(err) } // MarkConverted stores the COG storage key, optionally sets the footprint // geometry (GeoJSON in EPSG:4326; nil keeps the existing geometry), and marks // the dataset ready. func (r *DatasetRepository) MarkConverted(ctx context.Context, id uuid.UUID, cogKey string, footprint []byte) error { var fp any // nil -> SQL NULL -> CASE keeps existing geometry if len(footprint) > 0 { fp = string(footprint) } tag, err := r.pool.Exec(ctx, `UPDATE datasets SET cog_storage_key = $2, geometry = CASE WHEN $3::text IS NULL THEN geometry ELSE ST_SetSRID(ST_GeomFromGeoJSON($3), 4326) END, status = $4, parse_error = NULL, updated_at = now() WHERE id = $1`, id, cogKey, fp, domain.DatasetStatusReady, ) if err != nil { return mapError(err) } if tag.RowsAffected() == 0 { return domain.ErrNotFound } return nil } // SetProperties stores the extracted attribute table (nil -> NULL) and the // dissolved feature geometry (GeoJSON in EPSG:4326; nil keeps the existing // geometry), then marks the dataset ready. The geometry is reduced to the union // of all features via ST_UnaryUnion. func (r *DatasetRepository) SetProperties(ctx context.Context, id uuid.UUID, properties, geometry []byte) error { var geom any // nil -> SQL NULL -> CASE keeps existing geometry if len(geometry) > 0 { geom = string(geometry) } tag, err := r.pool.Exec(ctx, `UPDATE datasets SET properties = $2, geometry = CASE WHEN $3::text IS NULL THEN geometry ELSE ST_UnaryUnion(ST_SetSRID(ST_GeomFromGeoJSON($3), 4326)) END, status = $4, parse_error = NULL, updated_at = now() WHERE id = $1`, id, nullableJSON(json.RawMessage(properties)), geom, domain.DatasetStatusReady, ) if err != nil { return mapError(err) } if tag.RowsAffected() == 0 { return domain.ErrNotFound } return nil } // MarkReady sets the dataset status to ready, stores the dissolved feature // geometry (GeoJSON in EPSG:4326; nil keeps the existing geometry, reduced to // the union of all features via ST_UnaryUnion), and clears any error. func (r *DatasetRepository) MarkReady(ctx context.Context, id uuid.UUID, geometry []byte) error { var geom any // nil -> SQL NULL -> CASE keeps existing geometry if len(geometry) > 0 { geom = string(geometry) } tag, err := r.pool.Exec(ctx, `UPDATE datasets SET status = $2, geometry = CASE WHEN $3::text IS NULL THEN geometry ELSE ST_UnaryUnion(ST_SetSRID(ST_GeomFromGeoJSON($3), 4326)) END, parse_error = NULL, updated_at = now() WHERE id = $1`, id, domain.DatasetStatusReady, geom, ) if err != nil { return mapError(err) } if tag.RowsAffected() == 0 { return domain.ErrNotFound } return nil } // SetGeoJSON stores the pre-assembled GeoJSON FeatureCollection for a dataset // (nil -> NULL). It is written at processing time and served verbatim by the // .geojson endpoint. func (r *DatasetRepository) SetGeoJSON(ctx context.Context, id uuid.UUID, geojson []byte) error { tag, err := r.pool.Exec(ctx, `UPDATE datasets SET geojson = $2, updated_at = now() WHERE id = $1`, id, nullableJSON(json.RawMessage(geojson)), ) if err != nil { return mapError(err) } if tag.RowsAffected() == 0 { return domain.ErrNotFound } return nil } // ReplaceObservations atomically replaces all observations for a dataset. func (r *DatasetRepository) ReplaceObservations(ctx context.Context, datasetID uuid.UUID, obs []domain.Observation) error { tx, err := r.pool.Begin(ctx) if err != nil { return mapError(err) } defer tx.Rollback(ctx) if _, err := tx.Exec(ctx, `DELETE FROM dataset_observations WHERE dataset_id = $1`, datasetID); err != nil { return mapError(err) } if len(obs) > 0 { rows := make([][]any, len(obs)) for i, o := range obs { d, err := time.Parse("2006-01-02", o.Date) if err != nil { return fmt.Errorf("invalid observation date %q: %w", o.Date, err) } rows[i] = []any{datasetID, o.KatoCode, d, o.Value, o.ValueText} } _, err := tx.CopyFrom(ctx, pgx.Identifier{"dataset_observations"}, []string{"dataset_id", "kato_code", "date", "value", "value_text"}, pgx.CopyFromRows(rows), ) if err != nil { return mapError(err) } } return mapError(tx.Commit(ctx)) } const observationColumns = `id, dataset_id, kato_code, to_char(date, 'YYYY-MM-DD') AS date, value, value_text` func scanObservation(row pgx.Row) (domain.Observation, error) { var o domain.Observation err := row.Scan(&o.ID, &o.DatasetID, &o.KatoCode, &o.Date, &o.Value, &o.ValueText) return o, err } // ListObservations returns a page of observations for a dataset, optionally // filtered by KATO code, ordered by (kato_code, date). func (r *DatasetRepository) ListObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string, limit, offset int) ([]domain.Observation, error) { base := `SELECT ` + observationColumns + ` FROM dataset_observations WHERE dataset_id = $1` var ( rows pgx.Rows err error ) if katoCode != nil { rows, err = r.pool.Query(ctx, base+` AND kato_code = $2 ORDER BY kato_code, date LIMIT $3 OFFSET $4`, datasetID, *katoCode, limit, offset) } else { rows, err = r.pool.Query(ctx, base+` ORDER BY kato_code, date LIMIT $2 OFFSET $3`, datasetID, limit, offset) } if err != nil { return nil, mapError(err) } defer rows.Close() out := make([]domain.Observation, 0) for rows.Next() { o, err := scanObservation(rows) if err != nil { return nil, mapError(err) } out = append(out, o) } return out, mapError(rows.Err()) } // CountObservations counts a dataset's observations, optionally filtered by KATO. func (r *DatasetRepository) CountObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string) (int, error) { var n int var err error if katoCode != nil { err = r.pool.QueryRow(ctx, `SELECT count(*) FROM dataset_observations WHERE dataset_id = $1 AND kato_code = $2`, datasetID, *katoCode).Scan(&n) } else { err = r.pool.QueryRow(ctx, `SELECT count(*) FROM dataset_observations WHERE dataset_id = $1`, datasetID).Scan(&n) } return n, mapError(err) } // GetByID returns the dataset with the given id, or domain.ErrNotFound. func (r *DatasetRepository) GetByID(ctx context.Context, id uuid.UUID) (domain.Dataset, error) { row := r.pool.QueryRow(ctx, `SELECT `+datasetColumns+` FROM datasets WHERE id = $1`, id) out, err := scanDataset(row) return out, mapError(err) } const datasetSummaryColumns = `id, category_id, name, description, unit, file_type, size_bytes, status, created_at, updated_at` func scanDatasetSummary(row pgx.Row) (domain.DatasetSummary, error) { var d domain.DatasetSummary err := row.Scan( &d.ID, &d.CategoryID, &d.Name, &d.Description, &d.Unit, &d.FileType, &d.SizeBytes, &d.Status, &d.CreatedAt, &d.UpdatedAt, ) return d, err } // datasetFilterClause builds the WHERE fragment for the given filter, appending // its values to args. It returns a fragment beginning with " WHERE " when any // condition applies, or the empty string when the filter is empty. func datasetFilterClause(f domain.DatasetFilter, args []any) (string, []any) { var conds []string if f.CategoryID != nil { args = append(args, *f.CategoryID) conds = append(conds, fmt.Sprintf("category_id = $%d", len(args))) } if f.FileType != nil { args = append(args, *f.FileType) conds = append(conds, fmt.Sprintf("file_type = $%d", len(args))) } if f.Automated != nil { args = append(args, *f.Automated) conds = append(conds, fmt.Sprintf("automated = $%d", len(args))) } if f.Status != nil { args = append(args, *f.Status) conds = append(conds, fmt.Sprintf("status = $%d", len(args))) } if len(conds) == 0 { return "", args } return " WHERE " + strings.Join(conds, " AND "), args } // ListSummaries returns a page of dataset summaries ordered by creation time // (newest first), constrained by the given filter. func (r *DatasetRepository) ListSummaries(ctx context.Context, filter domain.DatasetFilter, limit, offset int) ([]domain.DatasetSummary, error) { where, args := datasetFilterClause(filter, nil) args = append(args, limit, offset) query := fmt.Sprintf(`SELECT %s FROM datasets%s ORDER BY created_at DESC LIMIT $%d OFFSET $%d`, datasetSummaryColumns, where, len(args)-1, len(args)) rows, err := r.pool.Query(ctx, query, args...) if err != nil { return nil, mapError(err) } defer rows.Close() summaries := make([]domain.DatasetSummary, 0) for rows.Next() { d, err := scanDatasetSummary(rows) if err != nil { return nil, mapError(err) } summaries = append(summaries, d) } return summaries, mapError(rows.Err()) } // Count returns the number of datasets matching the given filter. func (r *DatasetRepository) Count(ctx context.Context, filter domain.DatasetFilter) (int, error) { where, args := datasetFilterClause(filter, nil) var n int err := r.pool.QueryRow(ctx, `SELECT count(*) FROM datasets`+where, args...).Scan(&n) return n, mapError(err) } // Delete removes a dataset. Returns domain.ErrNotFound if it does not exist. func (r *DatasetRepository) Delete(ctx context.Context, id uuid.UUID) error { tag, err := r.pool.Exec(ctx, `DELETE FROM datasets WHERE id = $1`, id) if err != nil { return mapError(err) } if tag.RowsAffected() == 0 { return domain.ErrNotFound } return nil }