Developing a Data Transformer from Scratch
A data transformer is the crucial bridge between raw, often disparate, data collected by data collectors and the standardized, structured format required by the Open Data Hub Timeseries Writer API (BDP). This guide provides an in-depth look at developing a data transformer in Go, analyzing its critical components, SDK integration, and common patterns, using the provided example.
1. Understanding the Role of a Data Transformer
A data transformer acts as a processing unit within the data integration pipeline. Its primary responsibilities include:
- Raw Data Consumption: Listening to a message queue for events indicating new raw data.
- Raw Data Retrieval: Fetching the actual raw data from a raw data storage (if only an event notification was received).
- Data Transformation: Converting the raw, source-specific data into the standardized data model of the Open Data Hub. This often involves:
- Parsing: Deserializing the raw data (e.g., JSON, XML).
- Enrichment: Adding derived information or linking to external datasets (e.g., geocoding, unit conversions).
- Mapping: Translating source-specific identifiers or schemas to Open Data Hub standards.
- Creating Hierarchies: Defining relationships between different data entities (e.g., a parking lot containing multiple parking zones).
- Metadata Management: Ensuring that necessary metadata (provenance, data types, stations) is correctly synchronized with the Timeseries Writer API.
- Timeseries Data Publication: Pushing the transformed data, typically time-series measurements, to the `opendatahub-timeseries-writer`.
The transformer is where the "intelligence" of data standardization resides, making raw data consumable for various applications.
2. Core Principles of Data Transformer Development
Many principles from data collector development apply, with some specific nuances for transformers:
2.1. Idempotency is Key
Transformers must be idempotent. This means processing the same raw data event multiple times should produce the exact same result in the Timeseries Writer without creating duplicates or conflicting states. The Open Data Hub Timeseries Writer API aids this by silently discarding older duplicate records, but your transformation logic itself should avoid side effects from re-processing.
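As a small illustration (a sketch only, using the `rdb.Raw` payload type that appears later in this guide): deriving measurement timestamps from the raw payload rather than from the wall clock keeps the transformation deterministic, so re-processing the same event yields identical records that the Timeseries Writer simply discards as duplicates.

```go
package main

import (
	"github.com/noi-techpark/opendatahub-go-sdk/ingest/rdb"
)

// recordTimestamp derives the measurement timestamp from the raw payload
// instead of calling time.Now(). Re-processing the same event therefore
// produces identical records, which the Timeseries Writer silently
// discards as duplicates -- the transformation stays idempotent.
func recordTimestamp[T any](payload *rdb.Raw[T]) int64 {
	return payload.Timestamp.UnixMilli()
}
```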
2.2. Robust Data Validation and Error Handling
Data from collectors can be malformed or incomplete. Your transformer needs to:
- Validate Input: Check if the raw data conforms to expected schemas.
- Selective Processing: If a single record within a batch fails, decide whether to skip only that record or the entire batch.
- Dead-Letter Queues (DLQ): Failed messages should typically be moved to a DLQ for later inspection and manual intervention, preventing them from blocking the main queue. The SDK's consumer often handles this.
- Fail Early: Let the application crash if something goes wrong; the test and production environments handle restarts.
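To make the validate-and-reject pattern concrete, here is a minimal sketch. The `handlerFunc` type and `withValidation` helper are illustrative only (not part of the SDK); the signature mirrors the `tr.Handler` shown later in this guide, and returning an error is what lets the SDK's consumer reject the message (and route it to a DLQ where configured) instead of acknowledging it.

```go
package main

import (
	"context"
	"log/slog"

	"github.com/noi-techpark/opendatahub-go-sdk/ingest/rdb"
)

// handlerFunc mirrors the tr.Handler signature used in this guide
// (func(ctx, *rdb.Raw[T]) error); it is spelled out here so the sketch
// stays self-contained.
type handlerFunc[T any] func(ctx context.Context, payload *rdb.Raw[T]) error

// withValidation wraps a handler so that obviously malformed payloads are
// rejected before the transformation runs. Returning an error signals the
// consumer that processing failed, so the message is not acknowledged and
// can be dead-lettered for later inspection.
func withValidation[T any](validate func(*rdb.Raw[T]) error, next handlerFunc[T]) handlerFunc[T] {
	return func(ctx context.Context, payload *rdb.Raw[T]) error {
		if err := validate(payload); err != nil {
			slog.Error("invalid payload, rejecting message", "err", err)
			return err
		}
		return next(ctx, payload)
	}
}
```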
2.3. Data Model Consistency
Strictly follow the Open Data Hub data model for stations, data types, and measurements. This includes:
- Unique Identifiers: Assign stable and unique IDs to stations and data types.
- Metadata Richness: Populate metadata fields (e.g., names in multiple languages, specific attributes like capacity or operator details) to enhance data discoverability and usability.
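For illustration, a minimal sketch of a station with a stable, source-derived ID and multilingual metadata, built with the `bdplib.CreateStation` helper used later in this guide. The metadata keys and coordinates are made up for the example, and `MetaData` is assumed to be a string-keyed map, as its usage in the `Transform` example suggests.

```go
package main

import "github.com/noi-techpark/go-bdp-client/bdplib"

// exampleStation sketches the "stable ID + rich metadata" principle:
// the ID is derived deterministically from source identifiers (never random),
// and the metadata carries multilingual names plus useful attributes.
func exampleStation(origin string) bdplib.Station {
	s := bdplib.CreateStation(
		"105_facility",         // stable, source-derived ID (same on every run)
		"Parking facility 105", // default display name
		"ParkingFacility",      // station type
		46.49, 11.35,           // latitude, longitude (illustrative)
		origin,                 // data origin for provenance
	)
	s.MetaData = map[string]any{
		"name_it":  "Parcheggio 105", // multilingual names improve discoverability
		"name_de":  "Parkplatz 105",
		"capacity": 250,
		"operator": "Example Operator", // illustrative attribute
	}
	return s
}
```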
2.4. Performance and Batching
Transformations can be CPU or I/O intensive. Consider:
- Batch Processing: The Timeseries Writer API supports pushing data in batches. Leverage this to reduce network overhead.
- Concurrency: Process multiple raw data events concurrently, carefully managing resources.
- Efficient Lookups: If transformation requires static lookup data (like the `stations.csv` in our example), load and cache it efficiently (e.g., into an in-memory map) to avoid repeated file reads or database queries (see the sketch after this list).
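A minimal sketch of the lookup-caching idea (generic, not the actual `station.go` of the example, which parses the full `stations.csv` schema): the CSV is read once at startup and kept in a map, so each transformed record costs only a map lookup.

```go
package main

import (
	"encoding/csv"
	"os"
)

// loadLookup reads a two-column CSV (id,value) once at startup and caches it
// in a map, so lookups during transformation never touch the file again.
func loadLookup(path string) (map[string]string, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	rows, err := csv.NewReader(f).ReadAll()
	if err != nil {
		return nil, err
	}
	lookup := make(map[string]string, len(rows))
	for _, row := range rows {
		if len(row) < 2 {
			continue // skip malformed rows
		}
		lookup[row[0]] = row[1]
	}
	return lookup, nil
}
```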
2.5. Observability
As with collectors, comprehensive logging, tracing, and metrics are crucial for monitoring the transformation process, identifying data quality issues, and debugging performance problems.
3. Anatomy of a Go Data Transformer
Let's break down the provided parking off-street transformer example.
3.1. Project Structure
The project structure will be very similar to a data collector, with the addition of static data files if needed for transformation logic.
.
├── calls.http                     # Example HTTP requests for testing BDP API directly
├── docker-compose.yml             # Local development setup
├── documentation
│   └── ...                        # External specification document, if exists
├── infrastructure
│   ├── docker
│   │   └── Dockerfile             # Containerization instructions
│   ├── docker-compose.build.yml   # Build-specific Docker Compose
│   └── helm                       # Helm charts for Kubernetes deployment
│       └── ...                    # Environment-specific Helm value files
├── resources                      # Static data files required by the transformer
│   ├── stations.csv               # CSV containing station metadata
│   └── stations.csv.license       # License for the static data
├── src                            # Go source code
│   ├── dto.go                     # Data Transfer Objects (raw data schema)
│   ├── go.mod                     # Go modules file
│   ├── go.sum                     # Go modules checksums
│   ├── main.go                    # Main application logic
│   ├── main_test.go               # Unit tests
│   └── station.go                 # Logic for handling static station data
└── testdata                       # Test input and expected output files
    ├── input                      # Example raw JSON payloads
    │   └── mybestparking.json
    └── output                     # Expected BDP API calls (for testing)
        └── mybestparking--out.json
3.2. Environment Variables and Configuration Management
Configuration is externalized via environment variables, ensuring flexibility across different environments.
- Go Code (main.go)
- .env file
// ... imports ...
var env tr.Env // Embeds common environment variables from the Transformer SDK
var station_proto Stations // Global variable to hold loaded station metadata
func main() {
// 1. Initialize SDK components (messaging, logging, telemetry)
ms.InitWithEnv(context.Background(), "", &env)
slog.Info("Starting data transformer...")
// Ensure telemetry traces are flushed on application panic
defer tel.FlushOnPanic()
// 2. Initialize BDP client using environment variables for authentication and endpoint
b := bdplib.FromEnv()
// 3. Load static station metadata from CSV
station_proto = ReadStations("../resources/stations.csv")
// 4. Synchronize Data Types with BDP
SyncDataTypes(b)
slog.Info("listening")
// 5. Initialize the transformer listener.
// It will consume messages from the configured MQ_QUEUE.
listener := tr.NewTr[FacilityData](context.Background(), env)
// 6. Start the transformer, passing the `TransformWithBdp` function as the handler.
// This handler will be called for each message consumed from the queue.
err := listener.Start(context.Background(), TransformWithBdp(b))
ms.FailOnError(context.Background(), err, "error while listening to queue")
}
# Common SDK environment variables for a transformer
LOG_LEVEL="DEBUG"
MQ_QUEUE=s3-poller.parking-offstreet-skidata # Specific queue to consume from
MQ_EXCHANGE=routed
MQ_KEY=s3-poller.parking-offstreet-skidata # Routing key for messages
MQ_CLIENT=tr-parking-offstreet-skidata # Identify your transformer to RabbitMQ
MQ_URI=amqp://username:password@localhost:5672 # RabbitMQ connection URI
# Raw Data Bridge Endpoint (if used to fetch raw data after notification)
RAW_DATA_BRIDGE_ENDPOINT="http://localhost:2000/" # Example: URL for a service fetching raw data
# BDP Client specific environment variables (for connecting to Timeseries Writer)
BDP_BASE_URL=https://share.opendatahub.testingmachine.eu # Base URL of the Timeseries Writer API
BDP_PROVENANCE_VERSION=0.1.0 # Version of this transformer for provenance
BDP_PROVENANCE_NAME=tr-parking-offstreet-skidata # Name of this transformer for provenance
BDP_ORIGIN=province-bolzano # Origin of the data being transformed
# OAuth2 / Keycloak credentials for BDP authentication
ODH_TOKEN_URL=https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token
ODH_CLIENT_ID=odh-mobility-datacollector-development # OAuth Client ID
ODH_CLIENT_SECRET=7bd46f8f-c296-416d-a13d-dc81e68d0830 # OAuth Client Secret
Critical Points:
- `tr.Env` Embedding: Similar to `dc.Env` for collectors, `tr.Env` from the transformer SDK handles common environment variables related to message queuing for transformers (like `MQ_QUEUE`, `MQ_KEY`).
- `bdplib.FromEnv()`: This function from the `go-bdp-client` simplifies the initialization of the BDP client. It automatically reads environment variables prefixed with `BDP_` and `ODH_` to configure the base URL, provenance details, and OAuth2 authentication (token URL, client ID/secret). This centralizes authentication logic.
- Message Queue Configuration:
  - `MQ_QUEUE`: Specifies the queue this transformer will consume messages from. This queue should be populated by the data collector.
  - `MQ_EXCHANGE` / `MQ_KEY`: Define the RabbitMQ exchange and routing key if the message flow involves complex routing.
- `RAW_DATA_BRIDGE_ENDPOINT`: This variable indicates a pattern where the raw data itself is not placed in the RabbitMQ message payload. Instead, the message might contain a pointer to the raw data, which the transformer then fetches via a "Raw Data Bridge" service.
- BDP Provenance: `BDP_PROVENANCE_VERSION`, `BDP_PROVENANCE_NAME`, and `BDP_ORIGIN` are crucial for data lineage within the Open Data Hub. Every piece of data pushed by this transformer will carry this provenance information.
- OAuth2 Authentication: `ODH_TOKEN_URL`, `ODH_CLIENT_ID`, and `ODH_CLIENT_SECRET` are used by `bdplib.FromEnv()` to configure the OAuth2 client credentials flow, ensuring authenticated access to the Timeseries Writer.
3.3. Authentication for the Timeseries Writer
The Timeseries Writer API uses OAuth2 for authentication. Transformers need to obtain an access token to make authenticated requests.
OAuth2 Client Credentials
The Open Data Hub uses Keycloak as its identity provider. Transformers authenticate using the "Client Credentials" flow, where they present a `client_id` and `client_secret` to obtain an access token.
We provide shared OAuth client credentials for development and testing purposes only:
- Host: `https://auth.opendatahub.testingmachine.eu/auth/`
- Realm: `noi`
- Token Endpoint: `https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token`
- Client ID: `odh-mobility-datacollector-development`
- Client Secret: `7bd46f8f-c296-416d-a13d-dc81e68d0830`

The `odh-mobility-datacollector-development` client is authorized to write data to the Timeseries Writer.
The above credentials are strictly for testing and development environments. For production deployments and official data pipelines, you must request specific, dedicated OAuth client credentials from the Open Data Hub team. These production credentials will have appropriate permissions and security configurations.
- Using Go SDK
- Self-managed
When using the `go-bdp-client` SDK, authentication is largely transparent, simplifying development. You only need to configure the relevant environment variables:
- `ODH_TOKEN_URL`: The URL of the OAuth2 token endpoint.
- `ODH_CLIENT_ID`: The client ID for your transformer.
- `ODH_CLIENT_SECRET`: The client secret for your transformer.

The SDK (specifically `bdplib.FromEnv()` and its underlying components) will automatically:
- Fetch an access token from `ODH_TOKEN_URL` using the provided client credentials.
- Refresh the token automatically before it expires.
- Include the `Authorization: Bearer <access_token>` header in all requests made to the Timeseries Writer API.

This means your Go transformation code does not need to explicitly handle token acquisition or refreshing.
For a deeper understanding of the authentication flow or for manual testing/debugging, you can acquire an access token using a `curl` command:

curl -X POST \
  "https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials&client_id=odh-mobility-datacollector-development&client_secret=7bd46f8f-c296-416d-a13d-dc81e68d0830"
The response will be a JSON object containing the `access_token`, `expires_in`, and other details:

{
  "access_token": "eyJhbGciOiJIUzI...",
  "expires_in": 300,
  "refresh_expires_in": 0,
  "token_type": "bearer",
  "not-before-policy": 0,
  "session_state": "...",
  "scope": "openid email profile"
}
Using the Access Token in API Calls
Once you have the `access_token` (either obtained manually or by the SDK), you include it in the `Authorization` header of your HTTP requests to the Timeseries Writer API.
For example, to access the `/json/stations` endpoint:

curl -X GET "http://localhost:8999/json/stations" \
  --header 'Content-Type: application/json' \
  --header 'Authorization: bearer YOUR_ACCESS_TOKEN'

Replace `YOUR_ACCESS_TOKEN` with the actual token value you obtained. This header ensures that your request is authenticated and authorized to interact with the Timeseries Writer.
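If you prefer to manage the token yourself in Go (for debugging, or outside of `bdplib`), the standard `golang.org/x/oauth2/clientcredentials` package implements the same client-credentials flow. The sketch below uses the shared testing credentials and the local `/json/stations` endpoint from above; it is not how `bdplib` is implemented internally.

```go
package main

import (
	"context"
	"fmt"
	"io"

	"golang.org/x/oauth2/clientcredentials"
)

func main() {
	cfg := clientcredentials.Config{
		ClientID:     "odh-mobility-datacollector-development",
		ClientSecret: "7bd46f8f-c296-416d-a13d-dc81e68d0830",
		TokenURL:     "https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token",
	}

	// cfg.Client returns an *http.Client that fetches the token, refreshes it
	// before expiry, and injects the Authorization: Bearer header automatically.
	client := cfg.Client(context.Background())

	resp, err := client.Get("http://localhost:8999/json/stations")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```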
3.4. SDK Integration (`opendatahub-go-sdk` and `go-bdp-client`)
The transformer heavily relies on two SDKs: the `opendatahub-go-sdk` for message consumption and the `go-bdp-client` for interacting with the Timeseries Writer.
- Go Code (main.go)
// ... imports ...
func main() {
ms.InitWithEnv(context.Background(), "", &env)
slog.Info("Starting data transformer...")
b := bdplib.FromEnv() // Initialize BDP client
defer tel.FlushOnPanic() // Ensure telemetry is flushed on panic
// Data Type Synchronization (Principle: Metadata Management)
dataTypeList := bdplib.NewDataTypeList(nil)
err := dataTypeList.Load("datatypes.json") // Load static datatype definitions
ms.FailOnError(context.Background(), err, "could not load datatypes")
slog.Info("pushing datatypes on startup")
// Sync all defined data types to the BDP writer.
// This is typically done once on startup to ensure all required types exist.
b.SyncDataTypes(OriginStationType, dataTypeList.All())
slog.Info("listening")
// Initialize the transformer listener.
// It will consume messages from the configured MQ_QUEUE.
listener := tr.NewTr[Forecast](context.Background(), env)
// Start the transformer, passing the `TransformWithBdp` function as the handler.
// This handler will be called for each message consumed from the queue.
err = listener.Start(context.Background(), TransformWithBdp(b))
ms.FailOnError(context.Background(), err, "error while listening to queue")
}
// TransformWithBdp is a wrapper that adapts the main Transform logic
// to the signature expected by the SDK's tr.Handler.
func TransformWithBdp(bdp bdplib.Bdp) tr.Handler[Forecast] {
return func(ctx context.Context, payload *rdb.Raw[Forecast]) error {
return Transform(ctx, bdp, payload)
}
}
Key SDK Integration Points:
- `ms.InitWithEnv`: (Already discussed for collectors) Initializes common services like logging, the messaging consumer, and telemetry based on `tr.Env`.
- `tel.FlushOnPanic()`: Essential for ensuring traces are sent before a crash.
- `tr.NewTr[Forecast](ctx, env)`: Creates a new transformer listener instance. The `[Forecast]` type parameter specifies the Go struct that the raw data (received as `rdb.Raw[Forecast]`) will be unmarshaled into. The SDK handles unmarshaling the raw `[]byte` into this Go struct for you.
- `listener.Start(ctx, TransformWithBdp(b))`: This is the main loop of the transformer. It starts consuming messages from the configured queue and calls the `TransformWithBdp` handler function for each message.
- `TransformWithBdp(bdp bdplib.Bdp) tr.Handler[Forecast]`: This is an adapter function. The `tr.Handler` interface expects a function with a specific signature (`func(ctx context.Context, payload *rdb.Raw[T]) error`). `TransformWithBdp` creates such a function, injecting the initialized `bdplib.Bdp` client so that the actual `Transform` logic can use it to push data.
- `rdb.Raw[Forecast]`: The incoming payload from the message queue is unmarshaled by the SDK into this generic `Raw` struct, whose `Rawdata` field contains your specific `Forecast` struct.
3.5. Transform function
This is the core business logic of the transformer. It involves mapping the raw data into the Open Data Hub's data model and pushing it.
Each transformer implements its own logic; the code below is only an example.
- Go Code (main.go) - `Transform` Function
// ... constants and helper structs ...
// Constants for Station Types and Data Types, enhancing readability and consistency.
const (
stationTypeParent = "ParkingFacility" // Parent station type
stationType = "ParkingStation" // Child station type
shortStay = "short_stay"
subscribers = "subscribers"
dataTypeFreeShort = "free_" + shortStay
dataTypeFreeSubs = "free_" + subscribers
dataTypeFreeTotal = "free"
dataTypeOccupiedShort = "occupied_" + shortStay
dataTypeOccupiedSubs = "occupied_" + subscribers
dataTypeOccupiedTotal = "occupied"
)
// TransformWithBdp is a wrapper that adapts the main Transform logic
// to the signature expected by the SDK's tr.Handler, passing the BDP client.
func TransformWithBdp(bdp bdplib.Bdp) tr.Handler[FacilityData] {
return func(ctx context.Context, payload *rdb.Raw[FacilityData]) error {
// Calls the core transformation function
return Transform(ctx, bdp, payload)
}
}
// Transform contains the main business logic for converting raw parking data
// into BDP stations and measurements.
func Transform(ctx context.Context, bdp bdplib.Bdp, payload *rdb.Raw[FacilityData]) error {
log := logger.Get(ctx) // Get a context-aware logger for better tracing
var parentStations []bdplib.Station // Slice to collect parent stations for batch sync
// Map to store child stations, using their BDP ID as key, to avoid duplicates
// and easily update metadata if a ParkNo appears multiple times within a payload.
stations := make(map[string]bdplib.Station)
dataMapParent := bdp.CreateDataMap() // Data map for parent station measurements
dataMap := bdp.CreateDataMap() // Data map for child station measurements
ts := payload.Timestamp.UnixMilli() // Get the timestamp for all measurements
// Iterate over each facility in the raw payload
for _, facility := range payload.Rawdata {
// 1. Parent Station Creation and Metadata Association
// Principle: Hierarchical Stations & Metadata Association
id := facility.GetID()
parent_station_data := station_proto.GetStationByID(strconv.Itoa(id)) // Lookup parent station metadata
if nil == parent_station_data {
log.Error("no parent station data", "facility_id", strconv.Itoa(id))
panic("no parent station data") // Fail if critical metadata is missing
}
parentStation := bdplib.CreateStation(
parent_station_data.ID, // BDP ID from CSV
parent_station_data.Name, // Name from CSV
stationTypeParent, // Defined parent station type
parent_station_data.Lat, // Lat from CSV
parent_station_data.Lon, // Lon from CSV
bdp.GetOrigin(), // Provenance origin
)
parentStation.MetaData = parent_station_data.ToMetadata() // Convert CSV metadata to BDP metadata
parentStations = append(parentStations, parentStation) // Add to batch for later sync
// Initialize aggregated sums for parent facility measurements
freeTotalSum := 0
occupiedTotalSum := 0
capacityTotal := 0
freeSubscribersSum := 0
occupiedSubscribersSum := 0
capacitySubscribers := 0
freeShortStaySum := 0
occupiedShortStaySum := 0
capacityShortStay := 0
// Iterate over detailed facility data (e.g., per parking zone/category)
for _, freePlace := range facility.FacilityDetails {
// 2. Child Station Creation/Update and Measurement Pushing
// Principle: Dynamic Station Creation & Measurement Categorization
facility_id := strconv.Itoa(facility.GetID()) + "_" + strconv.Itoa(freePlace.ParkNo) // Derived child station ID
station_data := station_proto.GetStationByID(facility_id) // Lookup child station metadata
if nil == station_data {
log.Error("no station data", "facility_id", facility_id)
panic("no station data")
}
// Check if station already processed in this batch (for multiple categories per ParkNo)
station, ok := stations[facility_id]
if !ok { // If not already created, create a new child station
station = bdplib.CreateStation(
station_data.ID, station_data.Name, stationType, station_data.Lat, station_data.Lon, bdp.GetOrigin())
station.ParentStation = parentStation.Id // Link to parent facility
station.MetaData = station_data.ToMetadata() // Assign metadata from CSV
stations[station_data.ID] = station // Store in map for batch sync
log.Debug("Create station " + station_data.ID)
}
// Add category-specific metadata and measurements for child stations
switch freePlace.CountingCategoryNo {
// ... cases for the "short stay" and "subscribers" categories ...
default: // Total (or default category)
station.MetaData["free_limit"] = freePlace.FreeLimit
station.MetaData["occupancy_limit"] = freePlace.OccupancyLimit
station.MetaData["capacity"] = freePlace.Capacity
dataMap.AddRecord(station_data.ID, dataTypeFreeTotal, bdplib.CreateRecord(ts, freePlace.FreePlaces, 600))
dataMap.AddRecord(station_data.ID, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, freePlace.CurrentLevel, 600))
freeTotalSum += freePlace.FreePlaces
occupiedTotalSum += freePlace.CurrentLevel
capacityTotal += freePlace.Capacity
}
}
// Assign aggregated total facility data to parent station's measurements and metadata
// Principle: Aggregation for Parent Station
if freeTotalSum > 0 {
dataMapParent.AddRecord(parent_station_data.ID, dataTypeFreeTotal, bdplib.CreateRecord(ts, freeTotalSum, 600))
}
if occupiedTotalSum > 0 {
dataMapParent.AddRecord(parent_station_data.ID, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, occupiedTotalSum, 600))
}
if capacityTotal > 0 {
parentStation.MetaData["capacity"] = capacityTotal
}
// ... similar logic for subscribers and short stay totals ...
}
// 3. Final Synchronization and Data Pushing
// Principle: Batch Synchronization & Data Pushing
// The `true, true` arguments to SyncStations mean:
// - `true` for `syncState`: Synchronize the station's active/inactive state based on its presence in the call.
// - `true` for `onlyActivate`: If `true`, only existing stations are activated; new stations are not created.
// (NOTE: In practice, you often want `false` for `onlyActivate` if new stations can appear dynamically.
// The example might imply pre-existing stations in `resources/stations.csv` or a specific activation logic.)
bdp.SyncStations(stationTypeParent, parentStations, true, true)
bdp.SyncStations(stationType, values(stations), true, true) // `values` converts map to slice for batch sync
bdp.PushData(stationTypeParent, dataMapParent) // Push parent station measurements
bdp.PushData(stationType, dataMap) // Push child station measurements
return nil // Transformation successful
}
// values is a generic helper to extract all values from a map into a slice.
func values[M ~map[K]V, K comparable, V any](m M) []V {
r := make([]V, 0, len(m))
for _, v := range m {
r = append(r, v)
}
return r
}
// SyncDataTypes defines and synchronizes all data types used by this transformer
// with the BDP Timeseries Writer. This is typically called once at startup.
func SyncDataTypes(bdp bdplib.Bdp) {
var dataTypes []bdplib.DataType
// Define all required data types with their names, units, record types (rtype), and descriptions.
dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeShort, "", "Amount of free 'short stay' parking slots", "Instantaneous"))
dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeSubs, "", "Amount of free 'subscribed' parking slots", "Instantaneous"))
dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeTotal, "", "Amount of free parking slots", "Instantaneous"))
// ... similar for occupied data types ...
// Synchronize the defined data types with BDP under the specific stationType.
err := bdp.SyncDataTypes(stationType, dataTypes)
ms.FailOnError(context.Background(), err, "failed to sync types")
}
Critical Points & Patterns:
- `SyncDataTypes(b)`: This crucial function, called once on startup, defines all the data types (`dataTypeFreeShort`, `dataTypeOccupiedTotal`, etc.) that this transformer will ever publish. It registers them with the BDP API, ensuring the target system understands the metrics.
- Hierarchical Station Model:
  - Parent Station (`ParkingFacility`): Represents the overall parking lot. Its metadata is retrieved from `stations.csv` using `facility.GetID()`.
  - Child Station (`ParkingStation`): Represents specific parking zones or categories within a facility. Its ID is dynamically constructed (`strconv.Itoa(facility.GetID()) + "_" + strconv.Itoa(freePlace.ParkNo)`).
  - `station.ParentStation = parentStation.Id`: This line establishes the crucial parent-child relationship in BDP.
- Dynamic Station Creation/Update: Stations (both parent and child) are created or updated within the `Transform` function loop. The `stations` map is used to collect unique child stations for batch synchronization.
- Metadata Enrichment: `parentStation.MetaData = parent_station_data.ToMetadata()` and `station.MetaData = station_data.ToMetadata()` show how to dynamically add detailed metadata from the static CSV to the BDP station objects.
- Measurement Categorization and Aggregation: The `switch freePlace.CountingCategoryNo` statement demonstrates how incoming raw data can be categorized and how individual measurements are assigned to the appropriate BDP data types (`dataTypeFreeShort`, `dataTypeOccupiedTotal`, etc.). It also shows how child station data is aggregated (e.g., `freeTotalSum`) to create parent station measurements.
- `bdplib.CreateRecord(ts, value, period)`: Creates a single measurement record. `ts` is the Unix-millisecond timestamp, `value` is the transformed measurement, and `period` defines the aggregation interval (e.g., 600 seconds for 10 minutes).
- Batch Synchronization and Pushing:
  - `bdp.SyncStations(stationTypeParent, parentStations, true, true)` and `bdp.SyncStations(stationType, values(stations), true, true)`: Both parent and child stations are synchronized in batches. The `true, true` arguments indicate `syncState=true` (update station state based on presence in this call) and `onlyActivate=true` (only activate existing stations, do not create new ones from this sync call; this implies that all valid station IDs are expected to be pre-registered or handled differently if new ones can appear).
  - `bdp.PushData(stationTypeParent, dataMapParent)` and `bdp.PushData(stationType, dataMap)`: All collected measurements for each station type are pushed in single batch API calls, which is crucial for performance.
- Error Handling: Within the `Transform` function, missing critical prerequisites (such as station metadata that cannot be found) cause the transformer to fail early (here via `panic`), while `ms.FailOnError` is used in `main` for critical startup and listener errors. Less critical issues can be logged without stopping the entire transformation, allowing partial success.
3.6. DTO (`dto.go`)
The `dto.go` file defines the Go structs that represent the schema of the raw input data (e.g., `FacilityData` and its nested structs like `FacilityDetail`). This is the type `tr.NewTr[FacilityData]` expects. (The actual content of `dto.go` was not provided, but its purpose is critical.)
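Since the file is not reproduced here, the following is only a hypothetical sketch of what `dto.go` could look like, inferred from how `Transform` accesses the payload; field names such as `FacilityID` and the JSON tags are assumptions and must be adapted to the collector's actual raw format.

```go
package main

// FacilityData is the payload type consumed by tr.NewTr[FacilityData];
// here it is modeled as a list of facilities (hypothetical).
type FacilityData []Facility

type Facility struct {
	FacilityID      int              `json:"facilityId"` // assumed field/tag names
	FacilityDetails []FacilityDetail `json:"facilityDetails"`
}

// GetID returns the source identifier used to look up station metadata.
func (f Facility) GetID() int {
	return f.FacilityID
}

type FacilityDetail struct {
	ParkNo             int `json:"parkNo"`
	CountingCategoryNo int `json:"countingCategoryNo"`
	FreePlaces         int `json:"freePlaces"`
	CurrentLevel       int `json:"currentLevel"`
	Capacity           int `json:"capacity"`
	FreeLimit          int `json:"freeLimit"`
	OccupancyLimit     int `json:"occupancyLimit"`
}
```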
4. Containerization with Docker
The Dockerfile for a transformer is very similar to a collector's, with the critical addition of copying any static resource files.
- Dockerfile
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
FROM golang:1.23.7-bookworm as base

FROM base as build-env
WORKDIR /app
# Copy source code
COPY src/. .
# <--- CRITICAL: Copy static resource files
COPY resources/. ./resources
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o main

# BUILD published image (minimal, production-ready)
FROM alpine:latest as build
WORKDIR /app
# Copy compiled binary
COPY --from=build-env /app/main .
# <--- CRITICAL: Copy resources to final image
COPY --from=build-env /app/resources /resources
ENTRYPOINT [ "./main" ]

# LOCAL DEVELOPMENT (for hot-reloading/easier debugging)
FROM base as dev
WORKDIR /code
CMD ["go", "run", "main.go"]
5. Local Orchestration with Docker Compose
The docker-compose.yml
for a transformer will be almost identical to a collector's, as it still needs a message queue (RabbitMQ) to consume from.
- docker-compose.yml (Conceptual)
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
services:
  app: # Your data transformer service
    depends_on:
      rabbitmq:
        condition: service_healthy
    build:
      dockerfile: infrastructure/docker/Dockerfile # Or wherever your Dockerfile is
      context: .
      target: dev
    env_file:
      - .env # Load environment variables, including MQ_URI, BDP_*, ODH_*
    volumes:
      - ./src:/code
      - ./resources:/code/resources # Mount resources for development (optional if copied in Dockerfile)
      - pkg:/go/pkg/mod
    working_dir: /code
  rabbitmq:
    extends:
      file: ../lib/docker-compose/docker-compose.rabbitmq.yml
      service: rabbitmq
    attach: false

volumes:
  pkg:
When testing the complete pipeline (including the transformer), you need to start the full Open Data Hub Core, and you must be careful to start the transformer (the `app` service) without the local `rabbitmq` service:

docker compose up app
6. Deployment with Helm
The Helm chart values for a transformer will also mirror a collector's, primarily focused on image details and environment variable configuration.
- Helm Chart Values (Conceptual `your-transformer.test.yaml`)
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
image:
  repository: ghcr.io/noi-techpark/opendatahub-transformers/tr-parking-offstreet-skidata # Your transformer's image
  pullPolicy: IfNotPresent
  tag: "0.0.1" # Specific image tag for deployment

env: # Environment variables passed directly to the container
  LOG_LEVEL: "INFO" # Example: Higher log level for production/test
  MQ_QUEUE: s3-poller.parking-offstreet-skidata
  MQ_EXCHANGE: routed
  MQ_KEY: s3-poller.parking-offstreet-skidata
  MQ_CLIENT: tr-parking-offstreet-skidata
  RAW_DATA_BRIDGE_ENDPOINT: "http://raw-data-bridge-service.default.svc.cluster.local:2000/" # Kubernetes service URL
  BDP_BASE_URL: https://share.opendatahub.testingmachine.eu # External URL for BDP
  BDP_PROVENANCE_VERSION: 0.1.0
  BDP_PROVENANCE_NAME: tr-parking-offstreet-skidata
  BDP_ORIGIN: province-bolzano
  SERVICE_NAME: tr-parking-offstreet-skidata # For observability
  TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317

envSecretRef: # Reference Kubernetes secrets for sensitive information
  - name: MQ_URI
    secret: rabbitmq-svcbind # Secret containing RabbitMQ connection URI
    key: uri
  - name: ODH_TOKEN_URL # OAuth token URL from secret
    secret: odh-oauth-client-credentials
    key: token_url
  - name: ODH_CLIENT_ID # OAuth Client ID from secret
    secret: odh-oauth-client-credentials
    key: client_id
  - name: ODH_CLIENT_SECRET # OAuth Client Secret from secret
    secret: odh-oauth-client-credentials
    key: client_secret
Critical Points (Unique to Transformer / OAuth):
- OAuth Credentials in Secrets: For production, `ODH_CLIENT_ID`, `ODH_CLIENT_SECRET`, and `ODH_TOKEN_URL` should always be loaded from Kubernetes secrets using `envSecretRef`. This ensures sensitive authentication details are not committed to Git.
7. Testing Your Transformer
The provided `main_test.go` demonstrates effective testing strategies for transformers.
- Go Code (main_test.go)
// SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
package main
import (
"context"
"fmt"
"sort"
"testing"
"time"
"github.com/noi-techpark/go-bdp-client/bdplib"
"github.com/noi-techpark/go-bdp-client/bdpmock"
"github.com/noi-techpark/opendatahub-go-sdk/ingest/rdb"
"github.com/noi-techpark/opendatahub-go-sdk/testsuite" // Utility for deep comparison with files
"github.com/stretchr/testify/require"
"gotest.tools/v3/assert"
)
// NormalizeBdpMockCalls sorts all slices within the BdpMockCalls structure
// to ensure order-independent comparisons between expected and actual API calls.
func NormalizeBdpMockCalls(calls *bdpmock.BdpMockCalls) {
// For SyncedDataTypes: Sort inner slices of DataType, then sort the outer slice of DataType slices.
for key, dataTypesCalls := range calls.SyncedDataTypes {
for i := range dataTypesCalls {
sort.Slice(dataTypesCalls[i], func(a, b int) bool {
return fmt.Sprintf("%v", dataTypesCalls[i][a]) < fmt.Sprintf("%v", dataTypesCalls[i][b])
})
}
sort.Slice(dataTypesCalls, func(i, j int) bool {
return dataTypeSliceToString(dataTypesCalls[i]) < dataTypeSliceToString(dataTypesCalls[j])
})
calls.SyncedDataTypes[key] = dataTypesCalls
}
// For SyncedData (pushed measurements): Sort DataMaps (batches) by their Name.
for key, dataMaps := range calls.SyncedData {
sort.Slice(dataMaps, func(i, j int) bool {
return dataMaps[i].Name < dataMaps[j].Name
})
calls.SyncedData[key] = dataMaps
}
// For SyncedStations: First sort stations within each call, then sort the calls themselves.
for key, stationCalls := range calls.SyncedStations {
for i := range stationCalls {
sort.Slice(stationCalls[i].Stations, func(a, b int) bool {
return stationCalls[i].Stations[a].Id < stationCalls[i].Stations[b].Id
})
}
sort.Slice(stationCalls, func(i, j int) bool {
// Complex sorting for station calls, ensuring deterministic order.
var idI, idJ string
if len(stationCalls[i].Stations) > 0 { idI = stationCalls[i].Stations[0].Id }
if len(stationCalls[j].Stations) > 0 { idJ = stationCalls[j].Stations[0].Id }
if idI == idJ {
if stationCalls[i].SyncState == stationCalls[j].SyncState {
return !stationCalls[i].OnlyActivate && stationCalls[j].OnlyActivate
}
return !stationCalls[i].SyncState && stationCalls[j].SyncState
}
return idI < idJ
})
calls.SyncedStations[key] = stationCalls
}
}
// dataTypeSliceToString is a helper for sorting slices of bdplib.DataType.
func dataTypeSliceToString(slice []bdplib.DataType) string {
s := ""
for _, dt := range slice {
s += fmt.Sprintf("%v", dt)
}
return s
}
// TestMyBestParking tests the transformation logic for MyBestParking input (another source format).
func TestMyBestParking(t *testing.T) {
// Similar structure to TestSkidata, demonstrating testing multiple input formats.
var in = FacilityData{}
station_proto = ReadStations("../resources/stations.csv")
err := bdpmock.LoadInputData(&in, "../testdata/input/mybestparking.json")
require.Nil(t, err)
timestamp, err := time.Parse("2006-01-02", "2025-01-01")
require.Nil(t, err)
raw := rdb.Raw[FacilityData]{
Rawdata: in,
Timestamp: timestamp,
}
var out = bdpmock.BdpMockCalls{}
err = bdpmock.LoadOutput(&out, "../testdata/output/mybestparking--out.json")
require.Nil(t, err)
b := bdpmock.MockFromEnv()
err = Transform(context.TODO(), b, &raw)
require.Nil(t, err)
mock := b.(*bdpmock.BdpMock)
req := mock.Requests()
NormalizeBdpMockCalls(&req)
testsuite.DeepEqualFromFile(t, out, req)
}
// TestStations specifically tests the static station data loading and metadata conversion.
func TestStations(t *testing.T) {
stations := ReadStations("../resources/stations.csv")
// Test GetStationByID for non-existent and existent IDs
s := stations.GetStationByID("")
require.Nil(t, s)
s = stations.GetStationByID("406983") // An example ID from the CSV
require.NotNil(t, s)
assert.Equal(t, "105_facility", s.ID) // Check the BDP ID from CSV
m := s.ToMetadata() // Test metadata conversion
net := m["netex_parking"].(map[string]any) // Assert nested structure
assert.Equal(t, len(net), 7)
vtypes := (net["vehicletypes"]).(string)
assert.Equal(t, vtypes, "allPassengerVehicles")
s = stations.GetStationByID("608612") // Another example ID
require.NotNil(t, s)
assert.Equal(t, "608612", s.ID)
m = s.ToMetadata()
net2, ok := m["netex_parking"]
assert.Equal(t, false, ok) // Assert no netex_parking for this station
assert.Equal(t, nil, net2)
}
Critical Points:
- `bdpmock.BdpMock`: This is the cornerstone of transformer unit testing. It fully mocks the `bdplib.Bdp` interface, allowing you to:
  - Simulate `bdplib.FromEnv()` with test configurations (`bdpmock.MockFromEnv()`).
  - Call your `Transform` function without making actual network requests.
  - Record all interactions with the mock (`mock.Requests()`).
- Input/Output Files (`testdata/input/*.json`, `testdata/output/*.json`): This pattern is crucial for:
  - Deterministic Tests: Providing exact raw input payloads.
  - Expected Behavior: Defining the precise `bdplib` calls (data types, stations, measurements) that your transformer is expected to make. These `output` JSON files are typically generated by running the test once with the mock and then verifying the output manually or using a tool like `testsuite.DeepEqualToFile`.
- `NormalizeBdpMockCalls`: This helper function is essential for robust testing when dealing with slices or maps whose element order is not guaranteed by the transformation logic but does not affect the correctness of the BDP API calls. It sorts the elements within the mock's recorded calls (`SyncedDataTypes`, `SyncedData`, `SyncedStations`) so that `DeepEqual` comparisons are order-independent.
- `testsuite.DeepEqualFromFile`: This utility provides a convenient way to compare complex Go structs (like `bdpmock.BdpMockCalls`) against the content of a JSON file, making assertions concise and readable.
- Separation of Concerns in Tests:
  - `TestSkidata`, `TestMyBestParking`: Focus on the end-to-end transformation from raw input to BDP calls.
  - `TestStations`: Dedicated to testing the static data loading and `ToMetadata` conversion logic in `station.go`, isolating that component.