
Developing a Data Collector from Scratch

Developing a robust and reliable data collector is a critical step in any data integration pipeline. This guide will walk you through the essential principles, architectural patterns, and practical implementation details for building a data collector in Go, using the provided S3 poller example as a reference.

1. Understanding the Role of a Data Collector

A data collector is a specialized microservice responsible for:

  • Source Interaction: Connecting to an external data source (e.g., an API, a database, a file system like S3, an FTP server).
  • Data Retrieval: Fetching raw data from the source.
  • Raw Data Publication: Publishing the retrieved raw data to a message queue, typically for subsequent processing by a transformer.
  • Scheduling/Triggering: Operating on a schedule (e.g., cron, polling interval) or in response to external events.

The primary goal of a data collector is to reliably acquire data and make it available in a standardized raw format for the next stage of the pipeline.

Typical Pull-Data-Collector flow (the collector actively gets data from the provider)

[Diagram: pull collector flow]

Typical Push-Data-Collector flow (the collector waits for data from the provider)

[Diagram: push collector flow]

2. Core Principles of Data Collector Development

Adhering to these principles ensures your data collector is maintainable, scalable, and resilient:

2.1. Modularity and Single Responsibility

Each data collector should focus on a single data source or a specific data acquisition method. Avoid combining multiple disparate data sources into one collector. This promotes:

  • Clearer Logic: Easier to understand, debug, and test.
  • Independent Deployment: Changes to one source don't affect others.
  • Scalability: You can scale collectors for different sources independently.

2.2. Configuration Management

Externalize all configurable parameters (API keys, endpoints, polling intervals, bucket names) using environment variables. This keeps your code clean, allows for easy deployment across different environments (development, staging, production), and avoids hardcoding sensitive information.
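As a minimal, generic sketch (not the SDK mechanism used later in this guide), configuration can be read once at startup into a single struct; the variable names here are purely illustrative:

package main

import (
    "log"
    "os"
    "time"
)

// Config collects every externally tunable parameter in one place.
type Config struct {
    Endpoint     string
    APIKey       string
    PollInterval time.Duration
}

// loadConfig reads the environment once and fails fast if a required value is missing.
func loadConfig() Config {
    interval, err := time.ParseDuration(envOr("POLL_INTERVAL", "5m"))
    if err != nil {
        log.Fatalf("invalid POLL_INTERVAL: %v", err)
    }
    return Config{
        Endpoint:     mustEnv("SOURCE_ENDPOINT"),
        APIKey:       mustEnv("SOURCE_API_KEY"),
        PollInterval: interval,
    }
}

func envOr(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

func mustEnv(key string) string {
    v := os.Getenv(key)
    if v == "" {
        log.Fatalf("missing required environment variable %s", key)
    }
    return v
}

func main() {
    cfg := loadConfig()
    log.Printf("polling %s every %s", cfg.Endpoint, cfg.PollInterval)
}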

2.3. Error Handling and Resilience

Data collection often involves interacting with external systems that can be unreliable. Your collector must:

  • Handle Network Errors: Implement retries with exponential backoff for transient network issues (see the sketch after this list).
  • Validate Data: Ensure retrieved data is in the expected format before processing.
  • Log Errors Effectively: Provide sufficient context for debugging.
  • Fail Early: Let the application crash if something goes unrecoverably wrong; the test and production environments handle restarts.
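
As an illustrative sketch (not part of the SDK or the S3 poller example below), a retry helper with exponential backoff could look like this:

package main

import (
    "context"
    "errors"
    "fmt"
    "log/slog"
    "time"
)

// retryWithBackoff retries fn up to maxAttempts times, doubling the wait between attempts.
func retryWithBackoff(ctx context.Context, maxAttempts int, initialWait time.Duration, fn func() error) error {
    wait := initialWait
    var err error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = fn(); err == nil {
            return nil
        }
        if attempt == maxAttempts {
            break
        }
        slog.Warn("attempt failed, retrying", "attempt", attempt, "err", err)
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(wait):
        }
        wait *= 2
    }
    return fmt.Errorf("all %d attempts failed: %w", maxAttempts, err)
}

func main() {
    err := retryWithBackoff(context.Background(), 3, time.Second, func() error {
        return errors.New("transient network error") // replace with the real fetch call
    })
    slog.Error("giving up", "err", err)
}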

2.4. Observability (Logging, Tracing, Metrics)

For production systems, it's crucial to understand what your collector is doing.

  • Logging: Use structured logging (slog in Go) to record events, errors, and progress (see the sketch after this list).
  • Tracing: Integrate with a distributed tracing system (like OpenTelemetry, as used by opendatahub-go-sdk) to visualize the flow of requests and identify bottlenecks across microservices.
  • Metrics: Expose metrics (e.g., number of successful polls, failed polls, data volume, processing time) for monitoring and alerting.
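
As a small, generic sketch of structured logging with Go's standard log/slog package (in collectors using the SDK, slog is already configured for you by ms.InitWithEnv):

package main

import (
    "log/slog"
    "os"
)

func main() {
    // Emit JSON logs so log aggregators can index individual fields.
    logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
    slog.SetDefault(logger)

    // Key-value pairs give every log line machine-readable context.
    slog.Info("poll finished", "bucket", "example-bucket", "objects", 12, "duration_ms", 431)
    slog.Error("poll failed", "bucket", "example-bucket", "err", "connection reset")
}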

2.5. Scheduling and Triggering

Determine how your collector will be activated:

  • Polling: Regularly checking a source for new data (e.g., using cron jobs, time.Ticker).
  • Event-Driven: Reacting to events (e.g., a webhook notification, a message on a specific queue).

The S3 poller uses a cron scheduler, which is a common pattern for polling.
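
Where a full cron expression is not needed, a plain time.Ticker loop is a lightweight alternative; a minimal sketch:

package main

import (
    "context"
    "log/slog"
    "time"
)

// pollEvery calls collect at a fixed interval until the context is cancelled.
func pollEvery(ctx context.Context, interval time.Duration, collect func(context.Context) error) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := collect(ctx); err != nil {
                slog.Error("collection run failed", "err", err)
            }
        }
    }
}

func main() {
    pollEvery(context.Background(), time.Minute, func(ctx context.Context) error {
        slog.Info("polling source") // fetch and publish data here
        return nil
    })
}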

3. Anatomy of a Go Data Collector: S3 Poller Example

Let's dissect the provided S3 poller example to understand its components and how it embodies the principles above.

3.1. Project Structure

A typical Go project for a data collector might look like this:


.
├── src/
│   ├── main.go          # Main application logic
│   ├── go.mod           # Go modules file
│   ├── go.sum           # Go modules checksums
│   ├── go.work          # Go workspace file (if part of a monorepo)
│   └── go.work.sum      # Go workspace checksums
├── .env                 # Local environment variables for development
├── Dockerfile           # Containerization instructions (often in infrastructure/docker)
├── docker-compose.yml   # Local development setup (often in the collector's root)
└── helm/                # Helm charts for Kubernetes deployment
    └── your-collector-name.yaml

3.2. Environment Variables and Configuration Management

By design, all configuration must be done via environment variables.
This simplifies the development cycle and deployment, allowing us to use .env files for local testing and Helm values.yaml files for deployment.
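
The env struct referenced throughout the code below is, roughly, a plain Go struct whose fields mirror the environment variables. The following sketch is illustrative only; the exact SDK type names and any struct tags may differ:

// Illustrative sketch: the concrete struct in the S3 poller may differ.
// The embedded dc.Env (assumed) carries the SDK-level settings
// (PROVIDER, MQ_*, LOG_LEVEL, telemetry options); the remaining fields
// are collector-specific and are filled from the process environment
// by ms.InitWithEnv(ctx, "", &env).
type Env struct {
    dc.Env

    CRON string

    AWS_REGION            string
    AWS_ACCESS_KEY_ID     string
    AWS_ACCESS_SECRET_KEY string
    AWS_S3_BUCKET_NAME    string
    AWS_S3_FILE_NAME      string

    RAW_BINARY bool
}

var env Env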

3.3. SDK Integration (opendatahub-go-sdk)

The opendatahub-go-sdk is central to simplifying collector development.

// ... imports ...

func main() {
    // 1. Initialize SDK components (messaging, logging, telemetry)
    ms.InitWithEnv(context.Background(), "", &env)
    slog.Info("Starting data collector...")

    // Ensure telemetry traces are flushed on application panic
    defer tel.FlushOnPanic()

    // ... AWS config and client setup ...

    // 2. Create a new data collector instance
    // dc.EmptyData indicates that the input channel doesn't carry specific data,
    // as the cron job just triggers a collection run.
    collector := dc.NewDc[dc.EmptyData](context.Background(), env.Env)

    // 3. Schedule data collection runs
    c := cron.New(cron.WithSeconds())
    c.AddFunc(env.CRON, func() {
        // When the cron job triggers, send an empty input signal to the collector's channel.
        // This tells the collector to execute its data fetching logic.
        collector.GetInputChannel() <- dc.NewInput[dc.EmptyData](context.Background(), nil)
    })

    slog.Info("Setup complete. Starting cron scheduler")
    go func() {
        c.Run() // Start the cron scheduler in a goroutine
    }()

    // 4. Start the collector's main processing loop
    // The provided function (anonymous func) is the core data collection logic.
    err = collector.Start(context.Background(), func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error) {
        // This function is executed each time an input is received on the collector's channel.
        // It's responsible for fetching raw data and returning it in the SDK's RawAny format.

        // ... S3 data fetching logic ...

        var raw any
        if env.RAW_BINARY {
            raw = body // Store raw bytes
        } else {
            raw = string(body) // Store as string
        }

        // Return the collected raw data in the SDK's standardized format
        return &rdb.RawAny{
            Provider:  env.PROVIDER, // Unique identifier for the data source
            Timestamp: time.Now(),   // Timestamp of data collection
            Rawdata:   raw,          // The actual raw data
        }, nil
    })
    // 5. Handle fatal errors from the collector's main loop
    ms.FailOnError(context.Background(), err, err.Error())
}

Key SDK Components:

  • ms.InitWithEnv(ctx, "", &env): This function is the SDK's entry point for initialization. It sets up:
    • Structured Logging (slog): Configures logging based on LOGLEVEL.
    • Messaging: Initializes the RabbitMQ client using MQ_URI, MQ_CLIENT, MQ_EXCHANGE. The SDK handles publishing raw data to the message queue.
    • Telemetry: Sets up OpenTelemetry for tracing and metrics, using TELEMETRY_TRACE_GRPC_ENDPOINT (seen in Helm config).
  • tel.FlushOnPanic(): Ensures that any buffered telemetry data (traces, metrics) is sent to the configured endpoint before the application crashes due to a panic. This is crucial for debugging production issues.
  • dc.NewDc[dc.EmptyData](ctx, env.Env): Creates a new data collector instance. The [dc.EmptyData] type parameter indicates that the input to the collector's processing function doesn't carry specific data; it's simply a trigger.
  • collector.GetInputChannel(): Returns a channel that you can send signals to. Each signal (in this case, dc.NewInput[dc.EmptyData](context.Background(), nil)) triggers the collector.Start function's callback. This decouples the scheduling mechanism from the core collection logic.
  • collector.Start(ctx, func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error)): This is the main processing loop of the collector.
    • It takes a context.Context and a callback function.
    • The callback function is where your actual data fetching logic resides.
    • It's expected to return an *rdb.RawAny object (the standardized raw data format) or an error.
    • The SDK then takes this RawAny object and publishes it to the configured message queue.
  • rdb.RawAny: This struct defines the standard format for raw data that the SDK expects. It includes Provider (a unique identifier for the data source), Timestamp (when the data was collected), and Rawdata (the actual fetched content).
  • ms.FailOnError(ctx, err, msg): A utility function from the SDK that logs a fatal error and exits the application if err is not nil. This is used for unrecoverable errors during setup or the main loop.

SDK Env Variables

When using the official SDK, a set of dedicated environment variables configures various aspects of the collector:

| Variable | Description | Allowed Values | Default |
| --- | --- | --- | --- |
| PROVIDER | String identifying the data provider | [path1]/[path2]/... | (No default set) |
| MQ_URI | Connection URI to connect to RabbitMQ | - | (No default set) |
| MQ_EXCHANGE | Exchange name where to push messages | - | ingress |
| MQ_CLIENT | Client name to identify the connection in RabbitMQ | - | (No default set) |
| LOG_LEVEL | Sets the logging severity level. | DEBUG, INFO, WARN, ERROR | INFO |
| SERVICE_NAME | The name of the service. | - | gotel |
| SERVICE_VERSION | The version of the service. | - | 0.0.1 |
| TELEMETRY_ENABLED | Enables or disables telemetry. | true, false | true |
| TELEMETRY_TRACE_ENABLED | Enables or disables trace telemetry. | true, false | true |
| TELEMETRY_TRACE_BATCH_SIZE | The maximum number of traces to be batched. | - | 10 |
| TELEMETRY_TRACE_BATCH_TIMEOUT_SEC | The timeout for a trace batch in seconds. | - | 5 |
| TELEMETRY_TRACE_GRPC_ENDPOINT | The gRPC endpoint for trace export. | - | localhost:4317 |
| TELEMETRY_TRACE_TLS_ENABLED | Enables or disables TLS for trace export. | true, false | false |
| TELEMETRY_TRACE_TLS_CERT | Path to the TLS certificate file for trace export. | - | "" |
| TELEMETRY_METRICS_ENABLED | Enables or disables metrics telemetry. | true, false | false |
| TELEMETRY_METRICS_TIMEOUT_SEC | The timeout for metrics export in seconds. | - | 30 |
| TELEMETRY_METRICS_INTERVAL_SEC | The interval between metrics exports in seconds. | - | 60 |
| TELEMETRY_METRICS_GRPC_ENDPOINT | The gRPC endpoint for metrics export. | - | localhost:4317 |
| TELEMETRY_METRICS_TLS_ENABLED | Enables or disables TLS for metrics export. | true, false | false |
| TELEMETRY_METRICS_TLS_CERT | Path to the TLS certificate file for metrics export. | - | "" |
| TELEMETRY_METRICS_EXPORT_MODE | The export mode for metrics. | otlp, prometheus (implied by context) | otlp |
| TELEMETRY_METRICS_EXPORT_PORT | The port for the metrics exporter. | - | 2112 |

3.4. Data Collection Logic (S3 Interaction)

The core logic of fetching data from S3 is encapsulated within the anonymous function passed to collector.Start.

info

Each collector implements its own logic; the code below is only an example.

// ...
// Create a custom AWS configuration
customConfig, err := config.LoadDefaultConfig(context.Background(),
    config.WithRegion(env.AWS_REGION), // Configure AWS region from environment
    config.WithCredentialsProvider(    // Provide static credentials from environment
        credentials.NewStaticCredentialsProvider(env.AWS_ACCESS_KEY_ID, env.AWS_ACCESS_SECRET_KEY, ""),
    ),
)
ms.FailOnError(context.Background(), err, "failed to create AWS config") // Fatal error if config fails

// Create an S3 client using the custom configuration
s3Client := s3.NewFromConfig(customConfig)

// ...
err = collector.Start(context.Background(), func(ctx context.Context, a dc.EmptyData) (*rdb.RawAny, error) {
    // Get the object from S3
    output, err := s3Client.GetObject(context.Background(), &s3.GetObjectInput{
        Bucket: aws.String(env.AWS_S3_BUCKET_NAME), // S3 bucket name from environment
        Key:    aws.String(env.AWS_S3_FILE_NAME),   // S3 file key (name) from environment
    })
    if err != nil {
        // Log the error with context (bucket, file) and return nil, err to the SDK
        slog.Error("error while getting s3 object:", "err", err, "bucket", env.AWS_S3_BUCKET_NAME, "file", env.AWS_S3_FILE_NAME)
        return nil, err
    }

    defer output.Body.Close()            // Ensure the S3 response body is closed
    body, err := io.ReadAll(output.Body) // Read the entire S3 object body
    if err != nil {
        slog.Error("error reading response body:", "err", err)
        return nil, err
    }

    var raw any // Declare a variable to hold the raw data
    if env.RAW_BINARY {
        raw = body // If RAW_BINARY is true, store the raw bytes
    } else {
        raw = string(body) // Otherwise, convert bytes to string
    }

    // Construct the standardized RawAny object
    return &rdb.RawAny{
        Provider:  env.PROVIDER, // Identifier for this data source
        Timestamp: time.Now(),   // Timestamp of when the data was collected
        Rawdata:   raw,          // The actual raw data (string or []byte)
    }, nil
})
ms.FailOnError(context.Background(), err, err.Error()) // Fatal error if collector.Start fails
}

Critical Points:

  • AWS SDK Initialization: The AWS SDK is initialized with the AWS_REGION, AWS_ACCESS_KEY_ID, and AWS_ACCESS_SECRET_KEY from environment variables. This is a standard pattern for configuring cloud service clients.
  • Error Handling: Crucially, errors from s3Client.GetObject and io.ReadAll are logged with slog.Error (providing context like bucket and file names) and then returned. The collector.Start function will then handle these errors, potentially preventing the message from being acknowledged and allowing for retries or dead-lettering by the messaging system.
  • rdb.RawAny Structure: The returned *rdb.RawAny object is the standardized output of any data collector using this SDK. It's the payload that will be sent to the message queue. The Provider field is particularly important as it uniquely identifies the source of this raw data, allowing transformers to filter or process it accordingly.

4. Containerization with Docker

Docker is the recommended way to package and deploy your data collector. The provided Dockerfile uses a multi-stage build to create efficient and secure images.

# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0

FROM golang:1.23.7-bookworm AS base

# Expose port 8080 (this collector doesn't run a web server, but exposing a port is a common convention)
EXPOSE 8080

FROM base AS build-env
WORKDIR /app

# Copy source code to the working directory
COPY src/ .

# Download Go module dependencies
RUN go mod download

# Build the Go application:
#   CGO_ENABLED=0: disables CGo, producing a statically linked binary
#   GOOS=linux:    compiles for the Linux operating system
#   -o main:       output executable named 'main'
RUN CGO_ENABLED=0 GOOS=linux go build -o main

# BUILD published image (production-ready, minimal image)
FROM alpine:3 AS build
WORKDIR /app
# Copy only the compiled binary from the build-env stage
COPY --from=build-env /app/main .
# Set the entrypoint for the container
ENTRYPOINT [ "./main" ]

# LOCAL DEVELOPMENT (for hot-reloading/easier debugging)
FROM base AS dev
WORKDIR /code
# Run the Go application directly from source for development
CMD ["go", "run", "."]

# TESTS
FROM base AS test
WORKDIR /code
# Run Go tests
CMD ["go", "test", "."]

Critical Points:

  • Multi-Stage Build: This is a best practice for Go applications.
    • base: Defines the base Go environment.
    • build-env: Used only for building the executable. It includes source code and dependencies.
    • build: The final, slim production image. It only copies the compiled binary from build-env, resulting in a much smaller image size (e.g., using alpine:3 instead of the full golang image). This reduces attack surface and download times.
    • dev and test: Separate stages for local development and running tests, allowing for different build/run environments without polluting the production image.
  • CGO_ENABLED=0 GOOS=linux: This is crucial for Go applications in Docker. It ensures the Go binary is statically linked and compiled for Linux, making it highly portable and suitable for minimal base images like Alpine.
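
To build a specific stage locally, select it with --target, for example (paths may vary per collector):

docker build --target dev -f infrastructure/docker/Dockerfile .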

4.1. Local Orchestration with Docker Compose

docker-compose.yml simplifies running and testing your collector and its dependencies (like RabbitMQ) locally.

# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0

x-app-common: &app-common
  build:
    dockerfile: infrastructure/docker/Dockerfile
    context: .
    target: dev
  env_file:
    - .env
  volumes:
    - ./src:/code
    - ./infrastructure:/code/infrastructure
    - pkg:/go/pkg/mod
  working_dir: /code

services:

  app-bdp:
    <<: *app-common
    profiles:
      - bdp
    networks:
      - ingestion

  app:
    <<: *app-common
    depends_on:
      rabbitmq:
        condition: service_healthy
    profiles:
      - dev

  rabbitmq:
    extends:
      file: ../lib/docker-compose/docker-compose.rabbitmq.yml
      service: rabbitmq
    attach: false
    profiles:
      - dev

volumes:
  pkg:

networks:
  ingestion:
    external: true

warning

When developing, you must be careful to start the collector together with the rabbitmq service:

docker compose up --profile dev
warning

When testing the complete pipeline (including the transformer), you need to start the full Open Data Hub Core, and you must be careful to start the collector without the rabbitmq service:

docker compose up --profile bdp

5. Deployment with Helm

Helm is a package manager for Kubernetes, used to define, install, and upgrade complex Kubernetes applications. The helm/meteorology-bz-forecast.test.yaml file is a values file that overrides default settings in a Helm chart for a specific deployment environment (e.g., a test environment).

image:
  repository: ghcr.io/noi-techpark/opendatahub-collectors/dc-s3-poller # Docker image repository
  pullPolicy: IfNotPresent # Only pull the image if it is not already present on the node
  tag: "0.0.1" # Specific image tag/version to deploy

env: # Environment variables passed directly to the container
  MQ_CLIENT: dc-meteorology-bz-forecast
  PROVIDER: s3-poller/meteorology-bz-forecast

  CRON: "0 0 0/1 * * *" # Production cron schedule (hourly)

  AWS_REGION: "eu-west-1"
  AWS_S3_FILE_NAME: "SMOS_MCPL-WX_EXP_SIAG.JSON"
  AWS_S3_BUCKET_NAME: dc-meteorology-province-forecast

  SERVICE_NAME: dc-meteorology-bz-forecast # Service name for observability/discovery
  TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317 # OpenTelemetry collector endpoint

envSecretRef: # Reference Kubernetes secrets for sensitive environment variables
  - name: MQ_URI # Name of the environment variable
    secret: rabbitmq-svcbind # Name of the Kubernetes secret
    key: uri # Key within the secret that holds the value

Critical Points:

  • image: Defines which Docker image to deploy.
    • repository: The path to your Docker image (e.g., in GitHub Container Registry).
    • pullPolicy: IfNotPresent is common for test/dev, Always for production to ensure the latest image is used.
    • tag: Crucial for versioning and reproducibility. Always use a specific tag, never latest in production.
  • env: Directly sets environment variables within the Kubernetes pod. These values typically override defaults defined in the main Helm chart. Notice how CRON is set to an hourly schedule here, different from the local development .env file.
  • envSecretRef: This is paramount for security in production environments. Instead of hardcoding sensitive values like MQ_URI directly in the Helm values (which would be stored in Git), envSecretRef tells Kubernetes to fetch the value for MQ_URI from a Kubernetes Secret named rabbitmq-svcbind under the key uri. This ensures sensitive data is not exposed in configuration files.
  • SERVICE_NAME and TELEMETRY_TRACE_GRPC_ENDPOINT: These are used for integrating with the Kubernetes cluster's observability stack. SERVICE_NAME helps identify the service in monitoring tools, and TELEMETRY_TRACE_GRPC_ENDPOINT points to the OpenTelemetry collector or agent within the cluster, enabling distributed tracing.

6. Local Development Workflow

To get your S3 poller data collector running locally:

  1. Clone Repositories:
    • opendatahub-collectors (contains your collector's source)
    • infrastructure-v2 (contains the shared docker-compose.rabbitmq.yml and overall infrastructure compose files)
  2. Navigate to Collector Directory: cd opendatahub-collectors/collectors/s3-poller (or your specific collector's path).
  3. Create .env: Copy the provided .env content into a file named .env in this directory. Fill in your AWS credentials (AWS_ACCESS_KEY_ID, AWS_ACCESS_SECRET_KEY).
  4. Start Infrastructure: From the infrastructure-v2 directory, run the base and timeseries compose files:
    cd ../../infrastructure-v2 # Adjust path if needed
    docker compose -f docker-compose.yml up -d # Run in detached mode
    docker compose -f docker-compose.timeseries.yml up -d
  5. Start Collector: From your collector's directory (opendatahub-collectors/collectors/s3-poller), run its docker-compose.yml:
    cd opendatahub-collectors/collectors/s3-poller # Adjust path if needed
    docker compose up --profile bdp --build # Build and run your collector

Testing the Data Flow

  • Check Collector Logs: Observe the logs of your app container. You should see messages like "Starting data collector...", "Polling S3 bucket...", and "Published dummy S3 data to message queue." (if using the dummy example).
  • RabbitMQ Management: Access http://localhost:15672 (guest/guest) in your browser. Navigate to "Queues" and check the queues bound to the ingress exchange (or whatever MQ_EXCHANGE you configured). You should see messages accumulating there.
  • MongoDB: Connect to mongodb://localhost:27017/?directConnection=true using a tool like MongoDB Compass. You can inspect the raw data stored by the SDK before it's picked up by a transformer.

This setup allows you to develop and test your data collector in an environment that closely mirrors the production Kubernetes cluster, ensuring smooth integration with the Open Data Hub ecosystem.