gis/internal/repository/postgres/dataset.go

350 lines
12 KiB
Go

package postgres
import (
"context"
"encoding/json"
"fmt"
"time"
"gis/internal/domain"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// DatasetRepository persists datasets and their observations in Postgres.
// Every method issues its SQL through the pgx connection pool it was built
// with.
type DatasetRepository struct {
// pool is the connection pool used for all queries and transactions.
pool *pgxpool.Pool
}
// NewDatasetRepository builds a DatasetRepository that issues all of its
// queries through pool.
func NewDatasetRepository(pool *pgxpool.Pool) *DatasetRepository {
	repo := new(DatasetRepository)
	repo.pool = pool
	return repo
}
// datasetColumns is the column list shared by every SELECT and RETURNING
// clause whose rows are read by scanDataset. The order of the columns here
// must match the order of the Scan destinations in scanDataset.
//
// Two of the columns are computed rather than read as stored:
//   - geometry is exposed as GeoJSON (ST_AsGeoJSON cast to jsonb) rather than
//     its raw EWKB form;
//   - bbox is the array [xmin, ymin, xmax, ymax] of the geometry, and is NULL
//     unless file_type is 'raster' and the geometry is non-NULL.
const datasetColumns = `id, category_id, name, description, unit, filename, storage_key, cog_storage_key, file_type, size_bytes, content_type, properties, meta, automated, status, attribute_columns, kato_column, year_columns, parse_error, ST_AsGeoJSON(geometry)::jsonb AS geometry,
CASE WHEN file_type = 'raster' AND geometry IS NOT NULL
THEN ARRAY[ST_XMin(geometry), ST_YMin(geometry), ST_XMax(geometry), ST_YMax(geometry)]
ELSE NULL END AS bbox,
created_at, updated_at`
// scanDataset reads one row shaped like datasetColumns into a domain.Dataset.
// The Scan destinations appear in the same order as the columns in
// datasetColumns, so the two must be changed together.
//
// The error from Scan is returned unmodified (callers in this file pass it
// through mapError). On error the zero Dataset is returned, so a half-filled
// value never reaches a caller alongside a non-nil error.
func scanDataset(row pgx.Row) (domain.Dataset, error) {
	var d domain.Dataset
	if err := row.Scan(
		&d.ID, &d.CategoryID, &d.Name, &d.Description, &d.Unit,
		&d.Filename, &d.StorageKey, &d.CogStorageKey, &d.FileType, &d.SizeBytes, &d.ContentType,
		&d.Properties, &d.Meta, &d.Automated, &d.Status,
		&d.AttributeColumns, &d.KatoColumn, &d.YearColumns, &d.ParseError,
		&d.Geometry, &d.BBox, &d.CreatedAt, &d.UpdatedAt,
	); err != nil {
		return domain.Dataset{}, err
	}
	return d, nil
}
// nullableJSON converts a raw JSON value into a query argument: a zero-length
// (or nil) message becomes an untyped nil, which is sent as SQL NULL, and
// anything else is passed through unchanged.
func nullableJSON(raw json.RawMessage) any {
	if len(raw) > 0 {
		return raw
	}
	// Return a literal nil rather than raw: a nil json.RawMessage stored in an
	// interface would be a non-nil interface value.
	return nil
}
// Create inserts d as a new row and returns the dataset as stored, read back
// through RETURNING. Only the columns named in the INSERT are taken from d;
// the other fields of the result (id, timestamps, ...) are whatever the
// database holds for the new row. Empty Properties/Meta are written as NULL.
func (r *DatasetRepository) Create(ctx context.Context, d domain.Dataset) (domain.Dataset, error) {
	const query = `INSERT INTO datasets (category_id, name, description, unit, filename, storage_key, file_type, size_bytes, content_type, properties, meta, automated, status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
RETURNING ` + datasetColumns

	args := []any{
		d.CategoryID, d.Name, d.Description, d.Unit, d.Filename, d.StorageKey, d.FileType, d.SizeBytes, d.ContentType,
		nullableJSON(d.Properties), nullableJSON(d.Meta), d.Automated, d.Status,
	}

	created, err := scanDataset(r.pool.QueryRow(ctx, query, args...))
	return created, mapError(err)
}
// MarkParsed records the attribute columns detected in the dataset's file,
// sets the status to awaiting_mapping and clears parse_error. It returns
// domain.ErrNotFound when no row has the given id.
func (r *DatasetRepository) MarkParsed(ctx context.Context, id uuid.UUID, cols []domain.AttributeColumn) error {
	const query = `UPDATE datasets
SET attribute_columns = $2, status = $3, parse_error = NULL, updated_at = now()
WHERE id = $1`

	result, err := r.pool.Exec(ctx, query, id, cols, domain.DatasetStatusAwaitingMapping)
	switch {
	case err != nil:
		return mapError(err)
	case result.RowsAffected() == 0:
		return domain.ErrNotFound
	default:
		return nil
	}
}
// MarkParseFailed moves the dataset to the failed status and stores reason in
// parse_error. It returns domain.ErrNotFound when no row has the given id.
func (r *DatasetRepository) MarkParseFailed(ctx context.Context, id uuid.UUID, reason string) error {
	const query = `UPDATE datasets SET status = $2, parse_error = $3, updated_at = now() WHERE id = $1`

	result, err := r.pool.Exec(ctx, query, id, domain.DatasetStatusFailed, reason)
	if err != nil {
		return mapError(err)
	}
	if updated := result.RowsAffected(); updated == 0 {
		return domain.ErrNotFound
	}
	return nil
}
// SaveMapping records which attribute column holds the KATO code and how the
// year columns are mapped, sets the status to extracting, clears parse_error,
// and returns the row as it stands after the update.
func (r *DatasetRepository) SaveMapping(ctx context.Context, id uuid.UUID, katoColumn string, years []domain.YearColumn) (domain.Dataset, error) {
	const query = `UPDATE datasets
SET kato_column = $2, year_columns = $3, status = $4, parse_error = NULL, updated_at = now()
WHERE id = $1
RETURNING ` + datasetColumns

	updated, err := scanDataset(
		r.pool.QueryRow(ctx, query, id, katoColumn, years, domain.DatasetStatusExtracting),
	)
	return updated, mapError(err)
}
// MarkConverted records the storage key of the converted COG, sets the status
// to ready and clears parse_error. When footprint is non-empty it is parsed as
// GeoJSON, tagged with SRID 4326 and stored as the dataset geometry; an empty
// or nil footprint leaves the stored geometry unchanged. It returns
// domain.ErrNotFound when no row has the given id.
func (r *DatasetRepository) MarkConverted(ctx context.Context, id uuid.UUID, cogKey string, footprint []byte) error {
	const query = `UPDATE datasets
SET cog_storage_key = $2,
geometry = CASE WHEN $3::text IS NULL THEN geometry
ELSE ST_SetSRID(ST_GeomFromGeoJSON($3), 4326) END,
status = $4, parse_error = NULL, updated_at = now()
WHERE id = $1`

	// An untyped nil is sent as SQL NULL, which makes the CASE keep the
	// existing geometry; otherwise the GeoJSON is passed as text.
	var footprintArg any
	if len(footprint) != 0 {
		footprintArg = string(footprint)
	}

	result, err := r.pool.Exec(ctx, query, id, cogKey, footprintArg, domain.DatasetStatusReady)
	switch {
	case err != nil:
		return mapError(err)
	case result.RowsAffected() == 0:
		return domain.ErrNotFound
	default:
		return nil
	}
}
// SetProperties writes the extracted attribute table and, optionally, the
// feature geometry, then sets the status to ready and clears parse_error.
//
// Empty or nil properties are stored as NULL. A non-empty geometry is parsed
// as GeoJSON, tagged with SRID 4326 and dissolved with ST_UnaryUnion before
// being stored; an empty or nil geometry leaves the stored geometry unchanged.
// It returns domain.ErrNotFound when no row has the given id.
func (r *DatasetRepository) SetProperties(ctx context.Context, id uuid.UUID, properties, geometry []byte) error {
	const query = `UPDATE datasets
SET properties = $2,
geometry = CASE WHEN $3::text IS NULL THEN geometry
ELSE ST_UnaryUnion(ST_SetSRID(ST_GeomFromGeoJSON($3), 4326)) END,
status = $4, parse_error = NULL, updated_at = now()
WHERE id = $1`

	// An untyped nil is sent as SQL NULL, which makes the CASE keep the
	// existing geometry; otherwise the GeoJSON is passed as text.
	var geometryArg any
	if len(geometry) != 0 {
		geometryArg = string(geometry)
	}
	propertiesArg := nullableJSON(json.RawMessage(properties))

	result, err := r.pool.Exec(ctx, query, id, propertiesArg, geometryArg, domain.DatasetStatusReady)
	switch {
	case err != nil:
		return mapError(err)
	case result.RowsAffected() == 0:
		return domain.ErrNotFound
	default:
		return nil
	}
}
// MarkReady moves the dataset to the ready status and clears parse_error,
// leaving every other column except updated_at untouched. It returns
// domain.ErrNotFound when no row has the given id.
func (r *DatasetRepository) MarkReady(ctx context.Context, id uuid.UUID) error {
	const query = `UPDATE datasets SET status = $2, parse_error = NULL, updated_at = now() WHERE id = $1`

	result, err := r.pool.Exec(ctx, query, id, domain.DatasetStatusReady)
	switch {
	case err != nil:
		return mapError(err)
	case result.RowsAffected() == 0:
		return domain.ErrNotFound
	default:
		return nil
	}
}
// ReplaceObservations atomically replaces all observations for a dataset: in
// one transaction the existing rows for datasetID are deleted and obs is
// bulk-loaded with COPY. An empty obs just clears the dataset's observations.
//
// Every Observation.Date must be formatted as YYYY-MM-DD. All dates are parsed
// before the transaction is opened, so malformed input is rejected without
// taking a connection or touching the table. A bad date is reported as a
// wrapped time.Parse error naming the offending value; database errors are
// passed through mapError.
func (r *DatasetRepository) ReplaceObservations(ctx context.Context, datasetID uuid.UUID, obs []domain.Observation) error {
	// Validate and convert first: nothing below can fail on input shape.
	rows := make([][]any, len(obs))
	for i := range obs {
		o := &obs[i]
		d, err := time.Parse("2006-01-02", o.Date)
		if err != nil {
			return fmt.Errorf("invalid observation date %q: %w", o.Date, err)
		}
		rows[i] = []any{datasetID, o.KatoCode, d, o.Value, o.ValueText}
	}

	tx, err := r.pool.Begin(ctx)
	if err != nil {
		return mapError(err)
	}
	// Rollback only matters on the early-return paths below. After a
	// successful Commit it just reports that the transaction is closed, so
	// its result is deliberately ignored.
	defer func() { _ = tx.Rollback(ctx) }()

	if _, err := tx.Exec(ctx, `DELETE FROM dataset_observations WHERE dataset_id = $1`, datasetID); err != nil {
		return mapError(err)
	}
	if len(rows) > 0 {
		_, err := tx.CopyFrom(ctx,
			pgx.Identifier{"dataset_observations"},
			[]string{"dataset_id", "kato_code", "date", "value", "value_text"},
			pgx.CopyFromRows(rows),
		)
		if err != nil {
			return mapError(err)
		}
	}
	return mapError(tx.Commit(ctx))
}
// observationColumns is the column list used when selecting observations for
// scanObservation; the order here must match the Scan destinations there. The
// date column is rendered as 'YYYY-MM-DD' text by to_char rather than being
// returned as a date value.
const observationColumns = `id, dataset_id, kato_code, to_char(date, 'YYYY-MM-DD') AS date, value, value_text`
// scanObservation reads one row shaped like observationColumns into a
// domain.Observation. The Scan destinations follow the column order of
// observationColumns, so the two must be changed together.
//
// The error from Scan is returned unmodified. On error the zero Observation is
// returned, so a half-filled value never accompanies a non-nil error.
func scanObservation(row pgx.Row) (domain.Observation, error) {
	var o domain.Observation
	if err := row.Scan(&o.ID, &o.DatasetID, &o.KatoCode, &o.Date, &o.Value, &o.ValueText); err != nil {
		return domain.Observation{}, err
	}
	return o, nil
}
// ListObservations returns one page of a dataset's observations, optionally
// restricted to a single KATO code (katoCode == nil means no filter).
//
// Rows are ordered by (kato_code, date, id). The trailing id is a tiebreaker:
// LIMIT/OFFSET paging is only stable over a total order, and without it rows
// sharing the same (kato_code, date) could be repeated or skipped between
// pages. limit and offset are passed straight to LIMIT and OFFSET.
//
// The result is a non-nil slice even when the page is empty. Errors are passed
// through mapError.
func (r *DatasetRepository) ListObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string, limit, offset int) ([]domain.Observation, error) {
	query := `SELECT ` + observationColumns + ` FROM dataset_observations WHERE dataset_id = $1`
	args := []any{datasetID}
	if katoCode != nil {
		query += ` AND kato_code = $2 ORDER BY kato_code, date, id LIMIT $3 OFFSET $4`
		args = append(args, *katoCode, limit, offset)
	} else {
		query += ` ORDER BY kato_code, date, id LIMIT $2 OFFSET $3`
		args = append(args, limit, offset)
	}

	rows, err := r.pool.Query(ctx, query, args...)
	if err != nil {
		return nil, mapError(err)
	}
	defer rows.Close()

	// Kept non-nil when empty, as before (presumably so that it encodes as []
	// rather than null — confirm against the handlers).
	out := make([]domain.Observation, 0)
	for rows.Next() {
		o, err := scanObservation(rows)
		if err != nil {
			return nil, mapError(err)
		}
		out = append(out, o)
	}
	return out, mapError(rows.Err())
}
// CountObservations returns how many observations the dataset has. When
// katoCode is non-nil only observations with that KATO code are counted.
func (r *DatasetRepository) CountObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string) (int, error) {
	var row pgx.Row
	if katoCode == nil {
		row = r.pool.QueryRow(ctx,
			`SELECT count(*) FROM dataset_observations WHERE dataset_id = $1`, datasetID)
	} else {
		row = r.pool.QueryRow(ctx,
			`SELECT count(*) FROM dataset_observations WHERE dataset_id = $1 AND kato_code = $2`,
			datasetID, *katoCode)
	}

	var total int
	err := row.Scan(&total)
	return total, mapError(err)
}
// GetByID loads the full dataset row for id. The scan error is passed through
// mapError, which is what yields domain.ErrNotFound for a missing row.
func (r *DatasetRepository) GetByID(ctx context.Context, id uuid.UUID) (domain.Dataset, error) {
	const query = `SELECT ` + datasetColumns + ` FROM datasets WHERE id = $1`

	found, err := scanDataset(r.pool.QueryRow(ctx, query, id))
	return found, mapError(err)
}
// datasetSummaryColumns is the reduced column list used for dataset listings
// and read by scanDatasetSummary; the order here must match the Scan
// destinations there. It omits the larger columns selected by datasetColumns
// (properties, meta, geometry and the parse/mapping fields).
const datasetSummaryColumns = `id, category_id, name, description, unit, file_type, size_bytes, status, created_at, updated_at`
// scanDatasetSummary reads one row shaped like datasetSummaryColumns into a
// domain.DatasetSummary. The Scan destinations follow the column order of
// datasetSummaryColumns, so the two must be changed together.
//
// The error from Scan is returned unmodified. On error the zero DatasetSummary
// is returned, so a half-filled value never accompanies a non-nil error.
func scanDatasetSummary(row pgx.Row) (domain.DatasetSummary, error) {
	var d domain.DatasetSummary
	if err := row.Scan(
		&d.ID, &d.CategoryID, &d.Name, &d.Description, &d.Unit,
		&d.FileType, &d.SizeBytes, &d.Status, &d.CreatedAt, &d.UpdatedAt,
	); err != nil {
		return domain.DatasetSummary{}, err
	}
	return d, nil
}
// ListSummaries returns one page of dataset summaries, newest first. When
// categoryID is non-nil only datasets in that category are returned.
//
// Rows are ordered by (created_at DESC, id DESC). The id is a tiebreaker:
// LIMIT/OFFSET paging is only stable over a total order, and datasets created
// with the same timestamp could otherwise be repeated or skipped between
// pages. limit and offset are passed straight to LIMIT and OFFSET.
//
// The result is a non-nil slice even when the page is empty. Errors are passed
// through mapError.
func (r *DatasetRepository) ListSummaries(ctx context.Context, categoryID *uuid.UUID, limit, offset int) ([]domain.DatasetSummary, error) {
	query := `SELECT ` + datasetSummaryColumns + ` FROM datasets`
	var args []any
	if categoryID != nil {
		query += ` WHERE category_id = $1 ORDER BY created_at DESC, id DESC LIMIT $2 OFFSET $3`
		args = []any{*categoryID, limit, offset}
	} else {
		query += ` ORDER BY created_at DESC, id DESC LIMIT $1 OFFSET $2`
		args = []any{limit, offset}
	}

	rows, err := r.pool.Query(ctx, query, args...)
	if err != nil {
		return nil, mapError(err)
	}
	defer rows.Close()

	// Kept non-nil when empty, as before (presumably so that it encodes as []
	// rather than null — confirm against the handlers).
	summaries := make([]domain.DatasetSummary, 0)
	for rows.Next() {
		d, err := scanDatasetSummary(rows)
		if err != nil {
			return nil, mapError(err)
		}
		summaries = append(summaries, d)
	}
	return summaries, mapError(rows.Err())
}
// Count returns how many datasets exist. When categoryID is non-nil only
// datasets in that category are counted.
func (r *DatasetRepository) Count(ctx context.Context, categoryID *uuid.UUID) (int, error) {
	var row pgx.Row
	if categoryID == nil {
		row = r.pool.QueryRow(ctx, `SELECT count(*) FROM datasets`)
	} else {
		row = r.pool.QueryRow(ctx, `SELECT count(*) FROM datasets WHERE category_id = $1`, *categoryID)
	}

	var total int
	err := row.Scan(&total)
	return total, mapError(err)
}
// Delete removes the dataset row with the given id. It returns
// domain.ErrNotFound when the DELETE affected no rows; other failures are
// passed through mapError.
func (r *DatasetRepository) Delete(ctx context.Context, id uuid.UUID) error {
	const query = `DELETE FROM datasets WHERE id = $1`

	result, err := r.pool.Exec(ctx, query, id)
	switch {
	case err != nil:
		return mapError(err)
	case result.RowsAffected() == 0:
		return domain.ErrNotFound
	default:
		return nil
	}
}