Developing a Data Collector from Scratch
Developing a robust and reliable data collector is a critical step in any data integration pipeline. This guide will walk you through the essential principles, architectural patterns, and practical implementation details for building a data collector in Go, using the provided S3 poller example as a reference.
1. Understanding the Role of a Data Collector
A data collector is a specialized microservice responsible for:
- Source Interaction: Connecting to an external data source (e.g., an API, a database, a file system like S3, an FTP server).
- Data Retrieval: Fetching raw data from the source.
- Raw Data Publication: Publishing the retrieved raw data to a message queue, typically for subsequent processing by a transformer.
- Scheduling/Triggering: Operating on a schedule (e.g., cron, polling interval) or in response to external events.
The primary goal of a data collector is to reliably acquire data and make it available in a standardized raw format for the next stage of the pipeline.
Typical Pull-Data-Collector flow (the collector actively gets data from the provider)
Typical Push-Data-Collector flow (the collector waits for data from the provider)
2. Core Principles of Data Collector Development
Adhering to these principles ensures your data collector is maintainable, scalable, and resilient:
2.1. Modularity and Single Responsibility
Each data collector should focus on a single data source or a specific data acquisition method. Avoid combining multiple disparate data sources into one collector. This promotes:
- Clearer Logic: Easier to understand, debug, and test.
- Independent Deployment: Changes to one source don't affect others.
- Scalability: You can scale collectors for different sources independently.
2.2. Configuration Management
Externalize all configurable parameters (API keys, endpoints, polling intervals, bucket names) using environment variables. This keeps your code clean, allows for easy deployment across different environments (development, staging, production), and avoids hardcoding sensitive information.
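To make this concrete, here is a minimal, SDK-independent sketch of reading configuration from environment variables in Go. The helper `getenvOr` and the default values are illustrative only; collectors built on the SDK pass an env struct to `ms.InitWithEnv` instead, as shown in section 3.3.

```go
package main

import (
	"log/slog"
	"os"
)

// getenvOr is an illustrative helper: it reads an environment variable
// and falls back to a default when the variable is unset.
func getenvOr(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func main() {
	// Required value: no default, the collector should fail fast if it is missing.
	bucket := os.Getenv("AWS_S3_BUCKET_NAME")
	if bucket == "" {
		slog.Error("AWS_S3_BUCKET_NAME is not set")
		os.Exit(1)
	}
	// Optional value with an illustrative default (hourly schedule, seconds field included).
	cron := getenvOr("CRON", "0 0 0/1 * * *")
	slog.Info("configuration loaded", "bucket", bucket, "cron", cron)
}
```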
2.3. Error Handling and Resilience
Data collection often involves interacting with external systems that can be unreliable. Your collector must:
- Handle Network Errors: Implement retries with exponential backoff for transient network issues (see the sketch after this list).
- Validate Data: Ensure retrieved data is in the expected format before processing.
- Log Errors Effectively: Provide sufficient context for debugging.
- Fail Early: Let the application crash on unrecoverable errors; the test and production environments handle restarts.
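As an illustration of the retry bullet above, the sketch below wraps an arbitrary fetch function with exponential backoff using only the standard library. The names `fetchWithBackoff` and `fetch` are hypothetical and not part of the opendatahub-go-sdk.

```go
package main

import (
	"context"
	"log/slog"
	"time"
)

// fetchWithBackoff is a hypothetical helper (not part of the SDK): it retries
// the given fetch function with exponential backoff for transient errors.
func fetchWithBackoff(ctx context.Context, attempts int, fetch func(context.Context) ([]byte, error)) ([]byte, error) {
	backoff := time.Second
	var lastErr error
	for i := 0; i < attempts; i++ {
		body, err := fetch(ctx)
		if err == nil {
			return body, nil
		}
		lastErr = err
		slog.Warn("fetch failed, retrying", "attempt", i+1, "backoff", backoff, "err", err)
		select {
		case <-time.After(backoff):
			backoff *= 2 // double the wait before the next attempt
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	return nil, lastErr
}

func main() {
	// Dummy usage: the anonymous fetch function stands in for a real HTTP/S3 call.
	body, err := fetchWithBackoff(context.Background(), 5, func(ctx context.Context) ([]byte, error) {
		return []byte("payload"), nil
	})
	if err != nil {
		slog.Error("all attempts failed", "err", err)
		return
	}
	slog.Info("fetched", "bytes", len(body))
}
```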
2.4. Observability (Logging, Tracing, Metrics)
For production systems, it's crucial to understand what your collector is doing.
- Logging: Use structured logging (`slog` in Go) to record events, errors, and progress (see the example after this list).
- Tracing: Integrate with a distributed tracing system (like OpenTelemetry, as used by `opendatahub-go-sdk`) to visualize the flow of requests and identify bottlenecks across microservices.
- Metrics: Expose metrics (e.g., number of successful polls, failed polls, data volume, processing time) for monitoring and alerting.
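A minimal example of structured logging with Go's standard `log/slog` package; the field names and values are purely illustrative.

```go
package main

import "log/slog"

func main() {
	// Key/value pairs make log lines machine-parseable and easy to filter
	// in a log aggregation tool (values here are illustrative).
	slog.Info("polled S3 object", "bucket", "my-bucket", "bytes", 2048)
	slog.Error("failed to publish raw data", "err", "connection refused", "provider", "example/provider")
}
```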
2.5. Scheduling and Triggering
Determine how your collector will be activated:
- Polling: Regularly checking a source for new data (e.g., using cron jobs or `time.Ticker`; see the sketch below).
- Event-Driven: Reacting to events (e.g., a webhook notification, a message on a specific queue).
The S3 poller uses a cron scheduler, which is a common pattern for polling.
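As a contrast to the cron scheduler, here is a minimal polling loop built on the standard library's `time.Ticker`. The `poll` function is a hypothetical placeholder for your fetching logic, and the interval is illustrative.

```go
package main

import (
	"context"
	"log/slog"
	"time"
)

// poll is a hypothetical placeholder for the actual data-fetching logic.
func poll(ctx context.Context) error {
	slog.Info("polling source...")
	return nil
}

func main() {
	ctx := context.Background()
	ticker := time.NewTicker(10 * time.Minute) // polling interval; value is illustrative
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := poll(ctx); err != nil {
				slog.Error("poll failed", "err", err)
			}
		case <-ctx.Done(): // never fires with context.Background(); shown for completeness
			return
		}
	}
}
```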
3. Anatomy of a Go Data Collector: S3 Poller Example
Let's dissect the provided S3 poller example to understand its components and how it embodies the principles above.
3.1. Project Structure
A typical Go project for a data collector might look like this:
.
├── src/
│   ├── main.go              # Main application logic
│   ├── go.mod               # Go modules file
│   ├── go.sum               # Go modules checksums
│   ├── go.work              # Go workspace file (if part of a monorepo)
│   └── go.work.sum          # Go workspace checksums
├── .env                     # Local environment variables for development
├── Dockerfile               # Containerization instructions (often in infrastructure/docker)
├── docker-compose.yml       # Local development setup (often in collector's root)
└── helm/                    # Helm charts for Kubernetes deployment
    └── your-collector-name.yaml
3.2. Environment Variables and Configuration Management
By design, all configuration must be done via environment variables.
This simplifies the development cycle and deployment, allowing us to use `.env` files for local testing and Helm `values.yaml` files for deployment.
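For illustration, a local `.env` for the S3 poller could look like the following. The variable names come from the code and tables in this guide; the values are placeholders and must be replaced with your own.

```
PROVIDER=s3-poller/my-source
MQ_URI=amqp://guest:guest@rabbitmq:5672
MQ_CLIENT=dc-s3-poller-local
MQ_EXCHANGE=ingress
LOG_LEVEL=DEBUG
CRON=0 */10 * * * *
AWS_REGION=eu-west-1
AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_ACCESS_SECRET_KEY=<your-secret-access-key>
AWS_S3_BUCKET_NAME=<your-bucket-name>
AWS_S3_FILE_NAME=<your-object-key>
RAW_BINARY=false
```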
3.3. SDK Integration (opendatahub-go-sdk)
The opendatahub-go-sdk is central to simplifying collector development.
- Go Code (main.go)
// ... imports ...
func main() {
// 1. Initialize SDK components (messaging, logging, telemetry)
ms.InitWithEnv(context.Background(), "", &env)
slog.Info("Starting data collector...")
// Ensure telemetry traces are flushed on application panic
defer tel.FlushOnPanic()
// ... AWS config and client setup ...
// 2. Create a new data collector instance
// dc.EmptyData indicates that the input channel doesn't carry specific data,
// as the cron job just triggers a collection run.
collector := dc.NewDc[dc.EmptyData](context.Background(), env.Env)
// 3. Schedule data collection runs
c := cron.New(cron.WithSeconds())
c.AddFunc(env.CRON, func() {
// When the cron job triggers, send an empty input signal to the collector's channel.
// This tells the collector to execute its data fetching logic.
collector.GetInputChannel() <- dc.NewInput[dc.EmptyData](context.Background(), nil)
})
slog.Info("Setup complete. Starting cron scheduler")
go func() {
c.Run() // Start the cron scheduler in a goroutine
}()
// 4. Start the collector's main processing loop
// The provided function (anonymous func) is the core data collection logic.
err = collector.Start(context.Background(), func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error) {
// This function is executed each time an input is received on the collector's channel.
// It's responsible for fetching raw data and returning it in the SDK's RawAny format.
// ... S3 data fetching logic ...
var raw any
if env.RAW_BINARY {
raw = body // Store raw bytes
} else {
raw = string(body) // Store as string
}
// Return the collected raw data in the SDK's standardized format
return &rdb.RawAny{
Provider: env.PROVIDER, // Unique identifier for the data source
Timestamp: time.Now(), // Timestamp of data collection
Rawdata: raw, // The actual raw data
}, nil
})
// 5. Handle fatal errors from the collector's main loop
ms.FailOnError(context.Background(), err, err.Error())
}
Key SDK Components:
- `ms.InitWithEnv(ctx, "", &env)`: This function is the SDK's entry point for initialization. It sets up:
  - Structured Logging (`slog`): Configures logging based on `LOG_LEVEL`.
  - Messaging: Initializes the RabbitMQ client using `MQ_URI`, `MQ_CLIENT`, `MQ_EXCHANGE`. The SDK handles publishing raw data to the message queue.
  - Telemetry: Sets up OpenTelemetry for tracing and metrics, using `TELEMETRY_TRACE_GRPC_ENDPOINT` (seen in the Helm config).
- `tel.FlushOnPanic()`: Ensures that any buffered telemetry data (traces, metrics) is sent to the configured endpoint before the application crashes due to a panic. This is crucial for debugging production issues.
- `dc.NewDc[dc.EmptyData](ctx, env.Env)`: Creates a new data collector instance. The `[dc.EmptyData]` type parameter indicates that the input to the collector's processing function doesn't carry specific data; it's simply a trigger.
- `collector.GetInputChannel()`: Returns a channel that you can send signals to. Each signal (in this case, `dc.NewInput[dc.EmptyData](context.Background(), nil)`) triggers the `collector.Start` function's callback. This decouples the scheduling mechanism from the core collection logic.
- `collector.Start(ctx, func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error))`: This is the main processing loop of the collector.
  - It takes a `context.Context` and a callback function.
  - The callback function is where your actual data fetching logic resides.
  - It is expected to return an `*rdb.RawAny` object (the standardized raw data format) or an error.
  - The SDK then takes this `RawAny` object and publishes it to the configured message queue.
- `rdb.RawAny`: This struct defines the standard format for raw data that the SDK expects. It includes `Provider` (a unique identifier for the data source), `Timestamp` (when the data was collected), and `Rawdata` (the actual fetched content).
- `ms.FailOnError(ctx, err, msg)`: A utility function from the SDK that logs a fatal error and exits the application if `err` is not `nil`. This is used for unrecoverable errors during setup or the main loop.
SDK Env Variables
When using the official SDK, the following special env variables configure various aspects of the collector:
| Variable | Description | Allowed Values | Default |
|---|---|---|---|
| PROVIDER | String identifying the data provider | [path1]/[path2]/... | (No default set) |
| MQ_URI | Connection URI to connect to RabbitMQ | - | (No default set) |
| MQ_EXCHANGE | Exchange name where to push messages | - | ingress |
| MQ_CLIENT | Client name to identify the connection in RabbitMQ | - | (No default set) |
| LOG_LEVEL | Sets the logging severity level. | DEBUG, INFO, WARN, ERROR | INFO |
| SERVICE_NAME | The name of the service. | - | gotel |
| SERVICE_VERSION | The version of the service. | - | 0.0.1 |
| TELEMETRY_ENABLED | Enables or disables telemetry. | true, false | true |
| TELEMETRY_TRACE_ENABLED | Enables or disables trace telemetry. | true, false | true |
| TELEMETRY_TRACE_BATCH_SIZE | The maximum number of traces to be batched. | - | 10 |
| TELEMETRY_TRACE_BATCH_TIMEOUT_SEC | The timeout for a trace batch in seconds. | - | 5 |
| TELEMETRY_TRACE_GRPC_ENDPOINT | The gRPC endpoint for trace export. | - | localhost:4317 |
| TELEMETRY_TRACE_TLS_ENABLED | Enables or disables TLS for trace export. | true, false | false |
| TELEMETRY_TRACE_TLS_CERT | Path to the TLS certificate file for trace export. | - | "" |
| TELEMETRY_METRICS_ENABLED | Enables or disables metrics telemetry. | true, false | false |
| TELEMETRY_METRICS_TIMEOUT_SEC | The timeout for metrics export in seconds. | - | 30 |
| TELEMETRY_METRICS_INTERVAL_SEC | The interval between metrics exports in seconds. | - | 60 |
| TELEMETRY_METRICS_GRPC_ENDPOINT | The gRPC endpoint for metrics export. | - | localhost:4317 |
| TELEMETRY_METRICS_TLS_ENABLED | Enables or disables TLS for metrics export. | true, false | false |
| TELEMETRY_METRICS_TLS_CERT | Path to the TLS certificate file for metrics export. | - | "" |
| TELEMETRY_METRICS_EXPORT_MODE | The export mode for metrics. | otlp, prometheus | otlp |
| TELEMETRY_METRICS_EXPORT_PORT | The port for the metrics exporter. | - | 2112 |
3.4. Data Collection Logic (S3 Interaction)
The core logic of fetching data from S3 is encapsulated within the anonymous function passed to collector.Start.
Each collector implements its own logic; the code below is only an example.
- Go Code (main.go)
// ...
// Create a custom AWS configuration
customConfig, err := config.LoadDefaultConfig(context.Background(),
config.WithRegion(env.AWS_REGION), // Configure AWS region from environment
config.WithCredentialsProvider( // Provide static credentials from environment
credentials.NewStaticCredentialsProvider(env.AWS_ACCESS_KEY_ID, env.AWS_ACCESS_SECRET_KEY, ""),
),
)
ms.FailOnError(context.Background(), err, "failed to create AWS config") // Fatal error if config fails
// Create an S3 client using the custom configuration
s3Client := s3.NewFromConfig(customConfig)
// ...
err = collector.Start(context.Background(), func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error) {
// Get the object from S3
output, err := s3Client.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String(env.AWS_S3_BUCKET_NAME), // S3 bucket name from environment
Key: aws.String(env.AWS_S3_FILE_NAME), // S3 file key (name) from environment
})
if err != nil {
// Log the error with context (bucket, file) and return nil, err to the SDK
slog.Error("error while getting s3 object:", "err", err, "bucket", env.AWS_S3_BUCKET_NAME, "file", env.AWS_S3_FILE_NAME)
return nil, err
}
defer output.Body.Close() // Ensure the S3 response body is closed
body, err := io.ReadAll(output.Body) // Read the entire S3 object body
if err != nil {
slog.Error("error reading response body:", "err", err)
return nil, err
}
var raw any // Declare a variable to hold the raw data
if env.RAW_BINARY {
raw = body // If RAW_BINARY is true, store the raw bytes
} else {
raw = string(body) // Otherwise, convert bytes to string
}
// Construct the standardized RawAny object
return &rdb.RawAny{
Provider: env.PROVIDER, // Identifier for this data source
Timestamp: time.Now(), // Timestamp of when the data was collected
Rawdata: raw, // The actual raw data (string or []byte)
}, nil
})
ms.FailOnError(context.Background(), err, err.Error()) // Fatal error if collector.Start fails
}
Critical Points:
- AWS SDK Initialization: The AWS SDK is initialized with `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_ACCESS_SECRET_KEY` from environment variables. This is a standard pattern for configuring cloud service clients.
- Error Handling: Crucially, errors from `s3Client.GetObject` and `io.ReadAll` are logged with `slog.Error` (providing context like bucket and file names) and then returned. The `collector.Start` function will then handle these errors, potentially preventing the message from being acknowledged and allowing for retries or dead-lettering by the messaging system.
- `rdb.RawAny` Structure: The returned `*rdb.RawAny` object is the standardized output of any data collector using this SDK. It's the payload that will be sent to the message queue. The `Provider` field is particularly important as it uniquely identifies the source of this raw data, allowing transformers to filter or process it accordingly.
4. Containerization with Docker
Docker is the recommended way to package and deploy your data collector. The provided Dockerfile uses a multi-stage build to create efficient and secure images.
- Dockerfile
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
FROM golang:1.23.7-bookworm AS base
# Expose port 8080 (this collector doesn't run a web server, but exposing it is common practice)
EXPOSE 8080

FROM base AS build-env
WORKDIR /app
# Copy source code into the working directory
COPY src/ .
# Download Go module dependencies
RUN go mod download
# Build the Go application:
#   CGO_ENABLED=0: disables CGo, producing a statically linked binary
#   GOOS=linux:    compiles for the Linux operating system
#   -o main:       output executable named 'main'
RUN CGO_ENABLED=0 GOOS=linux go build -o main

# BUILD published image (production-ready, minimal image)
FROM alpine:3 AS build
WORKDIR /app
# Copy only the compiled binary from the build-env stage
COPY --from=build-env /app/main .
# Set the entrypoint for the container
ENTRYPOINT [ "./main" ]

# LOCAL DEVELOPMENT (for hot-reloading/easier debugging)
FROM base AS dev
WORKDIR /code
# Run the Go application directly from source for development
CMD ["go", "run", "."]

# TESTS
FROM base AS test
WORKDIR /code
# Run Go tests
CMD ["go", "test", "."]
Critical Points:
- Multi-Stage Build: This is a best practice for Go applications.
  - `base`: Defines the base Go environment.
  - `build-env`: Used only for building the executable. It includes source code and dependencies.
  - `build`: The final, slim production image. It only copies the compiled binary from `build-env`, resulting in a much smaller image size (e.g., using `alpine:3` instead of the full `golang` image). This reduces attack surface and download times.
  - `dev` and `test`: Separate stages for local development and running tests, allowing for different build/run environments without polluting the production image.
- `CGO_ENABLED=0 GOOS=linux`: This is crucial for Go applications in Docker. It ensures the Go binary is statically linked and compiled for Linux, making it highly portable and suitable for minimal base images like Alpine.
4.1. Local Orchestration with Docker Compose
docker-compose.yml simplifies running and testing your collector and its dependencies (like RabbitMQ) locally.
- docker-compose.yml
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
x-app-common:
  &app-common
  build:
    dockerfile: infrastructure/docker/Dockerfile
    context: .
    target: dev
  env_file:
    - .env
  volumes:
    - ./src:/code
    - ./infrastructure:/code/infrastructure
    - pkg:/go/pkg/mod
  working_dir: /code

services:
  app-bdp:
    <<: *app-common
    profiles:
      - bdp
    networks:
      - ingestion

  app:
    <<: *app-common
    depends_on:
      rabbitmq:
        condition: service_healthy
    profiles:
      - dev

  rabbitmq:
    extends:
      file: ../lib/docker-compose/docker-compose.rabbitmq.yml
      service: rabbitmq
    attach: false
    profiles:
      - dev

volumes:
  pkg:

networks:
  ingestion:
    external: true
During development, make sure to start the collector together with the RabbitMQ service:
docker compose up --profile dev
When testing the complete pipeline (including the transformer), you need to start the full Open Data Hub Core and make sure to start the collector without the RabbitMQ service:
docker compose up --profile bdp
5. Deployment with Helm
Helm is a package manager for Kubernetes, used to define, install, and upgrade complex Kubernetes applications. The helm/meteorology-bz-forecast.test.yaml file is a values file that overrides default settings in a Helm chart for a specific deployment environment (e.g., a test environment).
- Helm Chart Values (.test.yaml)
image:
  repository: ghcr.io/noi-techpark/opendatahub-collectors/dc-s3-poller # Docker image repository
  pullPolicy: IfNotPresent # Only pull the image if it is not already present on the node
  tag: "0.0.1"             # Specific image tag/version to deploy

env: # Environment variables passed directly to the container
  MQ_CLIENT: dc-meteorology-bz-forecast
  PROVIDER: s3-poller/meteorology-bz-forecast
  CRON: "0 0 0/1 * * *" # Production cron schedule (hourly)
  AWS_REGION: "eu-west-1"
  AWS_S3_FILE_NAME: "SMOS_MCPL-WX_EXP_SIAG.JSON"
  AWS_S3_BUCKET_NAME: dc-meteorology-province-forecast
  SERVICE_NAME: dc-meteorology-bz-forecast # Service name for observability/discovery
  TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317 # OpenTelemetry collector endpoint

envSecretRef: # Reference Kubernetes secrets for sensitive environment variables
  - name: MQ_URI             # Name of the environment variable
    secret: rabbitmq-svcbind # Name of the Kubernetes secret
    key: uri                 # Key within the secret that holds the value
Critical Points:
- `image`: Defines which Docker image to deploy.
  - `repository`: The path to your Docker image (e.g., in GitHub Container Registry).
  - `pullPolicy`: `IfNotPresent` is common for test/dev; `Always` for production to ensure the latest image is used.
  - `tag`: Crucial for versioning and reproducibility. Always use a specific tag, never `latest`, in production.
- `env`: Directly sets environment variables within the Kubernetes pod. These values typically override defaults defined in the main Helm chart. Notice how `CRON` is set to an hourly schedule here, different from the local development `.env` file.
- `envSecretRef`: This is paramount for security in production environments. Instead of hardcoding sensitive values like `MQ_URI` directly in the Helm values (which would be stored in Git), `envSecretRef` tells Kubernetes to fetch the value for `MQ_URI` from a Kubernetes Secret named `rabbitmq-svcbind` under the key `uri`. This ensures sensitive data is not exposed in configuration files.
- `SERVICE_NAME` and `TELEMETRY_TRACE_GRPC_ENDPOINT`: These are used for integrating with the Kubernetes cluster's observability stack. `SERVICE_NAME` helps identify the service in monitoring tools, and `TELEMETRY_TRACE_GRPC_ENDPOINT` points to the OpenTelemetry collector or agent within the cluster, enabling distributed tracing.
6. Local Development Workflow
To get your S3 poller data collector running locally:
- Clone Repositories:
  - `opendatahub-collectors` (contains your collector's source)
  - `infrastructure-v2` (contains the shared `docker-compose.rabbitmq.yml` and the overall infrastructure compose files)
- Navigate to the Collector Directory: `cd opendatahub-collectors/collectors/s3-poller` (or your specific collector's path).
- Create `.env`: Copy the provided `.env` content into a file named `.env` in this directory. Fill in your AWS credentials (`AWS_ACCESS_KEY_ID`, `AWS_ACCESS_SECRET_KEY`).
- Start the Infrastructure: From the `infrastructure-v2` directory, run the base and timeseries compose files:

  cd ../../infrastructure-v2 # Adjust path if needed
  docker compose -f docker-compose.yml up -d # Run in detached mode
  docker compose -f docker-compose.timeseries.yml up -d

- Start the Collector: From your collector's directory (`opendatahub-collectors/collectors/s3-poller`), run its `docker-compose.yml`:

  cd opendatahub-collectors/collectors/s3-poller # Adjust path if needed
  docker compose up --profile bdp --build # Build and run your collector
Testing the Data Flow
- Check Collector Logs: Observe the logs of your `app` container. You should see messages like "Starting data collector...", "Polling S3 bucket...", and "Published dummy S3 data to message queue." (if using the dummy example).
- RabbitMQ Management: Access `http://localhost:15672` (guest/guest) in your browser. Navigate to "Queues" and check the `ingress` queue (or whatever `MQ_EXCHANGE` you configured). You should see messages accumulating there.
- MongoDB: Connect to `mongodb://localhost:27017/?directConnection=true` using a tool like MongoDB Compass. You can inspect the raw data stored by the SDK before it's picked up by a transformer.
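For instance, you can tail the collector's logs with Docker Compose (the `app` service name comes from the docker-compose.yml shown earlier):

```sh
docker compose logs -f app
```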
This setup allows you to develop and test your data collector in an environment that closely mirrors the production Kubernetes cluster, ensuring smooth integration with the Open Data Hub ecosystem.