Developing a Data Transformer from Scratch
A data transformer is the crucial bridge between raw, often disparate, data collected by data collectors and the standardized, structured format required by the Open Data Hub Timeseries Writer API (BDP). This guide provides an in-depth look at developing a data transformer in Go, analyzing its critical components, SDK integration, and common patterns, using the provided example.
1. Understanding the Role of a Data Transformer
A data transformer acts as a processing unit within the data integration pipeline. Its primary responsibilities include:
- Raw Data Consumption: Listening to a message queue for events indicating new raw data.
- Raw Data Retrieval: Fetching the actual raw data from a raw data storage (if only an event notification was received).
- Data Transformation: Converting the raw, source-specific data into the standardized data model of the Open Data Hub. This often involves:
- Parsing: Deserializing the raw data (e.g., JSON, XML).
- Enrichment: Adding derived information or linking to external datasets (e.g., geocoding, unit conversions).
- Mapping: Translating source-specific identifiers or schemas to Open Data Hub standards.
- Creating Hierarchies: Defining relationships between different data entities (e.g., a parking lot containing multiple parking zones).
- Metadata Management: Ensuring that necessary metadata (provenance, data types, stations) is correctly synchronized with the Timeseries Writer API.
- Timeseries Data Publication: Pushing the transformed data, typically time-series measurements, to the opendatahub-timeseries-writer.
The transformer is where the "intelligence" of data standardization resides, making raw data consumable for various applications.
2. Core Principles of Data Transformer Development
Many principles from data collector development apply, with some specific nuances for transformers:
2.1. Idempotency is Key
Transformers must be idempotent. This means processing the same raw data event multiple times should produce the exact same result in the Timeseries Writer without creating duplicates or conflicting states. The Open Data Hub Timeseries Writer API aids this by silently discarding older duplicate records, but your transformation logic itself should avoid side effects from re-processing.
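For example (a fragment in the style of the Transform function shown later in this guide, section 3.5), deriving the measurement timestamp from the raw event rather than from the wall clock keeps re-processing idempotent:
// The timestamp comes from the raw event, not from time.Now():
// replaying the same payload produces identical records,
// which the writer then silently discards as duplicates.
ts := payload.Timestamp.UnixMilli()
dataMap.AddRecord(stationID, dataTypeFreeTotal, bdplib.CreateRecord(ts, freePlaces, 600))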
2.2. Robust Data Validation and Error Handling
Data from collectors can be malformed or incomplete. Your transformer needs to:
- Validate Input: Check if the raw data conforms to expected schemas.
- Selective Processing: If a single record within a batch fails, decide whether to skip only that record or the entire batch.
- Dead-Letter Queues (DLQ): Failed messages should typically be moved to a DLQ for later inspection and manual intervention, preventing them from blocking the main queue. The SDK's consumer often handles this.
- Fail Early: Let the application crash if something goes wrong; the test and production environments handle restarts.
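As a minimal sketch of the selective-processing point above (names follow the parking example in section 3.5; the skip condition itself is hypothetical):
for _, facility := range payload.Rawdata {
	// Selective processing: a single malformed record is logged and skipped;
	// it does not fail the whole batch.
	if facility.GetID() == 0 {
		log.Warn("skipping facility without a valid ID")
		continue
	}
	// ... transform the record ...
}
// Returning an error from the handler marks the whole message as failed;
// the SDK's consumer can then retry it or move it to the dead-letter queue.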
2.3. Data Model Consistency
Strictly follow the Open Data Hub data model for stations, data types, and measurements. This includes:
- Unique Identifiers: Assign stable and unique IDs to stations and data types.
- Metadata Richness: Populate metadata fields (e.g., names in multiple languages, specific attributes like capacity or operator details) to enhance data discoverability and usability.
2.4. Performance and Batching
Transformations can be CPU or I/O intensive. Consider:
- Batch Processing: The Timeseries Writer API supports pushing data in batches. Leverage this to reduce network overhead.
- Concurrency: Process multiple raw data events concurrently, carefully managing resources.
- Efficient Lookups: If transformation requires static lookup data (like the stations.csv in our example), load and cache it efficiently (e.g., into an in-memory map) to avoid repeated file reads or database queries.
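For illustration, here is a minimal sketch of such a cache (a hypothetical helper, not the station.go used by the example) that reads a station CSV once and indexes the rows by the value of their first column:
import (
	"encoding/csv"
	"os"
)

// stationIndex caches static station metadata in memory, keyed by station ID,
// so the transformer never re-reads the CSV while processing messages.
type stationIndex map[string][]string

func loadStations(path string) (stationIndex, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	rows, err := csv.NewReader(f).ReadAll()
	if err != nil {
		return nil, err
	}

	idx := make(stationIndex, len(rows))
	for i, row := range rows {
		if i == 0 || len(row) == 0 {
			continue // skip the header row and empty lines
		}
		idx[row[0]] = row // first column is assumed to hold the station ID
	}
	return idx, nil
}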
2.5. Observability
As with collectors, comprehensive logging, tracing, and metrics are crucial for monitoring the transformation process, identifying data quality issues, and debugging performance problems.
3. Anatomy of a Go Data Transformer
Let's break down the provided weather forecast transformer example.
3.1. Project Structure
The project structure will be very similar to a data collector, with the addition of static data files if needed for transformation logic.
.
├── calls.http # Example HTTP requests for testing BDP API directly
├── docker-compose.yml # Local development setup
├── documentation
│ └── ... # External specification document, if exists
├── infrastructure
│ ├── docker
│ │ └── Dockerfile # Containerization instructions
│ ├── docker-compose.build.yml # Build-specific Docker Compose
│ └── helm # Helm charts for Kubernetes deployment
│ └── ... # Environment-specific Helm value files
├── resources # Static data files required by the transformer
│ ├── stations.csv # CSV containing station metadata
│ └── stations.csv.license # License for the static data
├── src # Go source code
│ ├── dto.go # Data Transfer Objects (raw data schema)
│ ├── go.mod # Go modules file
│ ├── go.sum # Go modules checksums
│ ├── main.go # Main application logic
│ ├── main_test.go # Unit tests
│ └── station.go # Logic for handling static station data
└── testdata                  # Test input and expected output files
    ├── input                 # Example raw JSON payloads
    │   └── mybestparking.json
    └── output                # Expected BDP API calls (for testing)
        └── mybestparking--out.json
3.2. Environment Variables and Configuration Management
By design, we require all configuration to be done via environment variables.
This simplifies the development cycle and deployment, allowing us to use .env files for testing and Helm values.yaml files for deployment.
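For example, a local .env might look like this (placeholder values; see section 3.3 for the development OAuth credentials and section 3.4 for the full variable reference):
# Message queue (local RabbitMQ from docker-compose)
MQ_URI=amqp://guest:guest@rabbitmq:5672
MQ_CLIENT=tr-my-transformer
MQ_EXCHANGE=routed
MQ_QUEUE=my-collector.my-dataset
MQ_KEY=my-collector.my-dataset

# Raw Data Bridge
RAW_DATA_BRIDGE_ENDPOINT=http://raw-data-bridge:2000/

# Timeseries Writer (BDP)
BDP_BASE_URL=https://share.opendatahub.testingmachine.eu
BDP_ORIGIN=my-origin
BDP_PROVENANCE_NAME=tr-my-transformer
BDP_PROVENANCE_VERSION=0.1.0

# OAuth2 (development credentials, see section 3.3)
ODH_TOKEN_URL=https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token
ODH_CLIENT_ID=odh-mobility-datacollector-development
ODH_CLIENT_SECRET=7bd46f8f-c296-416d-a13d-dc81e68d0830

LOG_LEVEL=DEBUG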
3.3. Authentication for the Timeseries Writer
The Timeseries Writer API uses OAuth2 for authentication. Transformers need to obtain an access token to make authenticated requests.
OAuth2 Client Credentials
The Open Data Hub uses Keycloak as its identity provider. Transformers authenticate using the "Client Credentials" flow, where they present a client_id and client_secret to obtain an access token.
We provide shared OAuth client credentials for development and testing purposes only:
- Host: https://auth.opendatahub.testingmachine.eu/auth/
- Realm: noi
- Token Endpoint: https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token
- Client ID: odh-mobility-datacollector-development
- Client Secret: 7bd46f8f-c296-416d-a13d-dc81e68d0830
The odh-mobility-datacollector-development client is authorized to write data to the Timeseries Writer.
The above credentials are strictly for testing and development environments. For production deployments and official data pipelines, you must request specific, dedicated OAuth client credentials from the Open Data Hub team. These production credentials will have appropriate permissions and security configurations.
Using the Go SDK
When using the go-bdp-client SDK, authentication is largely transparent, simplifying development. You only need to configure the relevant environment variables:
- ODH_TOKEN_URL: The URL of the OAuth2 token endpoint.
- ODH_CLIENT_ID: The client ID for your transformer.
- ODH_CLIENT_SECRET: The client secret for your transformer.
The SDK (specifically bdplib.FromEnv() and its underlying components) will automatically:
- Fetch an access token from the ODH_TOKEN_URL using the provided client credentials.
- Refresh the token automatically before it expires.
- Include the Authorization: Bearer <access_token> header in all requests made to the Timeseries Writer API.
This means your Go transformation code does not need to explicitly handle token acquisition or refreshing.
Self-managed
For a deeper understanding of the authentication flow or for manual testing/debugging, you can acquire an access token yourself using a curl command:
curl -X POST \
"[https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token](https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token)" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=client_credentials&client_id=odh-mobility-datacollector-development&client_secret=7bd46f8f-c296-416d-a13d-dc81e68d0830"
The response will be a JSON object containing the access_token, expires_in, and other details:
{
"access_token": "eyJhbGciOiJIUzI...",
"expires_in": 300,
"refresh_expires_in": 0,
"token_type": "bearer",
"not-before-policy": 0,
"session_state": "...",
"scope": "openid email profile"
}
Using the Access Token in API Calls
Once you have the access_token (either manually or obtained by the SDK), you include it in the Authorization header of your HTTP requests to the Timeseries Writer API.
For example, to access the /json/stations endpoint:
curl -X GET "http://localhost:8999/json/stations" \
--header 'Content-Type: application/json' \
--header 'Authorization: bearer YOUR_ACCESS_TOKEN'
Replace YOUR_ACCESS_TOKEN with the actual token value you obtained. This header ensures that your request is authenticated and authorized to interact with the Timeseries Writer.
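If you prefer the self-managed approach in Go instead of relying on the SDK's built-in handling, the standard golang.org/x/oauth2/clientcredentials package implements the same client-credentials flow. A minimal sketch, assuming the development credentials and local writer endpoint shown above:
package main

import (
	"context"
	"fmt"

	"golang.org/x/oauth2/clientcredentials"
)

func main() {
	conf := &clientcredentials.Config{
		ClientID:     "odh-mobility-datacollector-development",
		ClientSecret: "7bd46f8f-c296-416d-a13d-dc81e68d0830",
		TokenURL:     "https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token",
	}

	// conf.Client returns an *http.Client that fetches and refreshes the token
	// automatically and adds the Authorization: Bearer header to every request.
	client := conf.Client(context.Background())

	resp, err := client.Get("http://localhost:8999/json/stations")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}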
3.4. SDK Integration (opendatahub-go-sdk and go-bdp-client)
The transformer heavily relies on two SDKs: the opendatahub-go-sdk for message consumption and the go-bdp-client for interacting with the Timeseries Writer.
- Go Code (main.go)
// ... imports ...
func main() {
ms.InitWithEnv(context.Background(), "", &env)
slog.Info("Starting data transformer...")
b := bdplib.FromEnv() // Initialize BDP client
defer tel.FlushOnPanic() // Ensure telemetry is flushed on panic
// Data Type Synchronization (Principle: Metadata Management)
dataTypeList := bdplib.NewDataTypeList(nil)
err := dataTypeList.Load("datatypes.json") // Load static datatype definitions
ms.FailOnError(context.Background(), err, "could not load datatypes")
slog.Info("pushing datatypes on startup")
// Sync all defined data types to the BDP writer.
// This is typically done once on startup to ensure all required types exist.
b.SyncDataTypes(OriginStationType, dataTypeList.All())
slog.Info("listening")
// Initialize the transformer listener.
// It will consume messages from the configured MQ_QUEUE.
listener := tr.NewTr[Forecast](context.Background(), env)
// Start the transformer, passing the `TransformWithBdp` function as the handler.
// This handler will be called for each message consumed from the queue.
err = listener.Start(context.Background(), TransformWithBdp(b))
ms.FailOnError(context.Background(), err, "error while listening to queue")
}
// TransformWithBdp is a wrapper that adapts the main Transform logic
// to the signature expected by the SDK's tr.Handler.
func TransformWithBdp(bdp bdplib.Bdp) tr.Handler[Forecast] {
return func(ctx context.Context, payload *rdb.Raw[Forecast]) error {
return Transform(ctx, bdp, payload)
}
}
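Note that env is not declared in the excerpt above; the example assumes a package-level variable holding the SDK's transformer configuration, roughly:
// Assumed, not shown in the excerpt: the configuration struct filled by
// ms.InitWithEnv from environment variables (MQ_*, RAW_DATA_BRIDGE_ENDPOINT, telemetry, ...).
var env tr.Env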
Key SDK Integration Points:
- ms.InitWithEnv: (Already discussed for collectors) Initializes common services like logging, messaging consumer, and telemetry based on tr.Env.
- bdplib.FromEnv(): This function from the go-bdp-client simplifies the initialization of the BDP client. It automatically reads environment variables prefixed with BDP_ and ODH_ to configure the base URL, provenance details, and OAuth2 authentication (token URL, client ID/secret). This centralizes authentication logic.
- tel.FlushOnPanic(): Essential for ensuring traces are sent before a crash.
- tr.NewTr[Forecast](ctx, env): Creates a new transformer listener instance. The [Forecast] type parameter specifies the expected Go struct that the raw data (received as rdb.Raw[Forecast]) will be unmarshaled into. The SDK handles unmarshaling the raw []byte into this Go struct for you.
- listener.Start(ctx, TransformWithBdp(b)): This is the main loop of the transformer. It starts consuming messages from the configured queue. For each message, it calls the TransformWithBdp handler function.
- TransformWithBdp(bdp bdplib.Bdp) tr.Handler[Forecast]: This is an adapter function. The tr.Handler interface expects a function with a specific signature (func(ctx context.Context, payload *rdb.Raw[T]) error). TransformWithBdp creates such a function, injecting the initialized bdplib.Bdp client so that the actual Transform logic can use it to push data.
- rdb.Raw[Forecast]: The incoming payload from the message queue is unmarshaled by the SDK into this generic Raw struct, where Rawdata contains your specific Forecast struct.
When constructing the Tr, the type (in this case Forecast) passed to the parametrized constructor is used to deserialize the RawData directly.
listener := tr.NewTr[Forecast](context.Background(), env)
In the above example, the SDK tries to deserialize the data retrieved from the RawDataTable into Forecast objects.
If the data is stored as a serialized string in the RawDataTable, a very common case in our existing collectors, create the Tr with string as the type parameter so the data can be retrieved without an error.
The SDK provides a middleware function that takes care of deserializing the string and adapting the handler signature:
listener := tr.NewTr[string](context.Background(), env)
err := listener.Start(context.Background(),
	tr.RawString2JsonMiddleware[Forecast](TransformWithBdp(b)),
)
func TransformWithBdp(bdp bdplib.Bdp) tr.Handler[Forecast] {
return func(ctx context.Context, payload *rdb.Raw[Forecast]) error {
return Transform(ctx, bdp, payload)
}
}
SDK Env Variables
When using the official SDK, the following special environment variables configure various aspects of the transformer:
| Variable | Description | Allowed Values | Default |
|---|---|---|---|
| PROVIDER | String identifying the data provider | [path1]/[path2]/... | (No default set) |
| MQ_URI | Connection URI to connect to RabbitMQ | - | (No default set) |
| MQ_CLIENT | Client name to identify the connection in RabbitMQ | - | (No default set) |
| MQ_EXCHANGE | The exchange notifying the new data | - | routed |
| MQ_QUEUE | RabbitMQ queue this transformer pulls messages from | - | (No default set) |
| MQ_KEY | Routing key used to route messages from MQ_EXCHANGE to MQ_QUEUE | path1.path2... # | (No default set) |
| RAW_DATA_BRIDGE_ENDPOINT | Endpoint of the Raw Data Bridge, which retrieves raw data | - | (No default set) |
| LOG_LEVEL | Sets the logging severity level. | DEBUG, INFO, WARN, ERROR | INFO |
| SERVICE_NAME | The name of the service. | - | gotel |
| SERVICE_VERSION | The version of the service. | - | 0.0.1 |
| TELEMETRY_ENABLED | Enables or disables telemetry. | true, false | true |
| TELEMETRY_TRACE_ENABLED | Enables or disables trace telemetry. | true, false | true |
| TELEMETRY_TRACE_BATCH_SIZE | The maximum number of traces to be batched. | - | 10 |
| TELEMETRY_TRACE_BATCH_TIMEOUT_SEC | The timeout for a trace batch in seconds. | - | 5 |
| TELEMETRY_TRACE_GRPC_ENDPOINT | The gRPC endpoint for trace export. | - | localhost:4317 |
| TELEMETRY_TRACE_TLS_ENABLED | Enables or disables TLS for trace export. | true, false | false |
| TELEMETRY_TRACE_TLS_CERT | Path to the TLS certificate file for trace export. | - | "" |
| TELEMETRY_METRICS_ENABLED | Enables or disables metrics telemetry. | true, false | false |
| TELEMETRY_METRICS_TIMEOUT_SEC | The timeout for metrics export in seconds. | - | 30 |
| TELEMETRY_METRICS_INTERVAL_SEC | The interval between metrics exports in seconds. | - | 60 |
| TELEMETRY_METRICS_GRPC_ENDPOINT | The gRPC endpoint for metrics export. | - | localhost:4317 |
| TELEMETRY_METRICS_TLS_ENABLED | Enables or disables TLS for metrics export. | true, false | false |
| TELEMETRY_METRICS_TLS_CERT | Path to the TLS certificate file for metrics export. | - | "" |
| TELEMETRY_METRICS_EXPORT_MODE | The export mode for metrics. | otlp, prometheus (implied by context) | otlp |
| TELEMETRY_METRICS_EXPORT_PORT | The port for the metrics exporter. | - | 2112 |
3.5. Transform function
This is the core business logic of the transformer. It involves mapping the raw data into the Open Data Hub's data model and pushing it.
Each transformer implements its own logic; the code below is only an example.
- Go Code (main.go) - Transform Function
// ... constants and helper structs ...
// Constants for Station Types and Data Types, enhancing readability and consistency.
const (
stationTypeParent = "ParkingFacility" // Parent station type
stationType = "ParkingStation" // Child station type
shortStay = "short_stay"
subscribers = "subscribers"
dataTypeFreeShort = "free_" + shortStay
dataTypeFreeSubs = "free_" + subscribers
dataTypeFreeTotal = "free"
dataTypeOccupiedShort = "occupied_" + shortStay
dataTypeOccupiedSubs = "occupied_" + subscribers
dataTypeOccupiedTotal = "occupied"
)
// TransformWithBdp is a wrapper that adapts the main Transform logic
// to the signature expected by the SDK's tr.Handler, passing the BDP client.
func TransformWithBdp(bdp bdplib.Bdp) tr.Handler[FacilityData] {
return func(ctx context.Context, payload *rdb.Raw[FacilityData]) error {
// Calls the core transformation function
return Transform(ctx, bdp, payload)
}
}
// Transform contains the main business logic for converting raw parking data
// into BDP stations and measurements.
func Transform(ctx context.Context, bdp bdplib.Bdp, payload *rdb.Raw[FacilityData]) error {
log := logger.Get(ctx) // Get a context-aware logger for better tracing
var parentStations []bdplib.Station // Slice to collect parent stations for batch sync
// Map to store child stations, using their BDP ID as key, to avoid duplicates
// and easily update metadata if a ParkNo appears multiple times within a payload.
stations := make(map[string]bdplib.Station)
dataMapParent := bdp.CreateDataMap() // Data map for parent station measurements
dataMap := bdp.CreateDataMap() // Data map for child station measurements
ts := payload.Timestamp.UnixMilli() // Get the timestamp for all measurements
// Iterate over each facility in the raw payload
for _, facility := range payload.Rawdata {
// 1. Parent Station Creation and Metadata Association
// Principle: Hierarchical Stations & Metadata Association
id := facility.GetID()
parent_station_data := station_proto.GetStationByID(strconv.Itoa(id)) // Lookup parent station metadata
if nil == parent_station_data {
log.Error("no parent station data", "facility_id", strconv.Itoa(id))
panic("no parent station data") // Fail if critical metadata is missing
}
parentStation := bdplib.CreateStation(
parent_station_data.ID, // BDP ID from CSV
parent_station_data.Name, // Name from CSV
stationTypeParent, // Defined parent station type
parent_station_data.Lat, // Lat from CSV
parent_station_data.Lon, // Lon from CSV
bdp.GetOrigin(), // Provenance origin
)
parentStation.MetaData = parent_station_data.ToMetadata() // Convert CSV metadata to BDP metadata
parentStations = append(parentStations, parentStation) // Add to batch for later sync
// Initialize aggregated sums for parent facility measurements
freeTotalSum := 0
occupiedTotalSum := 0
capacityTotal := 0
freeSubscribersSum := 0
occupiedSubscribersSum := 0
capacitySubscribers := 0
freeShortStaySum := 0
occupiedShortStaySum := 0
capacityShortStay := 0
// Iterate over detailed facility data (e.g., per parking zone/category)
for _, freePlace := range facility.FacilityDetails {
// 2. Child Station Creation/Update and Measurement Pushing
// Principle: Dynamic Station Creation & Measurement Categorization
facility_id := strconv.Itoa(facility.GetID()) + "_" + strconv.Itoa(freePlace.ParkNo) // Derived child station ID
station_data := station_proto.GetStationByID(facility_id) // Lookup child station metadata
if nil == station_data {
log.Error("no station data", "facility_id", facility_id)
panic("no station data")
}
// Check if station already processed in this batch (for multiple categories per ParkNo)
station, ok := stations[facility_id]
if !ok { // If not already created, create a new child station
station = bdplib.CreateStation(
station_data.ID, station_data.Name, stationType, station_data.Lat, station_data.Lon, bdp.GetOrigin())
station.ParentStation = parentStation.Id // Link to parent facility
station.MetaData = station_data.ToMetadata() // Assign metadata from CSV
stations[station_data.ID] = station // Store in map for batch sync
log.Debug("Create station " + station_data.ID)
}
// Add category-specific metadata and measurements for child stations
switch freePlace.CountingCategoryNo {
...
default: // Total (or default category)
station.MetaData["free_limit"] = freePlace.FreeLimit
station.MetaData["occupancy_limit"] = freePlace.OccupancyLimit
station.MetaData["capacity"] = freePlace.Capacity
dataMap.AddRecord(station_data.ID, dataTypeFreeTotal, bdplib.CreateRecord(ts, freePlace.FreePlaces, 600))
dataMap.AddRecord(station_data.ID, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, freePlace.CurrentLevel, 600))
freeTotalSum += freePlace.FreePlaces
occupiedTotalSum += freePlace.CurrentLevel
capacityTotal += freePlace.Capacity
}
}
// Assign aggregated total facility data to parent station's measurements and metadata
// Principle: Aggregation for Parent Station
if freeTotalSum > 0 {
dataMapParent.AddRecord(parent_station_data.ID, dataTypeFreeTotal, bdplib.CreateRecord(ts, freeTotalSum, 600))
}
if occupiedTotalSum > 0 {
dataMapParent.AddRecord(parent_station_data.ID, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, occupiedTotalSum, 600))
}
if capacityTotal > 0 {
parentStation.MetaData["capacity"] = capacityTotal
}
// ... similar logic for subscribers and short stay totals ...
}
// 3. Final Synchronization and Data Pushing
// Principle: Batch Synchronization & Data Pushing
// The `true, true` arguments to SyncStations mean:
// - `true` for `syncState`: Synchronize the station's active/inactive state based on its presence in the call.
// - `true` for `onlyActivate`: If `true`, only existing stations are activated; new stations are not created.
// (NOTE: In practice, you often want `false` for `onlyActivate` if new stations can appear dynamically.
// The example might imply pre-existing stations in `resources/stations.csv` or a specific activation logic.)
bdp.SyncStations(stationTypeParent, parentStations, true, true)
bdp.SyncStations(stationType, values(stations), true, true) // `values` converts map to slice for batch sync
bdp.PushData(stationTypeParent, dataMapParent) // Push parent station measurements
bdp.PushData(stationType, dataMap) // Push child station measurements
return nil // Transformation successful
}
// values is a generic helper to extract all values from a map into a slice.
func values[M ~map[K]V, K comparable, V any](m M) []V {
r := make([]V, 0, len(m))
for _, v := range m {
r = append(r, v)
}
return r
}
// SyncDataTypes defines and synchronizes all data types used by this transformer
// with the BDP Timeseries Writer. This is typically called once at startup.
func SyncDataTypes(bdp bdplib.Bdp) {
var dataTypes []bdplib.DataType
// Define all required data types with their names, units, record types (rtype), and descriptions.
dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeShort, "", "Amount of free 'short stay' parking slots", "Instantaneous"))
dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeSubs, "", "Amount of free 'subscribed' parking slots", "Instantaneous"))
dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeTotal, "", "Amount of free parking slots", "Instantaneous"))
// ... similar for occupied data types ...
// Synchronize the defined data types with BDP under the specific stationType.
err := bdp.SyncDataTypes(stationType, dataTypes)
ms.FailOnError(context.Background(), err, "failed to sync types")
}
Critical Points & Patterns:
- SyncDataTypes(b): This crucial function, called once on startup, defines all the possible data types (dataTypeFreeShort, dataTypeOccupiedTotal, etc.) that this transformer will ever publish. It registers them with the BDP API, ensuring the target system understands the metrics.
- Hierarchical Station Model:
  - Parent Station (ParkingFacility): Represents the overall parking lot. Its metadata is retrieved from stations.csv using facility.GetID().
  - Child Station (ParkingStation): Represents specific parking zones or categories within a facility. Its ID is dynamically constructed (facility.GetID() + "_" + strconv.Itoa(freePlace.ParkNo)).
  - station.ParentStation = parentStation.Id: This line establishes the crucial parent-child relationship in BDP.
- Dynamic Station Creation/Update: Stations (both parent and child) are created or updated within the Transform function loop. The stations map is used to collect unique child stations for batch synchronization.
- Metadata Enrichment: parentStation.MetaData = parent_station_data.ToMetadata() and station.MetaData = station_data.ToMetadata() show how to dynamically add detailed metadata from the static CSV to the BDP station objects.
- Measurement Categorization and Aggregation: The switch freePlace.CountingCategoryNo statement demonstrates how incoming raw data can be categorized and how individual measurements are assigned to appropriate BDP data types (dataTypeFreeShort, dataTypeOccupiedTotal, etc.). It also shows how to aggregate child station data (e.g., freeTotalSum) to create parent station measurements.
- bdplib.CreateRecord(ts, value, period): Creates a single measurement record. ts is the Unix milliseconds timestamp, value is the transformed measurement, and period defines the aggregation interval (e.g., 600 seconds for 10 minutes).
- Batch Synchronization and Pushing:
  - bdp.SyncStations(stationTypeParent, parentStations, true, true) and bdp.SyncStations(stationType, values(stations), true, true): Both parent and child stations are synchronized in batches. The true, true arguments indicate syncState=true (update station state based on presence in this call) and onlyActivate=true (only activate existing stations, do not create new ones from this sync call; this implies that all valid station IDs are expected to be pre-registered or handled differently if new ones can appear).
  - bdp.PushData(stationTypeParent, dataMapParent) and bdp.PushData(stationType, dataMap): All collected measurements for each station type are pushed in single batch API calls, which is crucial for performance.
- Error Handling: Within the Transform function, ms.FailOnError is used for critical errors like parsing the timestamp. Other errors (e.g., Location not found) are logged but might not stop the entire transformation, allowing partial success.
3.6. DTO (dto.go)
The dto.go file would define the Go structs that represent the schema of the raw input data (e.g., FacilityData and its nested structs like FacilityDetail). This is the type tr.NewTr[FacilityData] expects. (The actual content of dto.go was not provided, but its purpose is critical).
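Since the file is not reproduced here, the following is only an illustrative sketch of what such DTOs could look like; the type layout and JSON tags are assumptions derived from how the Transform example accesses the data, not the actual schema:
// FacilityData is the raw payload as stored by the collector: a list of facilities.
type FacilityData []Facility

// Facility represents one parking facility in the raw source format.
type Facility struct {
	FacilityID      int              `json:"facilityId"`
	FacilityDetails []FacilityDetail `json:"facilityDetails"`
}

// GetID returns the source identifier of the facility.
func (f Facility) GetID() int { return f.FacilityID }

// FacilityDetail is one counting category / parking zone within a facility.
type FacilityDetail struct {
	ParkNo             int `json:"parkNo"`
	CountingCategoryNo int `json:"countingCategoryNo"`
	FreePlaces         int `json:"freePlaces"`
	CurrentLevel       int `json:"currentLevel"`
	Capacity           int `json:"capacity"`
	FreeLimit          int `json:"freeLimit"`
	OccupancyLimit     int `json:"occupancyLimit"`
}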
4. Containerization with Docker
The Dockerfile for a transformer is very similar to a collector's, with the critical addition of copying any static resource files.
- Dockerfile
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
FROM golang:1.23.7-bookworm as base
FROM base as build-env
WORKDIR /app
# Copy source code
COPY src/. .
# CRITICAL: Copy static resource files
COPY resources/. ./resources
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o main
# BUILD published image (minimal, production-ready)
FROM alpine:latest as build
WORKDIR /app
# Copy compiled binary
COPY --from=build-env /app/main .
# CRITICAL: Copy resources to final image
COPY --from=build-env /app/resources /resources
ENTRYPOINT [ "./main"]
# LOCAL DEVELOPMENT (for hot-reloading/easier debugging)
FROM base as dev
WORKDIR /code
CMD ["go", "run", "main.go"]
5. Local Orchestration with Docker Compose
The docker-compose.yml for a transformer will be almost identical to a collector's, as it still needs a message queue (RabbitMQ) to consume from.
- docker-compose.yml (Conceptual)
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
x-app-common:
  &app-common
  build:
    dockerfile: infrastructure/docker/Dockerfile
    context: .
    target: dev
  env_file:
    - .env
  volumes:
    - ./src:/code
    - ./infrastructure:/code/infrastructure
    - pkg:/go/pkg/mod
  working_dir: /code

services:
  app-bdp:
    <<: *app-common
    profiles:
      - bdp
    networks:
      - ingestion

  app:
    <<: *app-common
    depends_on:
      rabbitmq:
        condition: service_healthy
    profiles:
      - dev

  rabbitmq:
    extends:
      file: ../lib/docker-compose/docker-compose.rabbitmq.yml
      service: rabbitmq
    attach: false
    profiles:
      - dev

volumes:
  pkg:

networks:
  ingestion:
    external: true
When developing, be careful to start the transformer together with the rabbitmq service:
docker compose up --profile dev
When testing the complete pipeline (including the collector), you need to start the full Open Data Hub Core and be careful to start the transformer without the rabbitmq service:
docker compose up --profile bdp
6. Deployment with Helm
The Helm chart values for a transformer will also mirror a collector's, primarily focused on image details and environment variable configuration.
- Helm Chart Values (Conceptual `your-transformer.test.yaml`)
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
image:
  repository: ghcr.io/noi-techpark/opendatahub-transformers/tr-parking-offstreet-skidata # Your transformer's image
  pullPolicy: IfNotPresent
  tag: "0.0.1" # Specific image tag for deployment

env: # Environment variables passed directly to the container
  LOG_LEVEL: "INFO" # Example: Higher log level for production/test
  MQ_QUEUE: s3-poller.parking-offstreet-skidata
  MQ_EXCHANGE: routed
  MQ_KEY: s3-poller.parking-offstreet-skidata
  MQ_CLIENT: tr-parking-offstreet-skidata
  RAW_DATA_BRIDGE_ENDPOINT: "http://raw-data-bridge-service.default.svc.cluster.local:2000/" # Kubernetes service URL
  BDP_BASE_URL: https://share.opendatahub.testingmachine.eu # External URL for BDP
  BDP_PROVENANCE_VERSION: 0.1.0
  BDP_PROVENANCE_NAME: tr-parking-offstreet-skidata
  BDP_ORIGIN: province-bolzano
  SERVICE_NAME: tr-parking-offstreet-skidata # For observability
  TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317

envSecretRef: # Reference Kubernetes secrets for sensitive information
  - name: MQ_URI
    secret: rabbitmq-svcbind # Secret containing RabbitMQ connection URI
    key: uri
  - name: ODH_TOKEN_URL # OAuth token URL from secret
    secret: odh-oauth-client-credentials
    key: token_url
  - name: ODH_CLIENT_ID # OAuth Client ID from secret
    secret: odh-oauth-client-credentials
    key: client_id
  - name: ODH_CLIENT_SECRET # OAuth Client Secret from secret
    secret: odh-oauth-client-credentials
    key: client_secret
Critical Points (Unique to Transformer / OAuth):
- OAuth Credentials in Secrets: For production, ODH_CLIENT_ID, ODH_CLIENT_SECRET, and ODH_TOKEN_URL should always be loaded from Kubernetes secrets using envSecretRef. This ensures sensitive authentication details are not committed to Git.
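For reference, a secret matching the names used in the values file above could be created with kubectl like this (a sketch; actual secret management depends on your cluster setup and team conventions):
kubectl create secret generic odh-oauth-client-credentials \
  --from-literal=token_url=https://auth.example.org/auth/realms/noi/protocol/openid-connect/token \
  --from-literal=client_id=YOUR_PRODUCTION_CLIENT_ID \
  --from-literal=client_secret=YOUR_PRODUCTION_CLIENT_SECRET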
7. Testing Your Transformer
The provided main_test.go demonstrates effective testing strategies for transformers.
- Go Code (main_test.go)
// SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
package main
import (
"context"
"fmt"
"sort"
"testing"
"time"
"github.com/noi-techpark/go-bdp-client/bdplib"
"github.com/noi-techpark/go-bdp-client/bdpmock"
"github.com/noi-techpark/opendatahub-go-sdk/ingest/rdb"
"github.com/noi-techpark/opendatahub-go-sdk/testsuite" // Utility for deep comparison with files
"github.com/stretchr/testify/require"
"gotest.tools/v3/assert"
)
// NormalizeBdpMockCalls sorts all slices within the BdpMockCalls structure
// to ensure order-independent comparisons between expected and actual API calls.
func NormalizeBdpMockCalls(calls *bdpmock.BdpMockCalls) {
// For SyncedDataTypes: Sort inner slices of DataType, then sort the outer slice of DataType slices.
for key, dataTypesCalls := range calls.SyncedDataTypes {
for i := range dataTypesCalls {
sort.Slice(dataTypesCalls[i], func(a, b int) bool {
return fmt.Sprintf("%v", dataTypesCalls[i][a]) < fmt.Sprintf("%v", dataTypesCalls[i][b])
})
}
sort.Slice(dataTypesCalls, func(i, j int) bool {
return dataTypeSliceToString(dataTypesCalls[i]) < dataTypeSliceToString(dataTypesCalls[j])
})
calls.SyncedDataTypes[key] = dataTypesCalls
}
// For SyncedData (pushed measurements): Sort DataMaps (batches) by their Name.
for key, dataMaps := range calls.SyncedData {
sort.Slice(dataMaps, func(i, j int) bool {
return dataMaps[i].Name < dataMaps[j].Name
})
calls.SyncedData[key] = dataMaps
}
// For SyncedStations: First sort stations within each call, then sort the calls themselves.
for key, stationCalls := range calls.SyncedStations {
for i := range stationCalls {
sort.Slice(stationCalls[i].Stations, func(a, b int) bool {
return stationCalls[i].Stations[a].Id < stationCalls[i].Stations[b].Id
})
}
sort.Slice(stationCalls, func(i, j int) bool {
// Complex sorting for station calls, ensuring deterministic order.
var idI, idJ string
if len(stationCalls[i].Stations) > 0 { idI = stationCalls[i].Stations[0].Id }
if len(stationCalls[j].Stations) > 0 { idJ = stationCalls[j].Stations[0].Id }
if idI == idJ {
if stationCalls[i].SyncState == stationCalls[j].SyncState {
return !stationCalls[i].OnlyActivate && stationCalls[j].OnlyActivate
}
return !stationCalls[i].SyncState && stationCalls[j].SyncState
}
return idI < idJ
})
calls.SyncedStations[key] = stationCalls
}
}
// dataTypeSliceToString is a helper for sorting slices of bdplib.DataType.
func dataTypeSliceToString(slice []bdplib.DataType) string {
s := ""
for _, dt := range slice {
s += fmt.Sprintf("%v", dt)
}
return s
}
// TestMyBestParking tests the transformation logic for MyBestParking input (another source format).
func TestMyBestParking(t *testing.T) {
// Similar structure to TestSkidata, demonstrating testing multiple input formats.
var in = FacilityData{}
station_proto = ReadStations("../resources/stations.csv")
err := bdpmock.LoadInputData(&in, "../testdata/input/mybestparking.json")
require.Nil(t, err)
timestamp, err := time.Parse("2006-01-02", "2025-01-01")
require.Nil(t, err)
raw := rdb.Raw[FacilityData]{
Rawdata: in,
Timestamp: timestamp,
}
var out = bdpmock.BdpMockCalls{}
err = bdpmock.LoadOutput(&out, "../testdata/output/mybestparking--out.json")
require.Nil(t, err)
b := bdpmock.MockFromEnv()
err = Transform(context.TODO(), b, &raw)
require.Nil(t, err)
mock := b.(*bdpmock.BdpMock)
req := mock.Requests()
NormalizeBdpMockCalls(&req)
testsuite.DeepEqualFromFile(t, out, req)
}
// TestStations specifically tests the static station data loading and metadata conversion.
func TestStations(t *testing.T) {
stations := ReadStations("../resources/stations.csv")
// Test GetStationByID for non-existent and existent IDs
s := stations.GetStationByID("")
require.Nil(t, s)
s = stations.GetStationByID("406983") // An example ID from the CSV
require.NotNil(t, s)
assert.Equal(t, "105_facility", s.ID) // Check the BDP ID from CSV
m := s.ToMetadata() // Test metadata conversion
net := m["netex_parking"].(map[string]any) // Assert nested structure
assert.Equal(t, len(net), 7)
vtypes := (net["vehicletypes"]).(string)
assert.Equal(t, vtypes, "allPassengerVehicles")
s = stations.GetStationByID("608612") // Another example ID
require.NotNil(t, s)
assert.Equal(t, "608612", s.ID)
m = s.ToMetadata()
net2, ok := m["netex_parking"]
assert.Equal(t, false, ok) // Assert no netex_parking for this station
assert.Equal(t, nil, net2)
}
Critical Points:
- bdpmock.BdpMock: This is the cornerstone of transformer unit testing. It fully mocks the bdplib.Bdp interface, allowing you to:
  - Simulate bdplib.FromEnv() with test configurations (bdpmock.MockFromEnv()).
  - Call your Transform function without making actual network requests.
  - Record all interactions with the mock (mock.Requests()).
- Input/Output Files (testdata/input/*.json, testdata/output/*.json): This pattern is crucial for:
  - Deterministic Tests: Providing exact raw input payloads.
  - Expected Behavior: Defining the precise bdplib calls (data types, stations, measurements) that your transformer is expected to make. These output JSON files are typically generated by running the test once with the mock and then verifying the output manually or using a tool like testsuite.DeepEqualToFile.
- NormalizeBdpMockCalls: This helper function is essential for robust testing when dealing with slices or maps where the order of elements might not be guaranteed by the transformation logic but doesn't affect the correctness of the BDP API calls. It sorts elements within the mock's recorded calls (SyncedDataTypes, SyncedData, SyncedStations) to ensure DeepEqual comparisons are order-independent.
- testsuite.DeepEqualFromFile: This utility provides a convenient way to compare complex Go structs (like bdpmock.BdpMockCalls) against the content of a JSON file, making assertions concise and readable.
- Separation of Concerns in Tests:
  - TestSkidata, TestMyBestParking: Focus on the end-to-end transformation from raw input to BDP calls.
  - TestStations: Dedicated to testing the static data loading and ToMetadata conversion logic in station.go, isolating that component.
8. Local Development Workflow
To get your transformer running locally:
- Clone Repositories:
  - opendatahub-collectors (contains your transformer's source)
  - infrastructure-v2 (contains the shared docker-compose.rabbitmq.yml and overall infrastructure compose files)
- Navigate to Transformer Directory: cd opendatahub-collectors/transformers/foo-bar (or your specific transformer's path).
- Create .env: Copy the provided .env content into a file named .env in this directory. Fill in your AWS credentials (AWS_ACCESS_KEY_ID, AWS_ACCESS_SECRET_KEY).
- Start Infrastructure: From the infrastructure-v2 directory, run the base and timeseries compose files:
  cd ../../infrastructure-v2 # Adjust path if needed
  docker compose -f docker-compose.yml up -d # Run in detached mode
  docker compose -f docker-compose.timeseries.yml up -d
- Start Transformer: From your transformer's directory (opendatahub-collectors/transformers/foo-bar), run its docker-compose.yml:
  cd opendatahub-collectors/transformers/foo-bar # Adjust path if needed
  docker compose up --profile bdp --build # Build and run your transformer
Testing the Data Flow
- Check Transformer Logs: Observe the logs of your app container.
- RabbitMQ Management: Access http://localhost:15672 (guest/guest) in your browser. Navigate to "Queues" and check the queue you configured via MQ_QUEUE. You should see messages accumulating there.
- MongoDB: Connect to mongodb://localhost:27017/?directConnection=true using a tool like MongoDB Compass. You can inspect the raw data stored by the SDK before it's picked up by a transformer.
This setup allows you to develop and test your data transformer in an environment that closely mirrors the production Kubernetes cluster, ensuring smooth integration with the Open Data Hub ecosystem.