
Developing a Data Transformer from Scratch

A data transformer is the crucial bridge between raw, often disparate, data collected by data collectors and the standardized, structured format required by the Open Data Hub Timeseries Writer API (BDP). This guide provides an in-depth look at developing a data transformer in Go, analyzing its critical components, SDK integration, and common patterns, using the provided example.

1. Understanding the Role of a Data Transformer

A data transformer acts as a processing unit within the data integration pipeline. Its primary responsibilities include:

  • Raw Data Consumption: Listening to a message queue for events indicating new raw data.
  • Raw Data Retrieval: Fetching the actual raw data from a raw data storage (if only an event notification was received).
  • Data Transformation: Converting the raw, source-specific data into the standardized data model of the Open Data Hub. This often involves:
    • Parsing: Deserializing the raw data (e.g., JSON, XML).
    • Enrichment: Adding derived information or linking to external datasets (e.g., geocoding, unit conversions).
    • Mapping: Translating source-specific identifiers or schemas to Open Data Hub standards.
    • Creating Hierarchies: Defining relationships between different data entities (e.g., a parking lot containing multiple parking zones).
  • Metadata Management: Ensuring that necessary metadata (provenance, data types, stations) is correctly synchronized with the Timeseries Writer API.
  • Timeseries Data Publication: Pushing the transformed data, typically time-series measurements, to the opendatahub-timeseries-writer.

The transformer is where the "intelligence" of data standardization resides, making raw data consumable for various applications.

[Figure: data transformer flow]

2. Core Principles of Data Transformer Development

Many principles from data collector development apply, with some specific nuances for transformers:

2.1. Idempotency is Key

Transformers must be idempotent. This means processing the same raw data event multiple times should produce the exact same result in the Timeseries Writer without creating duplicates or conflicting states. The Open Data Hub Timeseries Writer API aids this by silently discarding older duplicate records, but your transformation logic itself should avoid side effects from re-processing.

2.2. Robust Data Validation and Error Handling

Data from collectors can be malformed or incomplete. Your transformer needs to:

  • Validate Input: Check if the raw data conforms to expected schemas.
  • Selective Processing: If a single record within a batch fails, decide whether to skip only that record or the entire batch.
  • Dead-Letter Queues (DLQ): Failed messages should typically be moved to a DLQ for later inspection and manual intervention, preventing them from blocking the main queue. The SDK's consumer often handles this.
  • Fail Early: Let the application crash on unrecoverable errors; the test and production environments handle restarts. A hedged sketch combining validation, selective skipping, and fail-fast behavior follows this list.
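
The sketch below is not taken from the example repository; it illustrates how these principles can be combined in a handler. Malformed records are skipped individually, while an error returned to the SDK consumer causes the message to be rejected (typically to a dead-letter queue) instead of acknowledged. It reuses the FacilityData, tr, rdb, bdplib, and logger types introduced in section 3; the GetID() check is a hypothetical validation rule.

// ... imports (context, bdplib, tr, rdb, logger) ...

// Hedged sketch: validation and selective skipping inside a transformer handler.
func validatingHandler(bdp bdplib.Bdp) tr.Handler[FacilityData] {
    return func(ctx context.Context, payload *rdb.Raw[FacilityData]) error {
        log := logger.Get(ctx)
        for _, facility := range payload.Rawdata {
            // Selective processing: one malformed record should not poison the whole batch.
            if facility.GetID() == 0 { // hypothetical sanity check on the raw schema
                log.Warn("skipping facility without a valid ID")
                continue
            }
            // ... transform this facility and collect its measurements ...
        }
        // Fail early: returning an error makes the SDK consumer reject the message
        // (typically routing it to a dead-letter queue) instead of acknowledging it.
        return nil
    }
}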

2.3. Data Model Consistency

Strictly follow the Open Data Hub data model for stations, data types, and measurements. This includes:

  • Unique Identifiers: Assign stable and unique IDs to stations and data types.
  • Metadata Richness: Populate metadata fields (e.g., names in multiple languages, specific attributes like capacity or operator details) to enhance data discoverability and usability.

2.4. Performance and Batching

Transformations can be CPU or I/O intensive. Consider:

  • Batch Processing: The Timeseries Writer API supports pushing data in batches. Leverage this to reduce network overhead.
  • Concurrency: Process multiple raw data events concurrently, carefully managing resources.
  • Efficient Lookups: If transformation requires static lookup data (like the stations.csv in our example), load and cache it efficiently (e.g., into an in-memory map) to avoid repeated file reads or database queries.
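
A minimal, hypothetical sketch of such a lookup cache follows (the example project implements this in station.go with ReadStations and GetStationByID; the helper name and CSV column layout below are illustrative assumptions only):

package main

import (
    "encoding/csv"
    "os"
)

// StationProto is a minimal, hypothetical station record; the real project keeps
// richer metadata per station in stations.csv.
type StationProto struct {
    ID   string
    Name string
}

// loadStationIndex reads the CSV once at startup into an in-memory map keyed by ID,
// so per-message lookups are O(1) instead of re-reading the file.
func loadStationIndex(path string) (map[string]StationProto, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    rows, err := csv.NewReader(f).ReadAll()
    if err != nil {
        return nil, err
    }

    index := make(map[string]StationProto, len(rows))
    for i, row := range rows {
        if i == 0 || len(row) < 2 {
            continue // skip header and malformed rows
        }
        index[row[0]] = StationProto{ID: row[0], Name: row[1]}
    }
    return index, nil
}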

2.5. Observability

As with collectors, comprehensive logging, tracing, and metrics are crucial for monitoring the transformation process, identifying data quality issues, and debugging performance problems.

3. Anatomy of a Go Data Transformer

Let's break down the provided example, a parking facility transformer (the snippet in section 3.4 comes from a weather forecast transformer that follows the same pattern).

3.1. Project Structure

The project structure is very similar to a data collector's, with the addition of static data files needed by the transformation logic.


.
├── calls.http                    # Example HTTP requests for testing BDP API directly
├── docker-compose.yml            # Local development setup
├── documentation
│   └── ...                       # External specification document, if exists
├── infrastructure
│   ├── docker
│   │   └── Dockerfile            # Containerization instructions
│   ├── docker-compose.build.yml  # Build-specific Docker Compose
│   └── helm                      # Helm charts for Kubernetes deployment
│       └── ...                   # Environment-specific Helm value files
├── resources                     # Static data files required by the transformer
│   ├── stations.csv              # CSV containing station metadata
│   └── stations.csv.license      # License for the static data
├── src                           # Go source code
│   ├── dto.go                    # Data Transfer Objects (raw data schema)
│   ├── go.mod                    # Go modules file
│   ├── go.sum                    # Go modules checksums
│   ├── main.go                   # Main application logic
│   ├── main_test.go              # Unit tests
│   └── station.go                # Logic for handling static station data
└── testdata                      # Test input and expected output files
    ├── input                     # Example raw JSON payloads
    │   └── mybestparking.json
    └── output                    # Expected BDP API calls (for testing)
        └── mybestparking--out.json

3.2. Environment Variables and Configuration Management

Configuration is externalized via environment variables, ensuring flexibility across different environments.

// ... imports ...

var env tr.Env // Embeds common environment variables from the Transformer SDK
var station_proto Stations // Global variable to hold loaded station metadata

func main() {
    // 1. Initialize SDK components (messaging, logging, telemetry)
    ms.InitWithEnv(context.Background(), "", &env)
    slog.Info("Starting data transformer...")

    // Ensure telemetry traces are flushed on application panic
    defer tel.FlushOnPanic()

    // 2. Initialize BDP client using environment variables for authentication and endpoint
    b := bdplib.FromEnv()

    // 3. Load static station metadata from CSV
    station_proto = ReadStations("../resources/stations.csv")

    // 4. Synchronize Data Types with BDP
    SyncDataTypes(b)

    slog.Info("listening")
    // 5. Initialize the transformer listener.
    // It will consume messages from the configured MQ_QUEUE.
    listener := tr.NewTr[FacilityData](context.Background(), env)

    // 6. Start the transformer, passing the `TransformWithBdp` function as the handler.
    // This handler will be called for each message consumed from the queue.
    err := listener.Start(context.Background(), TransformWithBdp(b))

    ms.FailOnError(context.Background(), err, "error while listening to queue")
}

Critical Points:

  • tr.Env Embedding: Similar to dc.Env for collectors, tr.Env from the transformer SDK handles common environment variables related to message queuing for transformers (like MQ_QUEUE, MQ_KEY).
  • bdplib.FromEnv(): This function from the go-bdp-client simplifies the initialization of the BDP client. It automatically reads environment variables prefixed with BDP_ and ODH_ to configure the base URL, provenance details, and OAuth2 authentication (token URL, client ID/secret). This centralizes authentication logic.
  • Message Queue Configuration:
    • MQ_QUEUE: Specifies the queue this transformer will consume messages from. This queue should be populated by the data collector.
    • MQ_EXCHANGE / MQ_KEY: Define the RabbitMQ exchange and routing key if the message flow involves complex routing.
  • RAW_DATA_BRIDGE_ENDPOINT: This variable indicates a pattern where the raw data itself is not placed in the RabbitMQ message payload. Instead, the message might contain a pointer to the raw data, which the transformer then fetches via a "Raw Data Bridge" service.
  • BDP Provenance: BDP_PROVENANCE_VERSION, BDP_PROVENANCE_NAME, BDP_ORIGIN are crucial for data lineage within the Open Data Hub. Every piece of data pushed by this transformer will carry this provenance information.
  • OAuth2 Authentication: ODH_TOKEN_URL, ODH_CLIENT_ID, ODH_CLIENT_SECRET are used by bdplib.FromEnv() to configure the OAuth2 client credentials flow, ensuring authenticated access to the Timeseries Writer.
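
For local development, these variables usually end up in the .env file that docker-compose loads (see section 5). The following is an illustrative example only: the queue and client names are placeholders, the RabbitMQ URI assumes the default local credentials, and the OAuth values are the shared testing credentials from section 3.3.

# Message queue (raw-data events produced by the collector)
MQ_URI=amqp://guest:guest@rabbitmq:5672
MQ_QUEUE=my-collector.my-dataset
MQ_EXCHANGE=routed
MQ_KEY=my-collector.my-dataset
MQ_CLIENT=tr-my-transformer

# Timeseries Writer (BDP) endpoint and provenance
BDP_BASE_URL=https://share.opendatahub.testingmachine.eu
BDP_PROVENANCE_VERSION=0.1.0
BDP_PROVENANCE_NAME=tr-my-transformer
BDP_ORIGIN=my-origin

# OAuth2 client credentials (testing values from section 3.3)
ODH_TOKEN_URL=https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token
ODH_CLIENT_ID=odh-mobility-datacollector-development
ODH_CLIENT_SECRET=7bd46f8f-c296-416d-a13d-dc81e68d0830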

3.3. Authentication for the Timeseries Writer

The Timeseries Writer API uses OAuth2 for authentication. Transformers need to obtain an access token to make authenticated requests.

OAuth2 Client Credentials

The Open Data Hub uses Keycloak as its identity provider. Transformers authenticate using the "Client Credentials" flow, where they present a client_id and client_secret to obtain an access token.

We provide shared OAuth client credentials for development and testing purposes only:

  • Host: https://auth.opendatahub.testingmachine.eu/auth/
  • Realm: noi
  • Token Endpoint: https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token
  • Client ID: odh-mobility-datacollector-development
  • Client Secret: 7bd46f8f-c296-416d-a13d-dc81e68d0830

The odh-mobility-datacollector-development client is authorized to write data to the Timeseries Writer.

Important

The above credentials are strictly for testing and development environments. For production deployments and official data pipelines, you must request specific, dedicated OAuth client credentials from the Open Data Hub team. These production credentials will have appropriate permissions and security configurations.

When using the go-bdp-client SDK, authentication is largely transparent, simplifying development. You only need to configure the relevant environment variables:

  • ODH_TOKEN_URL: The URL of the OAuth2 token endpoint.
  • ODH_CLIENT_ID: The client ID for your transformer.
  • ODH_CLIENT_SECRET: The client secret for your transformer.

The SDK (specifically bdplib.FromEnv() and its underlying components) will automatically:

  1. Fetch an access token from the ODH_TOKEN_URL using the provided client credentials.
  2. Refresh the token automatically before it expires.
  3. Include the Authorization: Bearer <access_token> header in all requests made to the Timeseries Writer API.

This means your Go transformation code does not need to explicitly handle token acquisition or refreshing.
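
If you need to verify credentials outside the SDK (for example in a small debugging program), the same client credentials flow can be reproduced with Go's golang.org/x/oauth2 packages. This is only a sketch of what happens under the hood and is not required when using bdplib.FromEnv():

package main

import (
    "context"
    "fmt"
    "log"
    "os"

    "golang.org/x/oauth2/clientcredentials"
)

func main() {
    // OAuth2 client credentials flow against the Open Data Hub Keycloak.
    cfg := clientcredentials.Config{
        ClientID:     os.Getenv("ODH_CLIENT_ID"),
        ClientSecret: os.Getenv("ODH_CLIENT_SECRET"),
        TokenURL:     os.Getenv("ODH_TOKEN_URL"),
    }

    // Token performs the token request; cfg.Client(ctx) would return an
    // *http.Client that refreshes tokens automatically and attaches the
    // "Authorization: Bearer <token>" header to outgoing requests, which is
    // roughly what the BDP client does internally.
    tok, err := cfg.Token(context.Background())
    if err != nil {
        log.Fatalf("token request failed: %v", err)
    }
    fmt.Println("access token expires at:", tok.Expiry)
}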

3.4. SDK Integration (opendatahub-go-sdk and go-bdp-client)

The transformer heavily relies on two SDKs: the opendatahub-go-sdk for message consumption and the go-bdp-client for interacting with the Timeseries Writer.

// ... imports ...

func main() {
    ms.InitWithEnv(context.Background(), "", &env)
    slog.Info("Starting data transformer...")
    b := bdplib.FromEnv() // Initialize BDP client

    defer tel.FlushOnPanic() // Ensure telemetry is flushed on panic

    // Data Type Synchronization (Principle: Metadata Management)
    dataTypeList := bdplib.NewDataTypeList(nil)
    err := dataTypeList.Load("datatypes.json") // Load static datatype definitions
    ms.FailOnError(context.Background(), err, "could not load datatypes")

    slog.Info("pushing datatypes on startup")
    // Sync all defined data types to the BDP writer.
    // This is typically done once on startup to ensure all required types exist.
    b.SyncDataTypes(OriginStationType, dataTypeList.All())

    slog.Info("listening")
    // Initialize the transformer listener.
    // It will consume messages from the configured MQ_QUEUE.
    listener := tr.NewTr[Forecast](context.Background(), env)

    // Start the transformer, passing the `TransformWithBdp` function as the handler.
    // This handler will be called for each message consumed from the queue.
    err = listener.Start(context.Background(), TransformWithBdp(b))

    ms.FailOnError(context.Background(), err, "error while listening to queue")
}

// TransformWithBdp is a wrapper that adapts the main Transform logic
// to the signature expected by the SDK's tr.Handler.
func TransformWithBdp(bdp bdplib.Bdp) tr.Handler[Forecast] {
    return func(ctx context.Context, payload *rdb.Raw[Forecast]) error {
        return Transform(ctx, bdp, payload)
    }
}

Key SDK Integration Points:

  • ms.InitWithEnv: (Already discussed for collectors) Initializes common services like logging, messaging consumer, and telemetry based on tr.Env.
  • tel.FlushOnPanic(): Essential for ensuring traces are sent before a crash.
  • tr.NewTr[Forecast](ctx, env): Creates a new transformer listener instance. The [Forecast] type parameter specifies the expected Go struct that the raw data (received as rdb.Raw[Forecast]) will be unmarshaled into. The SDK handles unmarshalling the raw []byte into this Go struct for you.
  • listener.Start(ctx, TransformWithBdp(b)): This is the main loop of the transformer. It starts consuming messages from the configured queue. For each message, it calls the TransformWithBdp handler function.
  • TransformWithBdp(bdp bdplib.Bdp) tr.Handler[Forecast]: This is an adapter function. The tr.Handler interface expects a function with a specific signature (func(ctx context.Context, payload *rdb.Raw[T]) error). TransformWithBdp creates such a function, injecting the initialized bdplib.Bdp client so that the actual Transform logic can use it to push data.
  • rdb.Raw[Forecast]: The incoming payload from the message queue is unmarshaled by the SDK into this generic Raw struct, where Rawdata contains your specific Forecast struct.

3.5. Transform function

This is the core business logic of the transformer. It involves mapping the raw data into the Open Data Hub's data model and pushing it.

info

Each transformer implements its own logic; the code below is only an example.

// ... constants and helper structs ...

// Constants for Station Types and Data Types, enhancing readability and consistency.
const (
    stationTypeParent = "ParkingFacility" // Parent station type
    stationType       = "ParkingStation"  // Child station type

    shortStay   = "short_stay"
    subscribers = "subscribers"

    dataTypeFreeShort     = "free_" + shortStay
    dataTypeFreeSubs      = "free_" + subscribers
    dataTypeFreeTotal     = "free"
    dataTypeOccupiedShort = "occupied_" + shortStay
    dataTypeOccupiedSubs  = "occupied_" + subscribers
    dataTypeOccupiedTotal = "occupied"
)

// TransformWithBdp is a wrapper that adapts the main Transform logic
// to the signature expected by the SDK's tr.Handler, passing the BDP client.
func TransformWithBdp(bdp bdplib.Bdp) tr.Handler[FacilityData] {
    return func(ctx context.Context, payload *rdb.Raw[FacilityData]) error {
        // Calls the core transformation function
        return Transform(ctx, bdp, payload)
    }
}

// Transform contains the main business logic for converting raw parking data
// into BDP stations and measurements.
func Transform(ctx context.Context, bdp bdplib.Bdp, payload *rdb.Raw[FacilityData]) error {
    log := logger.Get(ctx) // Get a context-aware logger for better tracing

    var parentStations []bdplib.Station // Slice to collect parent stations for batch sync
    // Map to store child stations, using their BDP ID as key, to avoid duplicates
    // and easily update metadata if a ParkNo appears multiple times within a payload.
    stations := make(map[string]bdplib.Station)

    dataMapParent := bdp.CreateDataMap() // Data map for parent station measurements
    dataMap := bdp.CreateDataMap()       // Data map for child station measurements

    ts := payload.Timestamp.UnixMilli() // Get the timestamp for all measurements

    // Iterate over each facility in the raw payload
    for _, facility := range payload.Rawdata {
        // 1. Parent Station Creation and Metadata Association
        // Principle: Hierarchical Stations & Metadata Association
        id := facility.GetID()
        parent_station_data := station_proto.GetStationByID(strconv.Itoa(id)) // Lookup parent station metadata
        if nil == parent_station_data {
            log.Error("no parent station data", "facility_id", strconv.Itoa(id))
            panic("no parent station data") // Fail if critical metadata is missing
        }

        parentStation := bdplib.CreateStation(
            parent_station_data.ID,   // BDP ID from CSV
            parent_station_data.Name, // Name from CSV
            stationTypeParent,        // Defined parent station type
            parent_station_data.Lat,  // Lat from CSV
            parent_station_data.Lon,  // Lon from CSV
            bdp.GetOrigin(),          // Provenance origin
        )
        parentStation.MetaData = parent_station_data.ToMetadata() // Convert CSV metadata to BDP metadata

        parentStations = append(parentStations, parentStation) // Add to batch for later sync

        // Initialize aggregated sums for parent facility measurements
        freeTotalSum := 0
        occupiedTotalSum := 0
        capacityTotal := 0
        freeSubscribersSum := 0
        occupiedSubscribersSum := 0
        capacitySubscribers := 0
        freeShortStaySum := 0
        occupiedShortStaySum := 0
        capacityShortStay := 0

        // Iterate over detailed facility data (e.g., per parking zone/category)
        for _, freePlace := range facility.FacilityDetails {
            // 2. Child Station Creation/Update and Measurement Pushing
            // Principle: Dynamic Station Creation & Measurement Categorization
            facility_id := strconv.Itoa(facility.GetID()) + "_" + strconv.Itoa(freePlace.ParkNo) // Derived child station ID
            station_data := station_proto.GetStationByID(facility_id)                           // Lookup child station metadata
            if nil == station_data {
                log.Error("no station data", "facility_id", facility_id)
                panic("no station data")
            }
            // Check if station already processed in this batch (for multiple categories per ParkNo)
            station, ok := stations[facility_id]

            if !ok { // If not already created, create a new child station
                station = bdplib.CreateStation(
                    station_data.ID, station_data.Name, stationType, station_data.Lat, station_data.Lon, bdp.GetOrigin())
                station.ParentStation = parentStation.Id     // Link to parent facility
                station.MetaData = station_data.ToMetadata() // Assign metadata from CSV
                stations[station_data.ID] = station          // Store in map for batch sync
                log.Debug("Create station " + station_data.ID)
            }

            // Add category-specific metadata and measurements for child stations
            switch freePlace.CountingCategoryNo {
            // ... cases for the other counting categories (short stay, subscribers) ...
            default: // Total (or default category)
                station.MetaData["free_limit"] = freePlace.FreeLimit
                station.MetaData["occupancy_limit"] = freePlace.OccupancyLimit
                station.MetaData["capacity"] = freePlace.Capacity
                dataMap.AddRecord(station_data.ID, dataTypeFreeTotal, bdplib.CreateRecord(ts, freePlace.FreePlaces, 600))
                dataMap.AddRecord(station_data.ID, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, freePlace.CurrentLevel, 600))
                freeTotalSum += freePlace.FreePlaces
                occupiedTotalSum += freePlace.CurrentLevel
                capacityTotal += freePlace.Capacity
            }
        }

        // Assign aggregated total facility data to parent station's measurements and metadata
        // Principle: Aggregation for Parent Station
        if freeTotalSum > 0 {
            dataMapParent.AddRecord(parent_station_data.ID, dataTypeFreeTotal, bdplib.CreateRecord(ts, freeTotalSum, 600))
        }
        if occupiedTotalSum > 0 {
            dataMapParent.AddRecord(parent_station_data.ID, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, occupiedTotalSum, 600))
        }
        if capacityTotal > 0 {
            parentStation.MetaData["capacity"] = capacityTotal
        }
        // ... similar logic for subscribers and short stay totals ...
    }

    // 3. Final Synchronization and Data Pushing
    // Principle: Batch Synchronization & Data Pushing
    // The `true, true` arguments to SyncStations mean:
    // - `true` for `syncState`: Synchronize the station's active/inactive state based on its presence in the call.
    // - `true` for `onlyActivate`: If `true`, only existing stations are activated; new stations are not created.
    //   (NOTE: In practice, you often want `false` for `onlyActivate` if new stations can appear dynamically.
    //   The example might imply pre-existing stations in `resources/stations.csv` or a specific activation logic.)
    bdp.SyncStations(stationTypeParent, parentStations, true, true)
    bdp.SyncStations(stationType, values(stations), true, true) // `values` converts map to slice for batch sync

    bdp.PushData(stationTypeParent, dataMapParent) // Push parent station measurements
    bdp.PushData(stationType, dataMap)             // Push child station measurements

    return nil // Transformation successful
}

// values is a generic helper to extract all values from a map into a slice.
func values[M ~map[K]V, K comparable, V any](m M) []V {
    r := make([]V, 0, len(m))
    for _, v := range m {
        r = append(r, v)
    }
    return r
}

// SyncDataTypes defines and synchronizes all data types used by this transformer
// with the BDP Timeseries Writer. This is typically called once at startup.
func SyncDataTypes(bdp bdplib.Bdp) {
    var dataTypes []bdplib.DataType
    // Define all required data types with their names, units, record types (rtype), and descriptions.
    dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeShort, "", "Amount of free 'short stay' parking slots", "Instantaneous"))
    dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeSubs, "", "Amount of free 'subscribed' parking slots", "Instantaneous"))
    dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeTotal, "", "Amount of free parking slots", "Instantaneous"))
    // ... similar for occupied data types ...

    // Synchronize the defined data types with BDP under the specific stationType.
    err := bdp.SyncDataTypes(stationType, dataTypes)
    ms.FailOnError(context.Background(), err, "failed to sync types")
}

Critical Points & Patterns:

  • SyncDataTypes(b): This crucial function, called once on startup, defines all the possible data types (dataTypeFreeShort, dataTypeOccupiedTotal, etc.) that this transformer will ever publish. It registers them with the BDP API, ensuring the target system understands the metrics.
  • Hierarchical Station Model:
    • Parent Station (ParkingFacility): Represents the overall parking lot. Its metadata is retrieved from stations.csv using the facility.GetID().
    • Child Station (ParkingStation): Represents specific parking zones or categories within a facility. Its ID is dynamically constructed (facility.GetID() + "_" + strconv.Itoa(freePlace.ParkNo)).
    • station.ParentStation = parentStation.Id: This line establishes the crucial parent-child relationship in BDP.
  • Dynamic Station Creation/Update: Stations (both parent and child) are created or updated within the Transform function loop. The stations map is used to collect unique child stations for batch synchronization.
  • Metadata Enrichment: parentStation.MetaData = parent_station_data.ToMetadata() and station.MetaData = station_data.ToMetadata() show how to dynamically add detailed metadata from the static CSV to the BDP station objects.
  • Measurement Categorization and Aggregation: The switch freePlace.CountingCategoryNo statement demonstrates how incoming raw data can be categorized and how individual measurements are assigned to appropriate BDP data types (dataTypeFreeShort, dataTypeOccupiedTotal, etc.). It also shows how to aggregate child station data (e.g., freeTotalSum) to create parent station measurements.
  • bdplib.CreateRecord(ts, value, period): Creates a single measurement record. ts is the Unix milliseconds timestamp, value is the transformed measurement, and period defines the aggregation interval (e.g., 600 seconds for 10 minutes).
  • Batch Synchronization and Pushing:
    • bdp.SyncStations(stationTypeParent, parentStations, true, true) and bdp.SyncStations(stationType, values(stations), true, true): Both parent and child stations are synchronized in batches. The true, true arguments indicate syncState=true (update station state based on presence in this call) and onlyActivate=true (only activate existing stations, do not create new ones from this sync call; this implies that all valid station IDs are expected to be pre-registered or handled differently if new ones can appear).
    • bdp.PushData(stationTypeParent, dataMapParent) and bdp.PushData(stationType, dataMap): All collected measurements for each station type are pushed in single batch API calls, which is crucial for performance.
  • Error Handling: Within the Transform function, unrecoverable problems (such as missing station metadata in stations.csv) are treated as fatal, here via panic and elsewhere via ms.FailOnError, while less critical issues can be logged and skipped, allowing partial success.

3.6. DTO (dto.go)

The dto.go file would define the Go structs that represent the schema of the raw input data (e.g., FacilityData and its nested structs like FacilityDetail). This is the type tr.NewTr[FacilityData] expects. (The actual content of dto.go was not provided, but its purpose is critical).
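
As a rough guide, a hypothetical dto.go reconstructed from the fields accessed in the Transform function could look like the sketch below. The struct layout and JSON tags are assumptions; they must be aligned with the actual raw payload emitted by the collector.

package main

// Hypothetical reconstruction of dto.go, inferred from the fields used in Transform.

// FacilityData is the top-level raw payload: a list of parking facilities.
type FacilityData []Facility

// Facility represents one parking facility with its per-zone details.
type Facility struct {
    FacilityID      int              `json:"facilityId"` // hypothetical JSON tag
    FacilityDetails []FacilityDetail `json:"facilityDetails"`
}

// GetID returns the facility identifier used to look up station metadata.
func (f Facility) GetID() int {
    return f.FacilityID
}

// FacilityDetail holds occupancy figures for one parking zone / counting category.
type FacilityDetail struct {
    ParkNo             int `json:"parkNo"`
    CountingCategoryNo int `json:"countingCategoryNo"`
    Capacity           int `json:"capacity"`
    FreeLimit          int `json:"freeLimit"`
    OccupancyLimit     int `json:"occupancyLimit"`
    FreePlaces         int `json:"freePlaces"`
    CurrentLevel       int `json:"currentLevel"`
}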

4. Containerization with Docker

The Dockerfile for a transformer is very similar to a collector's, with the critical addition of copying any static resource files.

# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0

FROM golang:1.23.7-bookworm as base

FROM base as build-env
WORKDIR /app
# Copy source code
COPY src/. .
# CRITICAL: Copy static resource files
COPY resources/. ./resources
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o main

# BUILD published image (minimal, production-ready)
FROM alpine:latest as build
WORKDIR /app
# Copy compiled binary
COPY --from=build-env /app/main .
# CRITICAL: Copy resources to final image
COPY --from=build-env /app/resources /resources
ENTRYPOINT [ "./main"]

# LOCAL DEVELOPMENT (for hot-reloading/easier debugging)
FROM base as dev
WORKDIR /code
CMD ["go", "run", "main.go"]

5. Local Orchestration with Docker Compose

The docker-compose.yml for a transformer will be almost identical to a collector's, as it still needs a message queue (RabbitMQ) to consume from.

# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0

services:
  app: # Your data transformer service
    depends_on:
      rabbitmq:
        condition: service_healthy
    build:
      dockerfile: infrastructure/docker/Dockerfile # Or wherever your Dockerfile is
      context: .
      target: dev
    env_file:
      - .env # Load environment variables, including MQ_URI, BDP_*, ODH_*
    volumes:
      - ./src:/code
      - ./resources:/code/resources # Mount resources for development (optional if copied in Dockerfile)
      - pkg:/go/pkg/mod
    working_dir: /code

  rabbitmq:
    extends:
      file: ../lib/docker-compose/docker-compose.rabbitmq.yml
      service: rabbitmq
    attach: false

volumes:
  pkg:
warning

When testing the complete pipeline (including the transformer), you need to start the full Open Data Hub Core, and you must be careful to start the collector without the bundled rabbitmq service:

docker compose up app

6. Deployment with Helm

The Helm chart values for a transformer will also mirror a collector's, primarily focused on image details and environment variable configuration.

# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0

image:
  repository: ghcr.io/noi-techpark/opendatahub-transformers/tr-parking-offstreet-skidata # Your transformer's image
  pullPolicy: IfNotPresent
  tag: "0.0.1" # Specific image tag for deployment

env: # Environment variables passed directly to the container
  LOG_LEVEL: "INFO" # Example: Higher log level for production/test
  MQ_QUEUE: s3-poller.parking-offstreet-skidata
  MQ_EXCHANGE: routed
  MQ_KEY: s3-poller.parking-offstreet-skidata
  MQ_CLIENT: tr-parking-offstreet-skidata

  RAW_DATA_BRIDGE_ENDPOINT: "http://raw-data-bridge-service.default.svc.cluster.local:2000/" # Kubernetes service URL

  BDP_BASE_URL: https://share.opendatahub.testingmachine.eu # External URL for BDP
  BDP_PROVENANCE_VERSION: 0.1.0
  BDP_PROVENANCE_NAME: tr-parking-offstreet-skidata
  BDP_ORIGIN: province-bolzano

  SERVICE_NAME: tr-parking-offstreet-skidata # For observability
  TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317

envSecretRef: # Reference Kubernetes secrets for sensitive information
  - name: MQ_URI
    secret: rabbitmq-svcbind # Secret containing RabbitMQ connection URI
    key: uri
  - name: ODH_TOKEN_URL # OAuth token URL from secret
    secret: odh-oauth-client-credentials
    key: token_url
  - name: ODH_CLIENT_ID # OAuth Client ID from secret
    secret: odh-oauth-client-credentials
    key: client_id
  - name: ODH_CLIENT_SECRET # OAuth Client Secret from secret
    secret: odh-oauth-client-credentials
    key: client_secret

Critical Points (Unique to Transformer / OAuth):

  • OAuth Credentials in Secrets: For production, ODH_CLIENT_ID, ODH_CLIENT_SECRET, and ODH_TOKEN_URL should always be loaded from Kubernetes secrets using envSecretRef. This ensures sensitive authentication details are not committed to Git.

7. Testing Your Transformer

The provided main_test.go demonstrates effective testing strategies for transformers.

// SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
//
// SPDX-License-Identifier: AGPL-3.0-or-later

package main

import (
    "context"
    "fmt"
    "sort"
    "testing"
    "time"

    "github.com/noi-techpark/go-bdp-client/bdplib"
    "github.com/noi-techpark/go-bdp-client/bdpmock"
    "github.com/noi-techpark/opendatahub-go-sdk/ingest/rdb"
    "github.com/noi-techpark/opendatahub-go-sdk/testsuite" // Utility for deep comparison with files
    "github.com/stretchr/testify/require"
    "gotest.tools/v3/assert"
)

// NormalizeBdpMockCalls sorts all slices within the BdpMockCalls structure
// to ensure order-independent comparisons between expected and actual API calls.
func NormalizeBdpMockCalls(calls *bdpmock.BdpMockCalls) {
    // For SyncedDataTypes: Sort inner slices of DataType, then sort the outer slice of DataType slices.
    for key, dataTypesCalls := range calls.SyncedDataTypes {
        for i := range dataTypesCalls {
            sort.Slice(dataTypesCalls[i], func(a, b int) bool {
                return fmt.Sprintf("%v", dataTypesCalls[i][a]) < fmt.Sprintf("%v", dataTypesCalls[i][b])
            })
        }
        sort.Slice(dataTypesCalls, func(i, j int) bool {
            return dataTypeSliceToString(dataTypesCalls[i]) < dataTypeSliceToString(dataTypesCalls[j])
        })
        calls.SyncedDataTypes[key] = dataTypesCalls
    }

    // For SyncedData (pushed measurements): Sort DataMaps (batches) by their Name.
    for key, dataMaps := range calls.SyncedData {
        sort.Slice(dataMaps, func(i, j int) bool {
            return dataMaps[i].Name < dataMaps[j].Name
        })
        calls.SyncedData[key] = dataMaps
    }

    // For SyncedStations: First sort stations within each call, then sort the calls themselves.
    for key, stationCalls := range calls.SyncedStations {
        for i := range stationCalls {
            sort.Slice(stationCalls[i].Stations, func(a, b int) bool {
                return stationCalls[i].Stations[a].Id < stationCalls[i].Stations[b].Id
            })
        }
        sort.Slice(stationCalls, func(i, j int) bool {
            // Complex sorting for station calls, ensuring deterministic order.
            var idI, idJ string
            if len(stationCalls[i].Stations) > 0 {
                idI = stationCalls[i].Stations[0].Id
            }
            if len(stationCalls[j].Stations) > 0 {
                idJ = stationCalls[j].Stations[0].Id
            }
            if idI == idJ {
                if stationCalls[i].SyncState == stationCalls[j].SyncState {
                    return !stationCalls[i].OnlyActivate && stationCalls[j].OnlyActivate
                }
                return !stationCalls[i].SyncState && stationCalls[j].SyncState
            }
            return idI < idJ
        })
        calls.SyncedStations[key] = stationCalls
    }
}

// dataTypeSliceToString is a helper for sorting slices of bdplib.DataType.
func dataTypeSliceToString(slice []bdplib.DataType) string {
    s := ""
    for _, dt := range slice {
        s += fmt.Sprintf("%v", dt)
    }
    return s
}

// TestMyBestParking tests the transformation logic for MyBestParking input (another source format).
func TestMyBestParking(t *testing.T) {
    // Similar structure to TestSkidata, demonstrating testing multiple input formats.
    var in = FacilityData{}
    station_proto = ReadStations("../resources/stations.csv")
    err := bdpmock.LoadInputData(&in, "../testdata/input/mybestparking.json")
    require.Nil(t, err)

    timestamp, err := time.Parse("2006-01-02", "2025-01-01")
    require.Nil(t, err)

    raw := rdb.Raw[FacilityData]{
        Rawdata:   in,
        Timestamp: timestamp,
    }

    var out = bdpmock.BdpMockCalls{}
    err = bdpmock.LoadOutput(&out, "../testdata/output/mybestparking--out.json")
    require.Nil(t, err)

    b := bdpmock.MockFromEnv()

    err = Transform(context.TODO(), b, &raw)
    require.Nil(t, err)

    mock := b.(*bdpmock.BdpMock)

    req := mock.Requests()
    NormalizeBdpMockCalls(&req)
    testsuite.DeepEqualFromFile(t, out, req)
}

// TestStations specifically tests the static station data loading and metadata conversion.
func TestStations(t *testing.T) {
    stations := ReadStations("../resources/stations.csv")

    // Test GetStationByID for non-existent and existent IDs
    s := stations.GetStationByID("")
    require.Nil(t, s)

    s = stations.GetStationByID("406983") // An example ID from the CSV
    require.NotNil(t, s)
    assert.Equal(t, "105_facility", s.ID) // Check the BDP ID from CSV

    m := s.ToMetadata()                        // Test metadata conversion
    net := m["netex_parking"].(map[string]any) // Assert nested structure
    assert.Equal(t, len(net), 7)
    vtypes := (net["vehicletypes"]).(string)
    assert.Equal(t, vtypes, "allPassengerVehicles")

    s = stations.GetStationByID("608612") // Another example ID
    require.NotNil(t, s)
    assert.Equal(t, "608612", s.ID)

    m = s.ToMetadata()
    net2, ok := m["netex_parking"]
    assert.Equal(t, false, ok) // Assert no netex_parking for this station
    assert.Equal(t, nil, net2)
}

Critical Points:

  • bdpmock.BdpMock: This is the cornerstone of transformer unit testing. It fully mocks the bdplib.Bdp interface, allowing you to:
    • Simulate bdplib.FromEnv() with test configurations (bdpmock.MockFromEnv()).
    • Call your Transform function without making actual network requests.
    • Record all interactions with the mock (mock.Requests()).
  • Input/Output Files (testdata/input/*.json, testdata/output/*.json): This pattern is crucial for:
    • Deterministic Tests: Providing exact raw input payloads.
    • Expected Behavior: Defining the precise bdplib calls (data types, stations, measurements) that your transformer is expected to make. These output JSON files are typically generated by running the test once with the mock and then verifying the output manually or using a tool like testsuite.DeepEqualToFile.
  • NormalizeBdpMockCalls: This helper function is essential for robust testing when dealing with slices or maps where the order of elements might not be guaranteed by the transformation logic but doesn't affect the correctness of the BDP API calls. It sorts elements within the mock's recorded calls (SyncedDataTypes, SyncedData, SyncedStations) to ensure DeepEqual comparisons are order-independent.
  • testsuite.DeepEqualFromFile: This utility provides a convenient way to compare complex Go structs (like bdpmock.BdpMockCalls) against the content of a JSON file, making assertions concise and readable.
  • Separation of Concerns in Tests:
    • TestSkidata, TestMyBestParking: Focus on the end-to-end transformation from raw input to BDP calls.
    • TestStations: Dedicated to testing the static data loading and ToMetadata conversion logic in station.go, isolating that component.