Developing a Data Collector from Scratch
Developing a robust and reliable data collector is a critical step in any data integration pipeline. This guide will walk you through the essential principles, architectural patterns, and practical implementation details for building a data collector in Go, using the provided S3 poller example as a reference.
1. Understanding the Role of a Data Collector
A data collector is a specialized microservice responsible for:
- Source Interaction: Connecting to an external data source (e.g., an API, a database, a file system like S3, an FTP server).
- Data Retrieval: Fetching raw data from the source.
- Raw Data Publication: Publishing the retrieved raw data to a message queue, typically for subsequent processing by a transformer.
- Scheduling/Triggering: Operating on a schedule (e.g., cron, polling interval) or in response to external events.
The primary goal of a data collector is to reliably acquire data and make it available in a standardized raw format for the next stage of the pipeline.
Typical Pull-Data-Collector flow (the collector actively gets data from the provider)
Typical Push-Data-Collector flow (the collector waits for data from the provider)
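To make the push flow concrete, here is a minimal sketch of a push-style trigger in Go using the standard net/http package. It is illustrative only, not part of the S3 poller example; the /webhook path and port 8080 are assumptions.
package main

import (
	"io"
	"log/slog"
	"net/http"
)

func main() {
	// The provider pushes data to us by calling this webhook.
	http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		slog.Info("received pushed payload", "bytes", len(body))
		// In a real collector, the raw payload would now be published to the message queue.
		w.WriteHeader(http.StatusNoContent)
	})
	slog.Info("waiting for provider pushes on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		slog.Error("server stopped", "err", err)
	}
}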
2. Core Principles of Data Collector Development
Adhering to these principles ensures your data collector is maintainable, scalable, and resilient:
2.1. Modularity and Single Responsibility
Each data collector should focus on a single data source or a specific data acquisition method. Avoid combining multiple disparate data sources into one collector. This promotes:
- Clearer Logic: Easier to understand, debug, and test.
- Independent Deployment: Changes to one source don't affect others.
- Scalability: You can scale collectors for different sources independently.
2.2. Configuration Management
Externalize all configurable parameters (API keys, endpoints, polling intervals, bucket names) using environment variables. This keeps your code clean, allows for easy deployment across different environments (development, staging, production), and avoids hardcoding sensitive information.
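As a minimal sketch (standard library only; the getenv helper and the fallback values are illustrative, not an SDK API), configuration can be read from the environment like this:
package main

import (
	"fmt"
	"os"
)

// getenv returns the value of key, or fallback if the variable is unset.
func getenv(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok {
		return v
	}
	return fallback
}

func main() {
	bucket := getenv("AWS_S3_BUCKET_NAME", "")
	schedule := getenv("CRON", "0 0 0/1 * * *") // default: hourly
	fmt.Println("bucket:", bucket, "schedule:", schedule)
}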
2.3. Error Handling and Resilience
Data collection often involves interacting with external systems that can be unreliable. Your collector must:
- Handle Network Errors: Implement retries with exponential backoff for transient network issues (a sketch follows this list).
- Validate Data: Ensure retrieved data is in the expected format before processing.
- Log Errors Effectively: Provide sufficient context for debugging.
- Fail Early: Let the application crash on unrecoverable errors; the test and production environments handle restarts.
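A minimal sketch of the retry-with-exponential-backoff idea from above (the retryWithBackoff helper and its parameters are illustrative, not part of the SDK):
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff runs op up to attempts times, doubling the delay after each failure.
func retryWithBackoff(ctx context.Context, attempts int, initial time.Duration, op func() error) error {
	delay := initial
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		select {
		case <-time.After(delay):
			delay *= 2 // exponential backoff: wait twice as long after each failure
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("all %d attempts failed: %w", attempts, err)
}

func main() {
	err := retryWithBackoff(context.Background(), 3, 500*time.Millisecond, func() error {
		return errors.New("transient network error") // replace with the real fetch
	})
	fmt.Println(err)
}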
2.4. Observability (Logging, Tracing, Metrics)
For production systems, it's crucial to understand what your collector is doing.
- Logging: Use structured logging (`slog` in Go) to record events, errors, and progress (a sketch follows this list).
- Tracing: Integrate with a distributed tracing system (like OpenTelemetry, as used by `opendatahub-go-sdk`) to visualize the flow of requests and identify bottlenecks across microservices.
- Metrics: Expose metrics (e.g., number of successful polls, failed polls, data volume, processing time) for monitoring and alerting.
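A minimal sketch of structured logging with Go's standard log/slog package (the JSON handler and the example fields are illustrative choices):
package main

import (
	"log/slog"
	"os"
)

func main() {
	// JSON output; the debug level mirrors a LOGLEVEL=DEBUG configuration.
	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
	slog.SetDefault(logger)

	// Key/value pairs give monitoring tools structured fields to filter on.
	slog.Info("poll finished", "bucket", "example-bucket", "objects", 3)
	slog.Error("poll failed", "err", "connection reset")
}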
2.5. Scheduling and Triggering
Determine how your collector will be activated:
- Polling: Regularly checking a source for new data (e.g., using cron jobs or `time.Ticker`; see the sketch below).
- Event-Driven: Reacting to events (e.g., a webhook notification, a message on a specific queue).
The S3 poller uses a cron scheduler, which is a common pattern for polling.
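For comparison, here is a minimal sketch of a fixed-interval polling loop with time.Ticker. It is illustrative only; the S3 poller itself uses the cron scheduler shown later.
package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		// In a real collector this would signal the collector's input channel.
		fmt.Println("triggering a collection run")
	}
}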
3. Anatomy of a Go Data Collector: S3 Poller Example
Let's dissect the provided S3 poller example to understand its components and how it embodies the principles above.
3.1. Project Structure
A typical Go project for a data collector might look like this:
.
├── src/
│   ├── main.go            # Main application logic
│   ├── go.mod             # Go modules file
│   ├── go.sum             # Go modules checksums
│   ├── go.work            # Go workspace file (if part of a monorepo)
│   └── go.work.sum        # Go workspace checksums
├── .env                   # Local environment variables for development
├── Dockerfile             # Containerization instructions (often in infrastructure/docker)
├── docker-compose.yml     # Local development setup (often in the collector's root)
└── helm/                  # Helm charts for Kubernetes deployment
    └── your-collector-name.yaml
3.2. Environment Variables and Configuration Management
The env
struct and the .env
file demonstrate how environment variables are loaded and managed.
- Go Code (main.go)
- .env file
- Helm Chart (.test.yaml)
// ... imports ...
var env struct {
dc.Env // Embeds common environment variables from the SDK (MQ_URI, LOGLEVEL, etc.)
CRON string `env:"CRON"` // Specific cron schedule for this collector
RAW_BINARY bool `env:"RAW_BINARY"` // Flag to interpret raw data as binary
AWS_REGION string `env:"AWS_REGION"`
AWS_S3_FILE_NAME string `env:"AWS_S3_FILE_NAME"`
AWS_S3_BUCKET_NAME string `env:"AWS_S3_BUCKET_NAME"`
AWS_ACCESS_KEY_ID string `env:"AWS_ACCESS_KEY_ID"`
AWS_ACCESS_SECRET_KEY string `env:"AWS_ACCESS_SECRET_KEY"`
}
func main() {
// Initializes messaging, logging, and telemetry based on env vars
ms.InitWithEnv(context.Background(), "", &env)
slog.Info("Starting data collector...")
// ... rest of the main function ...
}
# Common SDK environment variables
MQ_URI=amqp://guest:guest@rabbitmq
MQ_CLIENT=dc-meteorology-bz-forecast # Identify your data collector to RabbitMQ
MQ_EXCHANGE=ingress # Defaults to ingress
LOGLEVEL=DEBUG
PROVIDER=s3-poller/meteorology-bz-forecast # Unique identifier for this data source
# Collector-specific environment variables
CRON="0/10 * * * * *" # Cron polling schedule, starting with seconds (e.g., every 10 seconds)
AWS_REGION=eu-west-1
AWS_S3_FILE_NAME=SMOS_MCPL-WX_EXP_SIAG.JSON
AWS_S3_BUCKET_NAME=dc-meteorology-province-forecast
AWS_ACCESS_KEY_ID=
AWS_ACCESS_SECRET_KEY=
# interpret response as binary and store as base64. defaults to false
RAW_BINARY=false
# ...
env:
MQ_CLIENT: dc-meteorology-bz-forecast
PROVIDER: s3-poller/meteorology-bz-forecast
CRON: "0 0 0/1 * * *" # Overrides local CRON for production (hourly)
AWS_REGION: "eu-west-1"
AWS_S3_FILE_NAME: "SMOS_MCPL-WX_EXP_SIAG.JSON"
AWS_S3_BUCKET_NAME: dc-meteorology-province-forecast
SERVICE_NAME: dc-meteorology-bz-forecast
TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317 # Telemetry endpoint
envSecretRef: # Reference secrets for sensitive information
- name: MQ_URI
secret: rabbitmq-svcbind
key: uri
Critical Points:
- `dc.Env` Embedding: The `dc.Env` struct from the SDK automatically handles common environment variables like `MQ_URI`, `LOGLEVEL`, `PROVIDER`, etc. By embedding it, your custom `env` struct inherits these fields, and `ms.InitWithEnv` can populate them.
- Separation of Concerns: `.env` is for local development, while Helm values (`.test.yaml`) hold deployment-specific configuration, including references to Kubernetes secrets (`envSecretRef`) for sensitive data like `MQ_URI`.
- `ms.InitWithEnv`: This crucial SDK function reads the environment variables, initializes the structured logger (`slog`), sets up the messaging client (RabbitMQ), and configures telemetry. It is the entry point for the SDK's core functionality.
3.3. SDK Integration (opendatahub-go-sdk)
The `opendatahub-go-sdk` is central to simplifying collector development.
Go Code (main.go):
// ... imports ...
func main() {
// 1. Initialize SDK components (messaging, logging, telemetry)
ms.InitWithEnv(context.Background(), "", &env)
slog.Info("Starting data collector...")
// Ensure telemetry traces are flushed on application panic
defer tel.FlushOnPanic()
// ... AWS config and client setup ...
// 2. Create a new data collector instance
// dc.EmptyData indicates that the input channel doesn't carry specific data,
// as the cron job just triggers a collection run.
collector := dc.NewDc[dc.EmptyData](context.Background(), env.Env)
// 3. Schedule data collection runs
c := cron.New(cron.WithSeconds())
c.AddFunc(env.CRON, func() {
// When the cron job triggers, send an empty input signal to the collector's channel.
// This tells the collector to execute its data fetching logic.
collector.GetInputChannel() <- dc.NewInput[dc.EmptyData](context.Background(), nil)
})
slog.Info("Setup complete. Starting cron scheduler")
go func() {
c.Run() // Start the cron scheduler in a goroutine
}()
// 4. Start the collector's main processing loop
// The provided function (anonymous func) is the core data collection logic.
err = collector.Start(context.Background(), func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error) {
// This function is executed each time an input is received on the collector's channel.
// It's responsible for fetching raw data and returning it in the SDK's RawAny format.
// ... S3 data fetching logic ...
var raw any
if env.RAW_BINARY {
raw = body // Store raw bytes
} else {
raw = string(body) // Store as string
}
// Return the collected raw data in the SDK's standardized format
return &rdb.RawAny{
Provider: env.PROVIDER, // Unique identifier for the data source
Timestamp: time.Now(), // Timestamp of data collection
Rawdata: raw, // The actual raw data
}, nil
})
// 5. Handle fatal errors from the collector's main loop
ms.FailOnError(context.Background(), err, "data collector terminated") // note: passing err.Error() here would panic when err is nil
}
Key SDK Components:
- `ms.InitWithEnv(ctx, "", &env)`: The SDK's entry point for initialization. It sets up:
  - Structured Logging (`slog`): Configures logging based on `LOGLEVEL`.
  - Messaging: Initializes the RabbitMQ client using `MQ_URI`, `MQ_CLIENT`, and `MQ_EXCHANGE`. The SDK handles publishing raw data to the message queue.
  - Telemetry: Sets up OpenTelemetry for tracing and metrics, using `TELEMETRY_TRACE_GRPC_ENDPOINT` (seen in the Helm config).
- `tel.FlushOnPanic()`: Ensures that any buffered telemetry data (traces, metrics) is sent to the configured endpoint before the application crashes due to a panic. This is crucial for debugging production issues.
- `dc.NewDc[dc.EmptyData](ctx, env.Env)`: Creates a new data collector instance. The `[dc.EmptyData]` type parameter indicates that the input to the collector's processing function carries no payload; it is simply a trigger.
- `collector.GetInputChannel()`: Returns a channel that you can send signals to. Each signal (in this case, `dc.NewInput[dc.EmptyData](context.Background(), nil)`) triggers the callback passed to `collector.Start`. This decouples the scheduling mechanism from the core collection logic.
- `collector.Start(ctx, func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error))`: The main processing loop of the collector.
  - It takes a `context.Context` and a callback function.
  - The callback function is where your actual data-fetching logic resides.
  - It is expected to return an `*rdb.RawAny` object (the standardized raw data format) or an error.
  - The SDK then takes this `RawAny` object and publishes it to the configured message queue.
- `rdb.RawAny`: This struct defines the standard format for raw data that the SDK expects. It includes `Provider` (a unique identifier for the data source), `Timestamp` (when the data was collected), and `Rawdata` (the actual fetched content).
- `ms.FailOnError(ctx, err, msg)`: A utility function from the SDK that logs a fatal error and exits the application if `err` is not `nil`. It is used for unrecoverable errors during setup or the main loop.
3.4. Data Collection Logic (S3 Interaction)
The core logic of fetching data from S3 is encapsulated within the anonymous function passed to `collector.Start`.
Each collector implements its own logic; the code below is only an example.
Go Code (main.go):
// ...
// Create a custom AWS configuration
customConfig, err := config.LoadDefaultConfig(context.Background(),
config.WithRegion(env.AWS_REGION), // Configure AWS region from environment
config.WithCredentialsProvider( // Provide static credentials from environment
credentials.NewStaticCredentialsProvider(env.AWS_ACCESS_KEY_ID, env.AWS_ACCESS_SECRET_KEY, ""),
),
)
ms.FailOnError(context.Background(), err, "failed to create AWS config") // Fatal error if config fails
// Create an S3 client using the custom configuration
s3Client := s3.NewFromConfig(customConfig)
// ...
err = collector.Start(context.Background(), func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error) {
// Get the object from S3
output, err := s3Client.GetObject(ctx, &s3.GetObjectInput{ // use the callback's ctx so cancellation and tracing propagate
Bucket: aws.String(env.AWS_S3_BUCKET_NAME), // S3 bucket name from environment
Key: aws.String(env.AWS_S3_FILE_NAME), // S3 file key (name) from environment
})
if err != nil {
// Log the error with context (bucket, file) and return nil, err to the SDK
slog.Error("error while getting s3 object:", "err", err, "bucket", env.AWS_S3_BUCKET_NAME, "file", env.AWS_S3_FILE_NAME)
return nil, err
}
defer output.Body.Close() // Ensure the S3 response body is closed
body, err := io.ReadAll(output.Body) // Read the entire S3 object body
if err != nil {
slog.Error("error reading response body:", "err", err)
return nil, err
}
var raw any // Declare a variable to hold the raw data
if env.RAW_BINARY {
raw = body // If RAW_BINARY is true, store the raw bytes
} else {
raw = string(body) // Otherwise, convert bytes to string
}
// Construct the standardized RawAny object
return &rdb.RawAny{
Provider: env.PROVIDER, // Identifier for this data source
Timestamp: time.Now(), // Timestamp of when the data was collected
Rawdata: raw, // The actual raw data (string or []byte)
}, nil
})
ms.FailOnError(context.Background(), err, "data collector terminated") // Fatal error if collector.Start fails (err.Error() would panic when err is nil)
}
Critical Points:
- AWS SDK Initialization: The AWS SDK is initialized with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_ACCESS_SECRET_KEY` environment variables. This is a standard pattern for configuring cloud service clients.
- Error Handling: Crucially, errors from `s3Client.GetObject` and `io.ReadAll` are logged with `slog.Error` (providing context like bucket and file names) and then returned. The `collector.Start` function then handles these errors, potentially preventing the message from being acknowledged and allowing for retries or dead-lettering by the messaging system.
- `rdb.RawAny` Structure: The returned `*rdb.RawAny` object is the standardized output of any data collector using this SDK and is the payload sent to the message queue. The `Provider` field is particularly important: it uniquely identifies the source of the raw data, allowing transformers to filter or process it accordingly.
4. Containerization with Docker
Docker is the recommended way to package and deploy your data collector. The provided Dockerfile uses a multi-stage build to create efficient and secure images.
Dockerfile:
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
FROM golang:1.23.7-bookworm AS base
# Expose port 8080 (this collector doesn't run a web server, but exposing it is common practice)
EXPOSE 8080

FROM base AS build-env
WORKDIR /app
# Copy source code and download Go module dependencies
COPY src/ .
RUN go mod download
# Build the Go application:
# CGO_ENABLED=0: disables CGo, producing a statically linked binary
# GOOS=linux: compiles for the Linux operating system
# -o main: output executable named 'main'
RUN CGO_ENABLED=0 GOOS=linux go build -o main

# BUILD published image (production-ready, minimal image)
FROM alpine:3 AS build
WORKDIR /app
# Copy only the compiled binary from the build-env stage
COPY --from=build-env /app/main .
ENTRYPOINT [ "./main" ]

# LOCAL DEVELOPMENT (for hot-reloading/easier debugging)
FROM base AS dev
WORKDIR /code
# Run the Go application directly from source for development
CMD ["go", "run", "."]

# TESTS
FROM base AS test
WORKDIR /code
# Run Go tests
CMD ["go", "test", "."]
Critical Points:
- Multi-Stage Build: This is a best practice for Go applications.
  - `base`: Defines the base Go environment.
  - `build-env`: Used only for building the executable. It includes source code and dependencies.
  - `build`: The final, slim production image. It only copies the compiled binary from `build-env`, resulting in a much smaller image (e.g., `alpine:3` instead of the full `golang` image). This reduces attack surface and download times.
  - `dev` and `test`: Separate stages for local development and running tests, allowing different build/run environments without polluting the production image.
- `CGO_ENABLED=0 GOOS=linux`: Crucial for Go applications in Docker. It ensures the Go binary is statically linked and compiled for Linux, making it highly portable and suitable for minimal base images like Alpine.
4.1. Local Orchestration with Docker Compose
The `docker-compose.yml` file simplifies running and testing your collector and its dependencies (like RabbitMQ) locally.
docker-compose.yml:
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
services:
app: # Your data collector service
depends_on:
rabbitmq: # Ensure rabbitmq is healthy before starting the app
condition: service_healthy
build:
dockerfile: infrastructure/docker/Dockerfile # Path to your Dockerfile
context: . # Build context is the current directory
target: dev # Use the 'dev' stage from the Dockerfile
env_file:
- .env # Load environment variables from the .env file
volumes:
- ./src:/code # Mount your source code for hot-reloading in dev mode
- pkg:/go/pkg/mod # Cache Go modules for faster builds
working_dir: /code # Set the working directory inside the container
rabbitmq: # RabbitMQ service (dependency)
extends: # Reuse a common RabbitMQ definition
file: ../lib/docker-compose/docker-compose.rabbitmq.yml
service: rabbitmq
attach: false # Do not attach to RabbitMQ's logs by default (can be noisy)
volumes:
pkg: # Define the named volume for Go module caching
When testing the complete pipeline (including the transformer), you need to start the full Open Data Hub Core, and you must be careful to start the collector without its `rabbitmq` service:
docker compose up app
5. Deployment with Helm
Helm is a package manager for Kubernetes, used to define, install, and upgrade complex Kubernetes applications. The `helm/meteorology-bz-forecast.test.yaml` file is a values file that overrides default settings in a Helm chart for a specific deployment environment (e.g., a test environment).
Helm Chart Values (.test.yaml):
image:
repository: ghcr.io/noi-techpark/opendatahub-collectors/dc-s3-poller # Docker image repository
pullPolicy: IfNotPresent # Only pull image if not already present on the node
tag: "0.0.1" # Specific image tag/version to deploy
env: # Environment variables passed directly to the container
MQ_CLIENT: dc-meteorology-bz-forecast
PROVIDER: s3-poller/meteorology-bz-forecast
CRON: "0 0 0/1 * * *" # Production cron schedule (hourly)
AWS_REGION: "eu-west-1"
AWS_S3_FILE_NAME: "SMOS_MCPL-WX_EXP_SIAG.JSON"
AWS_S3_BUCKET_NAME: dc-meteorology-province-forecast
SERVICE_NAME: dc-meteorology-bz-forecast # Service name for observability/discovery
TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317 # OpenTelemetry collector endpoint
envSecretRef: # Reference Kubernetes secrets for sensitive environment variables
- name: MQ_URI # Name of the environment variable
secret: rabbitmq-svcbind # Name of the Kubernetes secret
key: uri # Key within the secret that holds the value
Critical Points:
- `image`: Defines which Docker image to deploy.
  - `repository`: The path to your Docker image (e.g., in GitHub Container Registry).
  - `pullPolicy`: `IfNotPresent` is common for test/dev; `Always` is common for production to ensure the latest image is used.
  - `tag`: Crucial for versioning and reproducibility. Always use a specific tag, never `latest`, in production.
- `env`: Directly sets environment variables within the Kubernetes pod. These values typically override defaults defined in the main Helm chart. Notice how `CRON` is set to an hourly schedule here, different from the local development `.env` file.
- `envSecretRef`: Paramount for security in production environments. Instead of hardcoding sensitive values like `MQ_URI` directly in the Helm values (which would be stored in Git), `envSecretRef` tells Kubernetes to fetch the value for `MQ_URI` from a Kubernetes Secret named `rabbitmq-svcbind` under the key `uri`. This ensures sensitive data is not exposed in configuration files.
- `SERVICE_NAME` and `TELEMETRY_TRACE_GRPC_ENDPOINT`: Used for integrating with the Kubernetes cluster's observability stack. `SERVICE_NAME` identifies the service in monitoring tools, and `TELEMETRY_TRACE_GRPC_ENDPOINT` points to the OpenTelemetry collector or agent within the cluster, enabling distributed tracing.
6. Local Development Workflow
To get your S3 poller data collector running locally:
- Clone Repositories:
  - `opendatahub-collectors` (contains your collector's source)
  - `infrastructure-v2` (contains the shared `docker-compose.rabbitmq.yml` and the overall infrastructure compose files)
- Navigate to the Collector Directory: `cd opendatahub-collectors/collectors/s3-poller` (or your specific collector's path).
- Create `.env`: Copy the provided `.env` content into a file named `.env` in this directory. Fill in your AWS credentials (`AWS_ACCESS_KEY_ID`, `AWS_ACCESS_SECRET_KEY`).
- Start Infrastructure: From the `infrastructure-v2` directory, run the base and timeseries compose files:
  cd ../../infrastructure-v2 # Adjust path if needed
  docker compose -f docker-compose.yml up -d # Run in detached mode
  docker compose -f docker-compose.timeseries.yml up -d
- Start Collector: From your collector's directory (`opendatahub-collectors/collectors/s3-poller`), run its `docker-compose.yml`. The `app` service will wait for `rabbitmq` to be healthy before starting:
  cd opendatahub-collectors/collectors/s3-poller # Adjust path if needed
  docker compose up app --build # Build and run your collector
Testing the Data Flow
- Check Collector Logs: Observe the logs of your `app` container. You should see messages like "Starting data collector...", "Polling S3 bucket...", and "Published dummy S3 data to message queue." (if using the dummy example).
- RabbitMQ Management: Access http://localhost:15672 (guest/guest) in your browser. Navigate to "Queues" and check the `ingress` queue (or whatever `MQ_EXCHANGE` you configured). You should see messages accumulating there.
- MongoDB: Connect to `mongodb://localhost:27017/?directConnection=true` using a tool like MongoDB Compass. You can inspect the raw data stored by the SDK before it is picked up by a transformer.
This setup allows you to develop and test your data collector in an environment that closely mirrors the production Kubernetes cluster, ensuring smooth integration with the Open Data Hub ecosystem.