Contributing a New Datasource β
This guide walks you through implementing a new datasource adapter for Lynq. The operator uses an adapter pattern to support multiple data sources, making it easy to add support for new databases or data sources.
Overview β
Lynq uses a pluggable datasource architecture:
Key Benefits:
- β Clean interface - Only 2 methods to implement
- β Reference implementation - MySQL adapter as example
- β Type-safe - Strongly typed configuration
- β Testable - Easy to mock and test
- β Isolated - No changes to core controller logic
Prerequisites β
Before starting:
- Go 1.22+ installed
- Familiarity with:
- Go interfaces
- SQL or your target datasource
- Kubernetes operator patterns (helpful)
- Development environment setup (see Development Guide)
Step-by-Step Guide β
Step 1: Understand the Interface β
The Datasource interface is defined in internal/datasource/interface.go:
// Datasource defines the interface that all datasource adapters must implement
type Datasource interface {
// QueryNodes retrieves active node rows from the datasource
QueryNodes(ctx context.Context, config QueryConfig) ([]NodeRow, error)
// Close closes the datasource connection
io.Closer
}You need to implement:
QueryNodes()- Query node data from your datasourceClose()- Clean up resources (connections, files, etc.)
Step 2: Study the MySQL Reference Implementation β
The MySQL adapter (internal/datasource/mysql.go) is a complete reference implementation. Key sections:
// Adapter struct - holds connection
type MySQLAdapter struct {
db *sql.DB
}
// Constructor - establishes connection
func NewMySQLAdapter(config Config) (*MySQLAdapter, error) {
// 1. Build connection string
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", ...)
// 2. Open connection
db, err := sql.Open("mysql", dsn)
// 3. Configure connection pool
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(5 * time.Minute)
// 4. Test connection
if err := db.PingContext(ctx); err != nil {
return nil, err
}
return &MySQLAdapter{db: db}, nil
}
// QueryNodes - query and map data
func (a *MySQLAdapter) QueryNodes(ctx context.Context, config QueryConfig) ([]NodeRow, error) {
// 1. Build query
query := fmt.Sprintf("SELECT %s FROM %s", columns, table)
// 2. Execute query
rows, err := a.db.QueryContext(ctx, query)
// 3. Scan results
var nodes []NodeRow
for rows.Next() {
// Map columns to NodeRow
// Filter active nodes
}
return nodes, nil
}
// Close - cleanup
func (a *MySQLAdapter) Close() error {
if a.db != nil {
return a.db.Close()
}
return nil
}Step 3: Create Your Adapter File β
Create a new file in internal/datasource/ for your adapter:
# Example: PostgreSQL
touch internal/datasource/postgres.go
# Example: MongoDB
touch internal/datasource/mongodb.go
# Example: REST API
touch internal/datasource/rest.goStep 4: Implement the Adapter β
Use this template:
/*
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
...
*/
package datasource
import (
"context"
"fmt"
// Import your datasource driver
// _ "github.com/lib/pq" // PostgreSQL
)
// YourAdapter implements the Datasource interface for [YourDatasource]
type YourAdapter struct {
// Connection handle (e.g., *sql.DB, *mongo.Client, *http.Client)
conn interface{} // Replace with actual type
}
// NewYourAdapter creates a new [YourDatasource] datasource adapter
func NewYourAdapter(config Config) (*YourAdapter, error) {
// 1. Build connection string/config
// 2. Establish connection
// 3. Configure connection pool/settings
// 4. Test connection
// 5. Return adapter
return &YourAdapter{conn: conn}, nil
}
// QueryNodes queries active nodes from [YourDatasource]
func (a *YourAdapter) QueryNodes(ctx context.Context, config QueryConfig) ([]NodeRow, error) {
// 1. Build query/request
// Use config.Table for table/collection name
// Use config.ValueMappings for required columns
// Use config.ExtraMappings for extra columns
// 2. Execute query
// 3. Scan results into []NodeRow
var nodes []NodeRow
for /* iterate results */ {
row := NodeRow{
UID: "", // Required
HostOrURL: "", // Required
Activate: "", // Required
Extra: make(map[string]string), // Optional extra fields
}
// Map extra columns from config.ExtraMappings
for key, col := range config.ExtraMappings {
row.Extra[key] = "" // Get value from result
}
// Filter: only include active nodes with valid hostOrUrl
if isActive(row.Activate) && row.HostOrURL != "" {
nodes = append(nodes, row)
}
}
return nodes, nil
}
// Close closes the datasource connection
func (a *YourAdapter) Close() error {
// Cleanup resources
// Close connections
// Release handles
return nil
}
// Helper: check if node is active
func isActive(value string) bool {
switch value {
case "1", "true", "TRUE", "True", "yes", "YES", "Yes":
return true
default:
return false
}
}Important Details:
Required Fields
Every NodeRow must have:
UID- Unique node identifierHostOrURL- Node URL/hostnameActivate- Activation status (truthy/falsy)Extra- Map for additional fields
Filtering
Always filter out:
- Inactive nodes (
activateis false/0) - Nodes without
hostOrUrl
Step 5: Register Your Adapter β
Update the factory function in internal/datasource/interface.go:
// SourceType represents the type of datasource
type SourceType string
const (
SourceTypeMySQL SourceType = "mysql"
SourceTypePostgreSQL SourceType = "postgresql"
SourceTypeYours SourceType = "yourdatasource" // Add your type
)
// NewDatasource creates a new datasource adapter based on the source type
func NewDatasource(sourceType SourceType, config Config) (Datasource, error) {
switch sourceType {
case SourceTypeMySQL:
return NewMySQLAdapter(config)
case SourceTypePostgreSQL:
return NewPostgreSQLAdapter(config)
case SourceTypeYours: // Add your case
return NewYourAdapter(config)
default:
return nil, fmt.Errorf("unsupported datasource type: %s", sourceType)
}
}Step 6: Add API Support β
Update the CRD API types in api/v1/lynqhub_types.go:
// SourceType defines the type of external data source
type SourceType string
const (
SourceTypeMySQL SourceType = "mysql"
SourceTypePostgreSQL SourceType = "postgresql"
SourceTypeYours SourceType = "yourdatasource" // Add
)
// LynqHubSourceSpec defines the data source configuration
type LynqHubSourceSpec struct {
// Type of the data source
// +kubebuilder:validation:Enum=mysql;postgresql;yourdatasource
// +kubebuilder:validation:Required
Type SourceType `json:"type"`
// MySQL database configuration
// +optional
MySQL *MySQLSourceSpec `json:"mysql,omitempty"`
// PostgreSQL database configuration
// +optional
PostgreSQL *PostgreSQLSourceSpec `json:"postgresql,omitempty"`
// Your datasource configuration
// +optional
YourDatasource *YourDatasourceSpec `json:"yourdatasource,omitempty"` // Add
// ... rest of spec
}
// YourDatasourceSpec defines [YourDatasource] configuration
type YourDatasourceSpec struct {
// Host address
// +kubebuilder:validation:Required
Host string `json:"host"`
// Port number
// +kubebuilder:default=5432 // Your default port
// +optional
Port int32 `json:"port,omitempty"`
// Add your configuration fields
// Follow same pattern as MySQLSourceSpec
}Step 7: Update Controller Logic β
The controller (internal/controller/lynqhub_controller.go) may need updates for password extraction:
// buildDatasourceConfig builds datasource configuration from LynqHub spec
func (r *LynqHubReconciler) buildDatasourceConfig(registry *lynqv1.LynqHub, password string) (datasource.Config, string, error) {
switch registry.Spec.Source.Type {
case lynqv1.SourceTypeMySQL:
// ... existing MySQL logic
case lynqv1.SourceTypeYours: // Add your case
yours := registry.Spec.Source.YourDatasource
if yours == nil {
return datasource.Config{}, "", fmt.Errorf("YourDatasource configuration is nil")
}
config := datasource.Config{
Host: yours.Host,
Port: yours.Port,
Username: yours.Username,
Password: password,
Database: yours.Database,
}
return config, yours.Table, nil
default:
return datasource.Config{}, "", fmt.Errorf("unsupported source type: %s", registry.Spec.Source.Type)
}
}If your datasource uses password secrets, update queryDatabase():
// Get password from Secret (MySQL/PostgreSQL/YourDatasource)
password := ""
if registry.Spec.Source.MySQL != nil && registry.Spec.Source.MySQL.PasswordRef != nil {
// ... MySQL password logic
} else if registry.Spec.Source.YourDatasource != nil && registry.Spec.Source.YourDatasource.PasswordRef != nil {
// Add your password logic
secret := &corev1.Secret{}
if err := r.Get(ctx, types.NamespacedName{
Name: registry.Spec.Source.YourDatasource.PasswordRef.Name,
Namespace: registry.Namespace,
}, secret); err != nil {
return nil, fmt.Errorf("failed to get password secret: %w", err)
}
password = string(secret.Data[registry.Spec.Source.YourDatasource.PasswordRef.Key])
}Step 8: Write Tests β
Create comprehensive tests in internal/datasource/your_test.go:
package datasource
import (
"context"
"testing"
)
func TestYourAdapter_QueryNodes(t *testing.T) {
// Test setup
adapter, err := NewYourAdapter(Config{
Host: "localhost",
Port: 5432,
// ... test config
})
if err != nil {
t.Fatalf("Failed to create adapter: %v", err)
}
defer adapter.Close()
// Test query
nodes, err := adapter.QueryNodes(context.Background(), QueryConfig{
Table: "nodes",
ValueMappings: ValueMappings{
UID: "id",
HostOrURL: "url",
Activate: "active",
},
})
if err != nil {
t.Fatalf("QueryNodes failed: %v", err)
}
// Verify results
if len(nodes) == 0 {
t.Error("Expected nodes, got none")
}
}
func TestYourAdapter_Close(t *testing.T) {
adapter, _ := NewYourAdapter(Config{...})
err := adapter.Close()
if err != nil {
t.Errorf("Close failed: %v", err)
}
}Test Coverage Goals:
- β Connection establishment
- β Query execution
- β Result mapping
- β Error handling
- β Filtering logic
- β Resource cleanup
Step 9: Add Documentation β
Create documentation in docs/datasource-[yours].md:
# [YourDatasource] Datasource Configuration
This guide covers configuring Lynq with [YourDatasource].
## Prerequisites
- [YourDatasource] version X.X+
- Network connectivity from operator to datasource
- Read-only credentials
## Basic Configuration
\```yaml
apiVersion: operator.lynq.sh/v1
kind: LynqHub
metadata:
name: my-hub
spec:
source:
type: yourdatasource
yourdatasource:
host: your-host.example.com
port: 5432
username: node_reader
passwordRef:
name: yourdatasource-credentials
key: password
database: nodes_db
table: nodes
syncInterval: 1m
valueMappings:
uid: node_id
hostOrUrl: node_url
activate: is_active
\```
## Configuration Reference
[Document all configuration options...]Update main datasource docs (docs/datasource.md):
## Supported Datasources
| Datasource | Status | Version | Guide |
|------------|--------|---------|-------|
| MySQL | β
Stable | v1.0+ | [MySQL Guide](#mysql-connection) |
| PostgreSQL | β
Stable | v1.2+ | [PostgreSQL Guide](datasource-postgresql.md) |
| [Yours] | β
Stable | v1.X+ | [[Yours] Guide](datasource-yours.md) |Step 10: Update Release Documentation β
Update relevant documentation files:
README.md:
## Supported Datasources
- β
MySQL 5.7+ / 8.0+
- β
PostgreSQL 12+
- β
[YourDatasource] X.X+docs/roadmap.md:
## v1.X
### New Features
- β
**[YourDatasource] Support**
- Full [YourDatasource] integration
- Connection pooling
- SSL/TLS supportStep 11: Create Examples β
Add example manifests in config/samples/:
# Create example directory
mkdir -p config/samples/yourdatasource
# Add example files
touch config/samples/yourdatasource/hub.yaml
touch config/samples/yourdatasource/secret.yaml
touch config/samples/yourdatasource/template.yamlExample files:
config/samples/yourdatasource/secret.yaml:
apiVersion: v1
kind: Secret
metadata:
name: yourdatasource-credentials
namespace: default
type: Opaque
stringData:
password: "your-password-here"config/samples/yourdatasource/hub.yaml:
apiVersion: operator.lynq.sh/v1
kind: LynqHub
metadata:
name: yourdatasource-hub
namespace: default
spec:
source:
type: yourdatasource
yourdatasource:
host: your-host.example.com
port: 5432
username: tenant_reader
passwordRef:
name: yourdatasource-credentials
key: password
database: tenants_db
table: nodes
syncInterval: 1m
valueMappings:
uid: id
hostOrUrl: url
activate: activeStep 12: Test End-to-End β
Test your implementation thoroughly:
# 1. Build
make build
# 2. Run tests
make test
# 3. Run linter
make lint
# 4. Install CRDs
make install
# 5. Run locally
make run
# 6. Apply examples
kubectl apply -f config/samples/yourdatasource/secret.yaml
kubectl apply -f config/samples/yourdatasource/hub.yaml
# 7. Verify
kubectl get lynqhubs
kubectl describe lynqhub yourdatasource-hub
kubectl get lynqnodesTest Checklist:
- [ ] Connection succeeds
- [ ] Query returns expected rows
- [ ] Active filtering works
- [ ] Extra mappings work
- [ ] LynqNode CRs are created
- [ ] Status updates correctly
- [ ] Errors are handled gracefully
- [ ] Resource cleanup works
Pull Request Checklist β
Before submitting your PR:
Code β
- [ ] Adapter implements
Datasourceinterface - [ ] Factory function updated
- [ ] API types added with validation tags
- [ ] Controller logic updated
- [ ] Error handling comprehensive
- [ ] Code follows Go conventions
- [ ] Comments added for exported functions
Tests β
- [ ] Unit tests written (>80% coverage)
- [ ] Integration tests pass
- [ ] Manual testing completed
- [ ] Edge cases covered
- [ ] Error cases tested
Documentation β
- [ ] Datasource guide created
- [ ] Configuration reference complete
- [ ] Examples added
- [ ] README updated
- [ ] Roadmap updated
- [ ] CHANGELOG entry added
Quality β
- [ ]
make testpasses - [ ]
make lintpasses - [ ]
make buildsucceeds - [ ] No breaking changes
- [ ] Backwards compatible
Common Issues and Solutions β
Issue: Connection Fails β
Symptoms: Adapter returns connection error
Solutions:
- Check network connectivity
- Verify credentials
- Test connection outside operator
- Check firewall rules
- Review connection string format
Issue: No Nodes Returned β
Symptoms: Query succeeds but returns empty array
Causes:
- All rows have
activate=false - Missing
hostOrUrlvalues - Wrong table name
- Wrong column mappings
Debug:
// Add debug logging
logger.Info("Query results",
"rowCount", len(rows),
"activeCount", len(nodes),
"table", config.Table)Issue: Extra Mappings Not Working β
Symptoms: Extra fields not populating in LynqNode annotations
Solution: Ensure column mapping is stable:
// Build column index map first
colIndex := make(map[string]int)
for i, col := range extraColumns {
colIndex[col] = i
}
// Then map using stable indices
for key, col := range config.ExtraMappings {
idx := colIndex[col]
row.Extra[key] = values[idx]
}Getting Help β
If you need assistance:
- Check Examples: Review MySQL adapter implementation
- Ask Questions: Open a GitHub Discussion
- Report Issues: File a bug report
- Contributing: See Contributing Guidelines
Recognition β
Your contribution will be:
- β Listed in release notes
- β Added to CONTRIBUTORS file
- β Mentioned in README
- β Included in documentation
- β Appreciated by the community! π
Example: PostgreSQL Adapter β
Here's a complete PostgreSQL adapter example for reference:
package datasource
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/lib/pq" // PostgreSQL driver
)
type PostgreSQLAdapter struct {
db *sql.DB
}
func NewPostgreSQLAdapter(config Config) (*PostgreSQLAdapter, error) {
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
config.Host, config.Port, config.Username, config.Password, config.Database)
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open PostgreSQL connection: %w", err)
}
// Configure pool
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(5 * time.Minute)
// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
_ = db.Close()
return nil, fmt.Errorf("failed to ping PostgreSQL: %w", err)
}
return &PostgreSQLAdapter{db: db}, nil
}
func (a *PostgreSQLAdapter) QueryNodes(ctx context.Context, config QueryConfig) ([]NodeRow, error) {
// Build column list
columns := []string{
config.ValueMappings.UID,
config.ValueMappings.HostOrURL,
config.ValueMappings.Activate,
}
extraColumns := make([]string, 0, len(config.ExtraMappings))
for _, col := range config.ExtraMappings {
columns = append(columns, col)
extraColumns = append(extraColumns, col)
}
// Build query (PostgreSQL uses $1, $2 for parameters if needed)
query := fmt.Sprintf("SELECT %s FROM %s", joinColumnsPostgres(columns), config.Table)
// Execute
rows, err := a.db.QueryContext(ctx, query)
if err != nil {
return nil, fmt.Errorf("failed to query nodes: %w", err)
}
defer rows.Close()
// Scan results
var nodes []NodeRow
for rows.Next() {
row := NodeRow{Extra: make(map[string]string)}
scanDest := []interface{}{&row.UID, &row.HostOrURL, &row.Activate}
extraValues := make([]sql.NullString, len(extraColumns))
for i := range extraValues {
scanDest = append(scanDest, &extraValues[i])
}
if err := rows.Scan(scanDest...); err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
// Map extra values
colIndex := make(map[string]int)
for i, col := range extraColumns {
colIndex[col] = i
}
for key, col := range config.ExtraMappings {
if idx, ok := colIndex[col]; ok {
if extraValues[idx].Valid {
row.Extra[key] = extraValues[idx].String
}
}
}
// Filter
if isActive(row.Activate) && row.HostOrURL != "" {
nodes = append(nodes, row)
}
}
return nodes, rows.Err()
}
func (a *PostgreSQLAdapter) Close() error {
if a.db != nil {
return a.db.Close()
}
return nil
}
func joinColumnsPostgres(columns []string) string {
result := ""
for i, col := range columns {
if i > 0 {
result += ", "
}
result += `"` + col + `"` // PostgreSQL uses double quotes
}
return result
}Next Steps β
After your PR is merged:
- Announce: Share your contribution in discussions
- Blog: Consider writing a blog post about your datasource
- Maintain: Help maintain and improve your adapter
- Support: Answer questions from users
- Evolve: Propose enhancements
Thank you for contributing to Lynq! Your datasource adapter will help the community build database-driven applications with their preferred data sources. π
