339 lines
11 KiB
Go
339 lines
11 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"gis/internal/domain"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// DatasetRepository persists datasets in Postgres.
|
|
type DatasetRepository struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
// NewDatasetRepository returns a DatasetRepository backed by the given pool.
|
|
func NewDatasetRepository(pool *pgxpool.Pool) *DatasetRepository {
|
|
return &DatasetRepository{pool: pool}
|
|
}
|
|
|
|
// datasetColumns is the column list shared by every dataset SELECT and
// RETURNING clause; scanDataset reads the columns in exactly this order.
// geometry is returned as GeoJSON cast to jsonb rather than the stored
// geometry value, and bbox is an [xmin, ymin, xmax, ymax] array computed only
// for raster datasets that have a geometry (NULL otherwise).
const datasetColumns = `id, category_id, code, name, description, unit, filename, storage_key, cog_storage_key, file_type, size_bytes, content_type, properties, meta, automated, status, attribute_columns, kato_column, year_columns, parse_error, ST_AsGeoJSON(geometry)::jsonb AS geometry,
	CASE WHEN file_type = 'raster' AND geometry IS NOT NULL
		THEN ARRAY[ST_XMin(geometry), ST_YMin(geometry), ST_XMax(geometry), ST_YMax(geometry)]
		ELSE NULL END AS bbox,
	created_at, updated_at`
