
Developing a Data Collector from Scratch

Developing a robust and reliable data collector is a critical step in any data integration pipeline. This guide will walk you through the essential principles, architectural patterns, and practical implementation details for building a data collector in Go, using the provided S3 poller example as a reference.

1. Understanding the Role of a Data Collector

A data collector is a specialized microservice responsible for:

  • Source Interaction: Connecting to an external data source (e.g., an API, a database, a file system like S3, an FTP server).
  • Data Retrieval: Fetching raw data from the source.
  • Raw Data Publication: Publishing the retrieved raw data to a message queue, typically for subsequent processing by a transformer.
  • Scheduling/Triggering: Operating on a schedule (e.g., cron, polling interval) or in response to external events.

The primary goal of a data collector is to reliably acquire data and make it available in a standardized raw format for the next stage of the pipeline.

Typical Pull-Data-Collector flow (the collector actively gets data from the provider)

(Diagram: pull collector flow)

Typical Push-Data-Collector flow (the collector waits for data from the provider)

(Diagram: push collector flow)

2. Core Principles of Data Collector Development

Adhering to these principles ensures your data collector is maintainable, scalable, and resilient:

2.1. Modularity and Single Responsibility

Each data collector should focus on a single data source or a specific data acquisition method. Avoid combining multiple disparate data sources into one collector. This promotes:

  • Clearer Logic: Easier to understand, debug, and test.
  • Independent Deployment: Changes to one source don't affect others.
  • Scalability: You can scale collectors for different sources independently.

2.2. Configuration Management

Externalize all configurable parameters (API keys, endpoints, polling intervals, bucket names) using environment variables. This keeps your code clean, allows for easy deployment across different environments (development, staging, production), and avoids hardcoding sensitive information.
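
As a bare-bones illustration of this principle using only the Go standard library (the SDK-based approach used by real collectors is shown later in this guide; the helper name and fallback values here are purely illustrative):

package collector

import (
    "log"
    "os"
)

// envOrDefault reads an environment variable and falls back to a default,
// logging which value was used so misconfiguration is visible at startup.
func envOrDefault(key, fallback string) string {
    if v, ok := os.LookupEnv(key); ok {
        return v
    }
    log.Printf("%s not set, falling back to %q", key, fallback)
    return fallback
}

// Example usage:
//   bucket := envOrDefault("AWS_S3_BUCKET_NAME", "")
//   schedule := envOrDefault("CRON", "0 */10 * * * *") // every 10 minutes (6-field cron with seconds)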

2.3. Error Handling and Resilience

Data collection often involves interacting with external systems that can be unreliable. Your collector must:

  • Handle Network Errors: Implement retries with exponential backoff for transient network issues (see the sketch after this list).
  • Validate Data: Ensure retrieved data is in the expected format before processing.
  • Log Errors Effectively: Provide sufficient context for debugging.
  • Fail Early: Let the application crash on unrecoverable errors; the test and production environments handle restarts for you.
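
For instance, a transient-error retry helper with exponential backoff might look like the following sketch (the helper and its parameters are illustrative and not part of the SDK):

package collector

import (
    "context"
    "fmt"
    "time"
)

// retryWithBackoff runs fn up to maxAttempts times, doubling the wait between
// attempts (1s, 2s, 4s, ...). It aborts early if the context is cancelled.
func retryWithBackoff(ctx context.Context, maxAttempts int, fn func() error) error {
    delay := time.Second
    var err error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = fn(); err == nil {
            return nil
        }
        if attempt == maxAttempts {
            break
        }
        select {
        case <-time.After(delay):
            delay *= 2
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return fmt.Errorf("all %d attempts failed: %w", maxAttempts, err)
}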

2.4. Observability (Logging, Tracing, Metrics)

For production systems, it's crucial to understand what your collector is doing.

  • Logging: Use structured logging (slog in Go) to record events, errors, and progress (a minimal example follows this list).
  • Tracing: Integrate with a distributed tracing system (like OpenTelemetry, as used by opendatahub-go-sdk) to visualize the flow of requests and identify bottlenecks across microservices.
  • Metrics: Expose metrics (e.g., number of successful polls, failed polls, data volume, processing time) for monitoring and alerting.
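
As a minimal illustration of the logging point above, here is structured logging with Go's standard log/slog package. In SDK-based collectors, ms.InitWithEnv already configures slog for you, so this sketch is only for context; the field names are illustrative:

package collector

import (
    "log/slog"
    "os"
    "time"
)

// setupLogger configures a JSON slog logger whose verbosity is driven by LOGLEVEL.
func setupLogger() *slog.Logger {
    level := slog.LevelInfo
    if os.Getenv("LOGLEVEL") == "DEBUG" {
        level = slog.LevelDebug
    }
    logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level}))
    slog.SetDefault(logger)
    return logger
}

// logPoll records one poll with structured fields instead of formatted strings,
// which keeps the logs filterable in an observability stack.
func logPoll(bucket string, bytes int, took time.Duration) {
    slog.Info("poll finished", "bucket", bucket, "bytes", bytes, "duration", took)
}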

2.5. Scheduling and Triggering

Determine how your collector will be activated:

  • Polling: Regularly checking a source for new data (e.g., using cron jobs, time.Ticker).
  • Event-Driven: Reacting to events (e.g., a webhook notification, a message on a specific queue).

The S3 poller uses a cron scheduler, which is a common pattern for polling.
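
For comparison with the cron approach, a plain time.Ticker poll loop could look like the sketch below; pollOnce is a hypothetical function standing in for your collection logic:

package collector

import (
    "context"
    "log/slog"
    "time"
)

// runPollLoop invokes pollOnce at a fixed interval until the context is cancelled.
func runPollLoop(ctx context.Context, interval time.Duration, pollOnce func(context.Context) error) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            if err := pollOnce(ctx); err != nil {
                slog.Error("poll failed", "err", err)
            }
        case <-ctx.Done():
            return
        }
    }
}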

3. Anatomy of a Go Data Collector: S3 Poller Example

Let's dissect the provided S3 poller example to understand its components and how it embodies the principles above.

3.1. Project Structure

A typical Go project for a data collector might look like this:


.
├── src/
│   ├── main.go            # Main application logic
│   ├── go.mod             # Go modules file
│   ├── go.sum             # Go modules checksums
│   ├── go.work            # Go workspace file (if part of a monorepo)
│   └── go.work.sum        # Go workspace checksums
├── .env                   # Local environment variables for development
├── Dockerfile             # Containerization instructions (often in infrastructure/docker)
├── docker-compose.yml     # Local development setup (often in collector's root)
└── helm/                  # Helm charts for Kubernetes deployment
    └── your-collector-name.yaml

3.2. Environment Variables and Configuration Management

The env struct and the .env file demonstrate how environment variables are loaded and managed.

// ... imports ...

var env struct {
    dc.Env            // Embeds common environment variables from the SDK (MQ_URI, LOGLEVEL, etc.)
    CRON       string `env:"CRON"`       // Specific cron schedule for this collector
    RAW_BINARY bool   `env:"RAW_BINARY"` // Flag to interpret raw data as binary

    AWS_REGION            string `env:"AWS_REGION"`
    AWS_S3_FILE_NAME      string `env:"AWS_S3_FILE_NAME"`
    AWS_S3_BUCKET_NAME    string `env:"AWS_S3_BUCKET_NAME"`
    AWS_ACCESS_KEY_ID     string `env:"AWS_ACCESS_KEY_ID"`
    AWS_ACCESS_SECRET_KEY string `env:"AWS_ACCESS_SECRET_KEY"`
}

func main() {
    // Initializes messaging, logging, and telemetry based on env vars
    ms.InitWithEnv(context.Background(), "", &env)
    slog.Info("Starting data collector...")

    // ... rest of the main function ...
}

Critical Points:

  • dc.Env Embedding: The dc.Env struct from the SDK automatically handles common environment variables like MQ_URI, LOGLEVEL, PROVIDER, etc. By embedding it, your custom env struct inherits these fields, and ms.InitWithEnv can populate them.
  • Separation of Concerns: .env is for local development (an illustrative example follows this list), and Helm values (.test.yaml) are for deployment-specific configurations, including referencing Kubernetes secrets (envSecretRef) for sensitive data like MQ_URI.
  • ms.InitWithEnv: This is a crucial SDK function that reads the environment variables, initializes the structured logger (slog), sets up the messaging client (RabbitMQ), and configures telemetry. It's the entry point for the SDK's core functionalities.

3.3. SDK Integration (opendatahub-go-sdk)

The opendatahub-go-sdk is central to simplifying collector development.

// ... imports ...

func main() {
    // 1. Initialize SDK components (messaging, logging, telemetry)
    ms.InitWithEnv(context.Background(), "", &env)
    slog.Info("Starting data collector...")

    // Ensure telemetry traces are flushed on application panic
    defer tel.FlushOnPanic()

    // ... AWS config and client setup ...

    // 2. Create a new data collector instance.
    // dc.EmptyData indicates that the input channel doesn't carry specific data,
    // as the cron job just triggers a collection run.
    collector := dc.NewDc[dc.EmptyData](context.Background(), env.Env)

    // 3. Schedule data collection runs
    c := cron.New(cron.WithSeconds())
    c.AddFunc(env.CRON, func() {
        // When the cron job triggers, send an empty input signal to the collector's channel.
        // This tells the collector to execute its data fetching logic.
        collector.GetInputChannel() <- dc.NewInput[dc.EmptyData](context.Background(), nil)
    })

    slog.Info("Setup complete. Starting cron scheduler")
    go func() {
        c.Run() // Start the cron scheduler in a goroutine
    }()

    // 4. Start the collector's main processing loop.
    // The provided function (anonymous func) is the core data collection logic.
    err = collector.Start(context.Background(), func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error) {
        // This function is executed each time an input is received on the collector's channel.
        // It's responsible for fetching raw data and returning it in the SDK's RawAny format.

        // ... S3 data fetching logic ...

        var raw any
        if env.RAW_BINARY {
            raw = body // Store raw bytes
        } else {
            raw = string(body) // Store as string
        }

        // Return the collected raw data in the SDK's standardized format
        return &rdb.RawAny{
            Provider:  env.PROVIDER, // Unique identifier for the data source
            Timestamp: time.Now(),   // Timestamp of data collection
            Rawdata:   raw,          // The actual raw data
        }, nil
    })

    // 5. Handle fatal errors from the collector's main loop
    ms.FailOnError(context.Background(), err, err.Error())
}

Key SDK Components:

  • ms.InitWithEnv(ctx, "", &env): This function is the SDK's entry point for initialization. It sets up:
    • Structured Logging (slog): Configures logging based on LOGLEVEL.
    • Messaging: Initializes the RabbitMQ client using MQ_URI, MQ_CLIENT, MQ_EXCHANGE. The SDK handles publishing raw data to the message queue.
    • Telemetry: Sets up OpenTelemetry for tracing and metrics, using TELEMETRY_TRACE_GRPC_ENDPOINT (seen in Helm config).
  • tel.FlushOnPanic(): Ensures that any buffered telemetry data (traces, metrics) is sent to the configured endpoint before the application crashes due to a panic. This is crucial for debugging production issues.
  • dc.NewDc[dc.EmptyData](ctx, env.Env): Creates a new data collector instance. The [dc.EmptyData] type parameter indicates that the input to the collector's processing function doesn't carry specific data; it's simply a trigger.
  • collector.GetInputChannel(): Returns a channel that you can send signals to. Each signal (in this case, dc.NewInput[dc.EmptyData](context.Background(), nil)) triggers the collector.Start function's callback. This decouples the scheduling mechanism from the core collection logic.
  • collector.Start(ctx, func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error)): This is the main processing loop of the collector.
    • It takes a context.Context and a callback function.
    • The callback function is where your actual data fetching logic resides.
    • It's expected to return an *rdb.RawAny object (the standardized raw data format) or an error.
    • The SDK then takes this RawAny object and publishes it to the configured message queue.
  • rdb.RawAny: This struct defines the standard format for raw data that the SDK expects. It includes Provider (a unique identifier for the data source), Timestamp (when the data was collected), and Rawdata (the actual fetched content).
  • ms.FailOnError(ctx, err, msg): A utility function from the SDK that logs a fatal error and exits the application if err is not nil. This is used for unrecoverable errors during setup or the main loop.

3.4. Data Collection Logic (S3 Interaction)

The core logic of fetching data from S3 is encapsulated within the anonymous function passed to collector.Start.

info

Each collector implements its own logic; the code below is only an example.

// ...

// Create a custom AWS configuration
customConfig, err := config.LoadDefaultConfig(context.Background(),
    config.WithRegion(env.AWS_REGION), // Configure AWS region from environment
    config.WithCredentialsProvider(    // Provide static credentials from environment
        credentials.NewStaticCredentialsProvider(env.AWS_ACCESS_KEY_ID, env.AWS_ACCESS_SECRET_KEY, ""),
    ),
)
ms.FailOnError(context.Background(), err, "failed to create AWS config") // Fatal error if config fails

// Create an S3 client using the custom configuration
s3Client := s3.NewFromConfig(customConfig)

// ...
err = collector.Start(context.Background(), func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error) {
    // Get the object from S3
    output, err := s3Client.GetObject(context.Background(), &s3.GetObjectInput{
        Bucket: aws.String(env.AWS_S3_BUCKET_NAME), // S3 bucket name from environment
        Key:    aws.String(env.AWS_S3_FILE_NAME),   // S3 file key (name) from environment
    })
    if err != nil {
        // Log the error with context (bucket, file) and return nil, err to the SDK
        slog.Error("error while getting s3 object:", "err", err, "bucket", env.AWS_S3_BUCKET_NAME, "file", env.AWS_S3_FILE_NAME)
        return nil, err
    }
    defer output.Body.Close() // Ensure the S3 response body is closed

    body, err := io.ReadAll(output.Body) // Read the entire S3 object body
    if err != nil {
        slog.Error("error reading response body:", "err", err)
        return nil, err
    }

    var raw any // Declare a variable to hold the raw data
    if env.RAW_BINARY {
        raw = body // If RAW_BINARY is true, store the raw bytes
    } else {
        raw = string(body) // Otherwise, convert bytes to string
    }

    // Construct the standardized RawAny object
    return &rdb.RawAny{
        Provider:  env.PROVIDER, // Identifier for this data source
        Timestamp: time.Now(),   // Timestamp of when the data was collected
        Rawdata:   raw,          // The actual raw data (string or []byte)
    }, nil
})
ms.FailOnError(context.Background(), err, err.Error()) // Fatal error if collector.Start fails

Critical Points:

  • AWS SDK Initialization: The AWS SDK is initialized with the AWS_REGION, AWS_ACCESS_KEY_ID, and AWS_ACCESS_SECRET_KEY from environment variables. This is a standard pattern for configuring cloud service clients.
  • Error Handling: Crucially, errors from s3Client.GetObject and io.ReadAll are logged with slog.Error (providing context like bucket and file names) and then returned. The collector.Start function will then handle these errors, potentially preventing the message from being acknowledged and allowing for retries or dead-lettering by the messaging system.
  • rdb.RawAny Structure: The returned *rdb.RawAny object is the standardized output of any data collector using this SDK. It's the payload that will be sent to the message queue. The Provider field is particularly important as it uniquely identifies the source of this raw data, allowing transformers to filter or process it accordingly.
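
Since this particular poller fetches a JSON file (note the .JSON file name in the Helm values below), a lightweight validation step before returning the RawAny can catch truncated or corrupted downloads early. This is a sketch of the "Validate Data" principle from section 2.3, not part of the reference implementation:

package collector

import (
    "encoding/json"
    "fmt"
)

// validateJSON returns an error when body is not syntactically valid JSON.
// json.Valid only checks syntax, so it is cheap enough to run on every poll.
func validateJSON(body []byte) error {
    if !json.Valid(body) {
        return fmt.Errorf("retrieved payload is not valid JSON (%d bytes)", len(body))
    }
    return nil
}

Returning such an error from the collector.Start callback hands it to the SDK's error handling instead of publishing a malformed payload.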

4. Containerization with Docker

Docker is the recommended way to package and deploy your data collector. The provided Dockerfile uses a multi-stage build to create efficient and secure images.

# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0

FROM golang:1.23.7-bookworm AS base

# Expose port 8080 (though this collector doesn't run a web server, it's a common practice)
EXPOSE 8080

FROM base AS build-env
WORKDIR /app

# Copy source code to the working directory
COPY src/ .

# Download Go module dependencies
RUN go mod download

# Build the Go application:
#   CGO_ENABLED=0: disables CGo, producing a statically linked binary
#   GOOS=linux:    compiles for the Linux operating system
#   -o main:       names the output executable 'main'
RUN CGO_ENABLED=0 GOOS=linux go build -o main

# BUILD published image (production-ready, minimal image)
FROM alpine:3 AS build
WORKDIR /app
# Copy only the compiled binary from the build-env stage
COPY --from=build-env /app/main .
# Set the entrypoint for the container
ENTRYPOINT [ "./main" ]

# LOCAL DEVELOPMENT (for hot-reloading/easier debugging)
FROM base AS dev
WORKDIR /code
# Run the Go application directly from source for development
CMD ["go", "run", "."]

# TESTS
FROM base AS test
WORKDIR /code
# Run Go tests
CMD ["go", "test", "."]

Critical Points:

  • Multi-Stage Build: This is a best practice for Go applications.
    • base: Defines the base Go environment.
    • build-env: Used only for building the executable. It includes source code and dependencies.
    • build: The final, slim production image. It only copies the compiled binary from build-env, resulting in a much smaller image size (e.g., using alpine:3 instead of the full golang image). This reduces attack surface and download times.
    • dev and test: Separate stages for local development and running tests, allowing for different build/run environments without polluting the production image.
  • CGO_ENABLED=0 GOOS=linux: This is crucial for Go applications in Docker. It ensures the Go binary is statically linked and compiled for Linux, making it highly portable and suitable for minimal base images like Alpine.
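
As a usage example, you could build the production stage locally with Docker's --target flag (the image name here is illustrative):

docker build -f infrastructure/docker/Dockerfile --target build -t dc-s3-poller:local .

This produces the slim Alpine-based image; the dev (and similarly the test) stage is normally exercised through docker-compose, shown next, which mounts the source code into /code.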

4.1. Local Orchestration with Docker Compose

docker-compose.yml simplifies running and testing your collector and its dependencies (like RabbitMQ) locally.

# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0

services:
  app: # Your data collector service
    depends_on:
      rabbitmq: # Ensure rabbitmq is healthy before starting the app
        condition: service_healthy
    build:
      dockerfile: infrastructure/docker/Dockerfile # Path to your Dockerfile
      context: . # Build context is the current directory
      target: dev # Use the 'dev' stage from the Dockerfile
    env_file:
      - .env # Load environment variables from the .env file
    volumes:
      - ./src:/code # Mount your source code for hot-reloading in dev mode
      - pkg:/go/pkg/mod # Cache Go modules for faster builds
    working_dir: /code # Set the working directory inside the container

  rabbitmq: # RabbitMQ service (dependency)
    extends: # Reuse a common RabbitMQ definition
      file: ../lib/docker-compose/docker-compose.rabbitmq.yml
      service: rabbitmq
    attach: false # Do not attach to RabbitMQ's logs by default (can be noisy)

volumes:
  pkg: # Define the named volume for Go module caching
warning

When testing the complete pipeline (including the transformer), you need to start the full Open Data Hub Core, and you must be careful to start the collector without the rabbitmq service:

docker compose up app

5. Deployment with Helm

Helm is a package manager for Kubernetes, used to define, install, and upgrade complex Kubernetes applications. The helm/meteorology-bz-forecast.test.yaml file is a values file that overrides default settings in a Helm chart for a specific deployment environment (e.g., a test environment).

image:
  repository: ghcr.io/noi-techpark/opendatahub-collectors/dc-s3-poller # Docker image repository
  pullPolicy: IfNotPresent # Only pull image if not already present on the node
  tag: "0.0.1" # Specific image tag/version to deploy

env: # Environment variables passed directly to the container
  MQ_CLIENT: dc-meteorology-bz-forecast
  PROVIDER: s3-poller/meteorology-bz-forecast

  CRON: "0 0 0/1 * * *" # Production cron schedule (hourly)

  AWS_REGION: "eu-west-1"
  AWS_S3_FILE_NAME: "SMOS_MCPL-WX_EXP_SIAG.JSON"
  AWS_S3_BUCKET_NAME: dc-meteorology-province-forecast

  SERVICE_NAME: dc-meteorology-bz-forecast # Service name for observability/discovery
  TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317 # OpenTelemetry collector endpoint

envSecretRef: # Reference Kubernetes secrets for sensitive environment variables
  - name: MQ_URI # Name of the environment variable
    secret: rabbitmq-svcbind # Name of the Kubernetes secret
    key: uri # Key within the secret that holds the value

Critical Points:

  • image: Defines which Docker image to deploy.
    • repository: The path to your Docker image (e.g., in GitHub Container Registry).
    • pullPolicy: IfNotPresent is common for test/dev; Always is typical for production to ensure the latest image is pulled.
    • tag: Crucial for versioning and reproducibility. Always use a specific tag, never latest in production.
  • env: Directly sets environment variables within the Kubernetes pod. These values typically override defaults defined in the main Helm chart. Notice how CRON is set to an hourly schedule here, different from the local development .env file.
  • envSecretRef: This is paramount for security in production environments. Instead of hardcoding sensitive values like MQ_URI directly in the Helm values (which would be stored in Git), envSecretRef tells Kubernetes to fetch the value for MQ_URI from a Kubernetes Secret named rabbitmq-svcbind under the key uri. This ensures sensitive data is not exposed in configuration files.
  • SERVICE_NAME and TELEMETRY_TRACE_GRPC_ENDPOINT: These are used for integrating with the Kubernetes cluster's observability stack. SERVICE_NAME helps identify the service in monitoring tools, and TELEMETRY_TRACE_GRPC_ENDPOINT points to the OpenTelemetry collector or agent within the cluster, enabling distributed tracing.

6. Local Development Workflow

To get your S3 poller data collector running locally:

  1. Clone Repositories:
    • opendatahub-collectors (contains your collector's source)
    • infrastructure-v2 (contains the shared docker-compose.rabbitmq.yml and overall infrastructure compose files)
  2. Navigate to Collector Directory: cd opendatahub-collectors/collectors/s3-poller (or your specific collector's path).
  3. Create .env: Copy the provided .env content into a file named .env in this directory. Fill in your AWS credentials (AWS_ACCESS_KEY_ID, AWS_ACCESS_SECRET_KEY).
  4. Start Infrastructure: From the infrastructure-v2 directory, run the base and timeseries compose files:
    cd ../../infrastructure-v2 # Adjust path if needed
    docker compose -f docker-compose.yml up -d # Run in detached mode
    docker compose -f docker-compose.timeseries.yml up -d
  5. Start Collector: From your collector's directory (opendatahub-collectors/collectors/s3-poller), run its docker-compose.yml:
    cd opendatahub-collectors/collectors/s3-poller # Adjust path if needed
    docker compose up app --build # Build and run your collector
    The app service in your collector's docker-compose.yml will wait for rabbitmq to be healthy before starting.

Testing the Data Flow

  • Check Collector Logs: Observe the logs of your app container. You should see messages like "Starting data collector...", "Polling S3 bucket...", and "Published dummy S3 data to message queue." (if using the dummy example).
  • RabbitMQ Management: Access http://localhost:15672 (guest/guest) in your browser. Navigate to "Queues" and check the ingress queue (or whatever MQ_EXCHANGE you configured). You should see messages accumulating there.
  • MongoDB: Connect to mongodb://localhost:27017/?directConnection=true using a tool like MongoDB Compass. You can inspect the raw data stored by the SDK before it's picked up by a transformer.

This setup allows you to develop and test your data collector in an environment that closely mirrors the production Kubernetes cluster, ensuring smooth integration with the Open Data Hub ecosystem.