|
|
|
|
func scanDataset(row pgx.Row) (domain.Dataset, error) {
|
|
var d domain.Dataset
|
|
err := row.Scan(
|
|
&d.ID, &d.CategoryID, &d.Code, &d.Name, &d.Description, &d.Unit,
|
|
&d.Filename, &d.StorageKey, &d.CogStorageKey, &d.FileType, &d.SizeBytes, &d.ContentType,
|
|
&d.Properties, &d.Meta, &d.Automated, &d.Status,
|
|
&d.AttributeColumns, &d.KatoColumn, &d.YearColumns, &d.ParseError,
|
|
&d.Geometry, &d.BBox, &d.CreatedAt, &d.UpdatedAt,
|
|
)
|
|
return d, err
|
|
}
|
|
|
|
// nullableJSON turns a zero-length JSON payload (nil or empty) into an
// untyped nil so the column is written as SQL NULL instead of an empty,
// invalid JSON value. Any non-empty payload is returned unchanged.
func nullableJSON(raw json.RawMessage) any {
	if len(raw) > 0 {
		return raw
	}
	return nil
}
|
|
|
|
// Create inserts a new dataset and returns the stored row.
|
|
func (r *DatasetRepository) Create(ctx context.Context, d domain.Dataset) (domain.Dataset, error) {
|
|
row := r.pool.QueryRow(ctx,
|
|
`INSERT INTO datasets (category_id, code, name, description, unit, filename, storage_key, file_type, size_bytes, content_type, properties, meta, automated, status)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
|
|
RETURNING `+datasetColumns,
|
|
d.CategoryID, d.Code, d.Name, d.Description, d.Unit, d.Filename, d.StorageKey, d.FileType, d.SizeBytes, d.ContentType,
|
|
nullableJSON(d.Properties), nullableJSON(d.Meta), d.Automated, d.Status,
|
|
)
|
|
out, err := scanDataset(row)
|
|
return out, mapError(err)
|
|
}
|
|
|
|
// MarkParsed stores the detected attribute columns and moves the dataset to
|
|
// awaiting_mapping, clearing any previous parse error.
|
|
func (r *DatasetRepository) MarkParsed(ctx context.Context, id uuid.UUID, cols []domain.AttributeColumn) error {
|
|
tag, err := r.pool.Exec(ctx,
|
|
`UPDATE datasets
|
|
SET attribute_columns = $2, status = $3, parse_error = NULL, updated_at = now()
|
|
WHERE id = $1`,
|
|
id, cols, domain.DatasetStatusAwaitingMapping,
|
|
)
|
|
if err != nil {
|
|
return mapError(err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return domain.ErrNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MarkParseFailed records a parse failure reason and sets the failed status.
|
|
func (r *DatasetRepository) MarkParseFailed(ctx context.Context, id uuid.UUID, reason string) error {
|
|
tag, err := r.pool.Exec(ctx,
|
|
`UPDATE datasets SET status = $2, parse_error = $3, updated_at = now() WHERE id = $1`,
|
|
id, domain.DatasetStatusFailed, reason,
|
|
)
|
|
if err != nil {
|
|
return mapError(err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return domain.ErrNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SaveMapping stores the KATO column and year mapping, moves the dataset to
|
|
// extracting, and returns the updated row.
|
|
func (r *DatasetRepository) SaveMapping(ctx context.Context, id uuid.UUID, katoColumn string, years []domain.YearColumn) (domain.Dataset, error) {
|
|
row := r.pool.QueryRow(ctx,
|
|
`UPDATE datasets
|
|
SET kato_column = $2, year_columns = $3, status = $4, parse_error = NULL, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING `+datasetColumns,
|
|
id, katoColumn, years, domain.DatasetStatusExtracting,
|
|
)
|
|
out, err := scanDataset(row)
|
|
return out, mapError(err)
|
|
}
|
|
|
|
// MarkConverted stores the COG storage key, optionally sets the footprint
|
|
// geometry (GeoJSON in EPSG:4326; nil keeps the existing geometry), and marks
|
|
// the dataset ready.
|
|
func (r *DatasetRepository) MarkConverted(ctx context.Context, id uuid.UUID, cogKey string, footprint []byte) error {
|
|
var fp any // nil -> SQL NULL -> CASE keeps existing geometry
|
|
if len(footprint) > 0 {
|
|
fp = string(footprint)
|
|
}
|
|
tag, err := r.pool.Exec(ctx,
|
|
`UPDATE datasets
|
|
SET cog_storage_key = $2,
|
|
geometry = CASE WHEN $3::text IS NULL THEN geometry
|
|
ELSE ST_SetSRID(ST_GeomFromGeoJSON($3), 4326) END,
|
|
status = $4, parse_error = NULL, updated_at = now()
|
|
WHERE id = $1`,
|
|
id, cogKey, fp, domain.DatasetStatusReady,
|
|
)
|
|
if err != nil {
|
|
return mapError(err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return domain.ErrNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetProperties stores the extracted attribute table (nil -> NULL) and marks the
|
|
// dataset ready.
|
|
func (r *DatasetRepository) SetProperties(ctx context.Context, id uuid.UUID, properties []byte) error {
|
|
tag, err := r.pool.Exec(ctx,
|
|
`UPDATE datasets SET properties = $2, status = $3, parse_error = NULL, updated_at = now() WHERE id = $1`,
|
|
id, nullableJSON(json.RawMessage(properties)), domain.DatasetStatusReady,
|
|
)
|
|
if err != nil {
|
|
return mapError(err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return domain.ErrNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MarkReady sets the dataset status to ready and clears any error.
|
|
func (r *DatasetRepository) MarkReady(ctx context.Context, id uuid.UUID) error {
|
|
tag, err := r.pool.Exec(ctx,
|
|
`UPDATE datasets SET status = $2, parse_error = NULL, updated_at = now() WHERE id = $1`,
|
|
id, domain.DatasetStatusReady,
|
|
)
|
|
if err != nil {
|
|
return mapError(err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return domain.ErrNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReplaceObservations atomically replaces all observations for a dataset.
|
|
func (r *DatasetRepository) ReplaceObservations(ctx context.Context, datasetID uuid.UUID, obs []domain.Observation) error {
|
|
tx, err := r.pool.Begin(ctx)
|
|
if err != nil {
|
|
return mapError(err)
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
if _, err := tx.Exec(ctx, `DELETE FROM dataset_observations WHERE dataset_id = $1`, datasetID); err != nil {
|
|
return mapError(err)
|
|
}
|
|
|
|
if len(obs) > 0 {
|
|
rows := make([][]any, len(obs))
|
|
for i, o := range obs {
|
|
d, err := time.Parse("2006-01-02", o.Date)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid observation date %q: %w", o.Date, err)
|
|
}
|
|
rows[i] = []any{datasetID, o.KatoCode, d, o.Value, o.ValueText}
|
|
}
|
|
_, err := tx.CopyFrom(ctx,
|
|
pgx.Identifier{"dataset_observations"},
|
|
[]string{"dataset_id", "kato_code", "date", "value", "value_text"},
|
|
pgx.CopyFromRows(rows),
|
|
)
|
|
if err != nil {
|
|
return mapError(err)
|
|
}
|
|
}
|
|
|
|
return mapError(tx.Commit(ctx))
|
|
}
|
|
|
|
// observationColumns is the column list used when selecting observations, in
// the order scanObservation reads it. The date column is rendered as a
// YYYY-MM-DD string by to_char.
const observationColumns = "id, dataset_id, kato_code, " +
	"to_char(date, 'YYYY-MM-DD') AS date, " +
	"value, value_text"
|
|
|
|
func scanObservation(row pgx.Row) (domain.Observation, error) {
|
|
var o domain.Observation
|
|
err := row.Scan(&o.ID, &o.DatasetID, &o.KatoCode, &o.Date, &o.Value, &o.ValueText)
|
|
return o, err
|
|
}
|
|
|
|
// ListObservations returns a page of observations for a dataset, optionally
|
|
// filtered by KATO code, ordered by (kato_code, date).
|
|
func (r *DatasetRepository) ListObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string, limit, offset int) ([]domain.Observation, error) {
|
|
base := `SELECT ` + observationColumns + ` FROM dataset_observations WHERE dataset_id = $1`
|
|
|
|
var (
|
|
rows pgx.Rows
|
|
err error
|
|
)
|
|
if katoCode != nil {
|
|
rows, err = r.pool.Query(ctx,
|
|
base+` AND kato_code = $2 ORDER BY kato_code, date LIMIT $3 OFFSET $4`,
|
|
datasetID, *katoCode, limit, offset)
|
|
} else {
|
|
rows, err = r.pool.Query(ctx,
|
|
base+` ORDER BY kato_code, date LIMIT $2 OFFSET $3`,
|
|
datasetID, limit, offset)
|
|
}
|
|
if err != nil {
|
|
return nil, mapError(err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
out := make([]domain.Observation, 0)
|
|
for rows.Next() {
|
|
o, err := scanObservation(rows)
|
|
if err != nil {
|
|
return nil, mapError(err)
|
|
}
|
|
out = append(out, o)
|
|
}
|
|
return out, mapError(rows.Err())
|
|
}
|
|
|
|
// CountObservations counts a dataset's observations, optionally filtered by KATO.
|
|
func (r *DatasetRepository) CountObservations(ctx context.Context, datasetID uuid.UUID, katoCode *string) (int, error) {
|
|
var n int
|
|
var err error
|
|
if katoCode != nil {
|
|
err = r.pool.QueryRow(ctx,
|
|
`SELECT count(*) FROM dataset_observations WHERE dataset_id = $1 AND kato_code = $2`,
|
|
datasetID, *katoCode).Scan(&n)
|
|
} else {
|
|
err = r.pool.QueryRow(ctx,
|
|
`SELECT count(*) FROM dataset_observations WHERE dataset_id = $1`, datasetID).Scan(&n)
|
|
}
|
|
return n, mapError(err)
|
|
}
|
|
|
|
// GetByID returns the dataset with the given id, or domain.ErrNotFound.
|
|
func (r *DatasetRepository) GetByID(ctx context.Context, id uuid.UUID) (domain.Dataset, error) {
|
|
row := r.pool.QueryRow(ctx,
|
|
`SELECT `+datasetColumns+` FROM datasets WHERE id = $1`, id)
|
|
out, err := scanDataset(row)
|
|
return out, mapError(err)
|
|
}
|
|
|
|
// datasetSummaryColumns is the reduced column list used for dataset listings,
// in the order scanDatasetSummary reads it.
const datasetSummaryColumns = "id, category_id, code, name, description, unit, " +
	"file_type, size_bytes, status, " +
	"created_at, updated_at"
|
|
|
|
func scanDatasetSummary(row pgx.Row) (domain.DatasetSummary, error) {
|
|
var d domain.DatasetSummary
|
|
err := row.Scan(
|
|
&d.ID, &d.CategoryID, &d.Code, &d.Name, &d.Description, &d.Unit,
|
|
&d.FileType, &d.SizeBytes, &d.Status, &d.CreatedAt, &d.UpdatedAt,
|
|
)
|
|
return d, err
|
|
}
|
|
|
|
// ListSummaries returns a page of dataset summaries ordered by creation time
|
|
// (newest first). When categoryID is non-nil it filters to that category.
|
|
func (r *DatasetRepository) ListSummaries(ctx context.Context, categoryID *uuid.UUID, limit, offset int) ([]domain.DatasetSummary, error) {
|
|
base := `SELECT ` + datasetSummaryColumns + ` FROM datasets`
|
|
|
|
var (
|
|
rows pgx.Rows
|
|
err error
|
|
)
|
|
if categoryID != nil {
|
|
rows, err = r.pool.Query(ctx,
|
|
base+` WHERE category_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`,
|
|
*categoryID, limit, offset)
|
|
} else {
|
|
rows, err = r.pool.Query(ctx,
|
|
base+` ORDER BY created_at DESC LIMIT $1 OFFSET $2`, limit, offset)
|
|
}
|
|
if err != nil {
|
|
return nil, mapError(err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
summaries := make([]domain.DatasetSummary, 0)
|
|
for rows.Next() {
|
|
d, err := scanDatasetSummary(rows)
|
|
if err != nil {
|
|
return nil, mapError(err)
|
|
}
|
|
summaries = append(summaries, d)
|
|
}
|
|
return summaries, mapError(rows.Err())
|
|
}
|
|
|
|
// Count returns the number of datasets, optionally filtered to a category.
|
|
func (r *DatasetRepository) Count(ctx context.Context, categoryID *uuid.UUID) (int, error) {
|
|
var n int
|
|
var err error
|
|
if categoryID != nil {
|
|
err = r.pool.QueryRow(ctx, `SELECT count(*) FROM datasets WHERE category_id = $1`, *categoryID).Scan(&n)
|
|
} else {
|
|
err = r.pool.QueryRow(ctx, `SELECT count(*) FROM datasets`).Scan(&n)
|
|
}
|
|
return n, mapError(err)
|
|
}
|
|
|
|
// Delete removes a dataset. Returns domain.ErrNotFound if it does not exist.
|
|
func (r *DatasetRepository) Delete(ctx context.Context, id uuid.UUID) error {
|
|
tag, err := r.pool.Exec(ctx, `DELETE FROM datasets WHERE id = $1`, id)
|
|
if err != nil {
|
|
return mapError(err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return domain.ErrNotFound
|
|
}
|
|
return nil
|
|
}
|