# Contributing a New Datasource

This guide walks you through implementing a new datasource adapter for Lynq. The operator uses an adapter pattern to support multiple data sources, so adding support for a new database or other backend requires no changes to the core controller logic.

## Overview

Lynq uses a pluggable datasource architecture.

**Key Benefits:**

- ✅ Clean interface - Only 2 methods to implement
- ✅ Reference implementation - MySQL adapter as example
- ✅ Type-safe - Strongly typed configuration
- ✅ Testable - Easy to mock and test
- ✅ Isolated - No changes to core controller logic

## Prerequisites

Before starting:

- Go 1.22+ installed
- Familiarity with:
  - Go interfaces
  - SQL or your target datasource
  - Kubernetes operator patterns (helpful)
- Development environment setup (see the Development Guide)

## Step-by-Step Guide

### Step 1: Understand the Interface

The `Datasource` interface is defined in `internal/datasource/interface.go`:

```go
// Datasource defines the interface that all datasource adapters must implement
type Datasource interface {
    // QueryNodes retrieves active node rows from the datasource
    QueryNodes(ctx context.Context, config QueryConfig) ([]NodeRow, error)

    // Close closes the datasource connection
    io.Closer
}
```

You need to implement:

- `QueryNodes()` - Query node data from your datasource
- `Close()` - Clean up resources (connections, files, etc.)
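
For reference, the supporting types appear throughout this guide with the following shape. This is a sketch inferred from the examples below; the authoritative definitions live in `internal/datasource/interface.go`:

```go
// Sketch of the supporting types as used in this guide (not the canonical source).
type ValueMappings struct {
    UID       string // column holding the unique node identifier
    HostOrURL string // column holding the node URL/hostname
    Activate  string // column holding the activation flag
}

type QueryConfig struct {
    Table         string            // table/collection to query
    ValueMappings ValueMappings     // required column mappings
    ExtraMappings map[string]string // optional extra column mappings
}

type NodeRow struct {
    UID       string
    HostOrURL string
    Activate  string
    Extra     map[string]string
}
```
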
### Step 2: Study the MySQL Reference Implementation

The MySQL adapter (`internal/datasource/mysql.go`) is a complete reference implementation. Key sections:

```go
// Adapter struct - holds connection
type MySQLAdapter struct {
    db *sql.DB
}

// Constructor - establishes connection
func NewMySQLAdapter(config Config) (*MySQLAdapter, error) {
    // 1. Build connection string
    dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", ...)

    // 2. Open connection
    db, err := sql.Open("mysql", dsn)
    if err != nil {
        return nil, err
    }

    // 3. Configure connection pool
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(5)
    db.SetConnMaxLifetime(5 * time.Minute)

    // 4. Test connection (with a bounded timeout)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := db.PingContext(ctx); err != nil {
        return nil, err
    }

    return &MySQLAdapter{db: db}, nil
}

// QueryNodes - query and map data
func (a *MySQLAdapter) QueryNodes(ctx context.Context, config QueryConfig) ([]NodeRow, error) {
    // 1. Build query
    query := fmt.Sprintf("SELECT %s FROM %s", columns, table)

    // 2. Execute query
    rows, err := a.db.QueryContext(ctx, query)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    // 3. Scan results
    var nodes []NodeRow
    for rows.Next() {
        // Map columns to NodeRow
        // Filter active nodes
    }
    return nodes, nil
}

// Close - cleanup
func (a *MySQLAdapter) Close() error {
    if a.db != nil {
        return a.db.Close()
    }
    return nil
}
```

### Step 3: Create Your Adapter File

Create a new file in `internal/datasource/` for your adapter:

```bash
# Example: PostgreSQL
touch internal/datasource/postgres.go

# Example: MongoDB
touch internal/datasource/mongodb.go

# Example: REST API
touch internal/datasource/rest.go
```

### Step 4: Implement the Adapter

Use this template:

```go
/*
Copyright 2025.

Licensed under the Apache License, Version 2.0 (the "License");
...
*/

package datasource

import (
    "context"
    "fmt"

    // Import your datasource driver
    // _ "github.com/lib/pq" // PostgreSQL
)

// YourAdapter implements the Datasource interface for [YourDatasource]
type YourAdapter struct {
    // Connection handle (e.g., *sql.DB, *mongo.Client, *http.Client)
    conn interface{} // Replace with actual type
}

// NewYourAdapter creates a new [YourDatasource] datasource adapter
func NewYourAdapter(config Config) (*YourAdapter, error) {
    // 1. Build connection string/config
    // 2. Establish connection
    // 3. Configure connection pool/settings
    // 4. Test connection
    // 5. Return adapter
    return &YourAdapter{conn: conn}, nil
}

// QueryNodes queries active nodes from [YourDatasource]
func (a *YourAdapter) QueryNodes(ctx context.Context, config QueryConfig) ([]NodeRow, error) {
    // 1. Build query/request
    //    Use config.Table for table/collection name
    //    Use config.ValueMappings for required columns
    //    Use config.ExtraMappings for extra columns

    // 2. Execute query

    // 3. Scan results into []NodeRow
    var nodes []NodeRow
    for /* iterate results */ {
        row := NodeRow{
            UID:       "", // Required
            HostOrURL: "", // Required
            Activate:  "", // Required
            Extra:     make(map[string]string), // Optional extra fields
        }

        // Map extra columns from config.ExtraMappings
        for key, col := range config.ExtraMappings {
            row.Extra[key] = "" // Get value from result
        }

        // Filter: only include active nodes with valid hostOrUrl
        if isActive(row.Activate) && row.HostOrURL != "" {
            nodes = append(nodes, row)
        }
    }
    return nodes, nil
}

// Close closes the datasource connection
func (a *YourAdapter) Close() error {
    // Cleanup resources
    // Close connections
    // Release handles
    return nil
}

// Helper: check if node is active
func isActive(value string) bool {
    switch value {
    case "1", "true", "TRUE", "True", "yes", "YES", "Yes":
        return true
    default:
        return false
    }
}
```

**Important Details:**

**Required Fields**

Every `NodeRow` must have:

- `UID` - Unique node identifier
- `HostOrURL` - Node URL/hostname
- `Activate` - Activation status (truthy/falsy)
- `Extra` - Map for additional fields

**Filtering**

Always filter out:

- Inactive nodes (`activate` is false/0)
- Nodes without `hostOrUrl`
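
As a concrete (and purely illustrative) example, a source row with id `42`, URL `https://node-42.example.com`, and an active flag of `1` would map to:

```go
// Hypothetical mapped row; the column values and the "region" extra are illustrative.
row := NodeRow{
    UID:       "42",
    HostOrURL: "https://node-42.example.com",
    Activate:  "1",
    Extra: map[string]string{
        "region": "eu-west-1",
    },
}
// isActive("1") is true and HostOrURL is non-empty, so this row is kept.
```
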
### Step 5: Register Your Adapter

Update the factory function in `internal/datasource/interface.go`:

```go
// SourceType represents the type of datasource
type SourceType string

const (
    SourceTypeMySQL      SourceType = "mysql"
    SourceTypePostgreSQL SourceType = "postgresql"
    SourceTypeYours      SourceType = "yourdatasource" // Add your type
)

// NewDatasource creates a new datasource adapter based on the source type
func NewDatasource(sourceType SourceType, config Config) (Datasource, error) {
    switch sourceType {
    case SourceTypeMySQL:
        return NewMySQLAdapter(config)
    case SourceTypePostgreSQL:
        return NewPostgreSQLAdapter(config)
    case SourceTypeYours: // Add your case
        return NewYourAdapter(config)
    default:
        return nil, fmt.Errorf("unsupported datasource type: %s", sourceType)
    }
}
```
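
Once registered, your adapter is constructed through this factory rather than instantiated directly. A minimal usage sketch with placeholder `QueryConfig` values:

```go
// Sketch: cfg is the datasource.Config built from the LynqHub spec,
// and ctx comes from the surrounding reconcile loop.
ds, err := datasource.NewDatasource(datasource.SourceTypeYours, cfg)
if err != nil {
    return err
}
defer ds.Close() // release connections even if the sync fails

nodes, err := ds.QueryNodes(ctx, datasource.QueryConfig{
    Table: "nodes",
    ValueMappings: datasource.ValueMappings{
        UID:       "id",
        HostOrURL: "url",
        Activate:  "active",
    },
})
if err != nil {
    return err
}
// hand nodes to the reconciliation logic
```
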
### Step 6: Add API Support

Update the CRD API types in `api/v1/lynqhub_types.go`:

```go
// SourceType defines the type of external data source
type SourceType string

const (
    SourceTypeMySQL      SourceType = "mysql"
    SourceTypePostgreSQL SourceType = "postgresql"
    SourceTypeYours      SourceType = "yourdatasource" // Add
)

// LynqHubSourceSpec defines the data source configuration
type LynqHubSourceSpec struct {
    // Type of the data source
    // +kubebuilder:validation:Enum=mysql;postgresql;yourdatasource
    // +kubebuilder:validation:Required
    Type SourceType `json:"type"`

    // MySQL database configuration
    // +optional
    MySQL *MySQLSourceSpec `json:"mysql,omitempty"`

    // PostgreSQL database configuration
    // +optional
    PostgreSQL *PostgreSQLSourceSpec `json:"postgresql,omitempty"`

    // Your datasource configuration
    // +optional
    YourDatasource *YourDatasourceSpec `json:"yourdatasource,omitempty"` // Add

    // ... rest of spec
}

// YourDatasourceSpec defines [YourDatasource] configuration
type YourDatasourceSpec struct {
    // Host address
    // +kubebuilder:validation:Required
    Host string `json:"host"`

    // Port number (set the default to your datasource's standard port)
    // +kubebuilder:default=5432
    // +optional
    Port int32 `json:"port,omitempty"`

    // Add your configuration fields
    // Follow same pattern as MySQLSourceSpec
}
```
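
The later steps and the sample manifests assume a few more fields on `YourDatasourceSpec` (`username`, `passwordRef`, `database`, `table`). A hedged sketch of those fields follows; `SecretKeyRef` is a placeholder type name, so reuse whatever selector type `MySQLSourceSpec` uses for its `passwordRef`:

```go
// Additional YourDatasourceSpec fields used by Step 7 and the samples (sketch only).

// Username for authentication
// +kubebuilder:validation:Required
Username string `json:"username"`

// PasswordRef references the Secret key that holds the password.
// SecretKeyRef is a placeholder; match the type used by MySQLSourceSpec.
// +optional
PasswordRef *SecretKeyRef `json:"passwordRef,omitempty"`

// Database name
// +kubebuilder:validation:Required
Database string `json:"database"`

// Table to query for nodes
// +kubebuilder:validation:Required
Table string `json:"table"`
```

If the project follows the standard Kubebuilder layout, remember to regenerate the deepcopy code and CRD manifests after editing API types.
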
### Step 7: Update Controller Logic

The controller (`internal/controller/lynqhub_controller.go`) may need updates for password extraction:

```go
// buildDatasourceConfig builds datasource configuration from LynqHub spec
func (r *LynqHubReconciler) buildDatasourceConfig(registry *lynqv1.LynqHub, password string) (datasource.Config, string, error) {
    switch registry.Spec.Source.Type {
    case lynqv1.SourceTypeMySQL:
        // ... existing MySQL logic
    case lynqv1.SourceTypeYours: // Add your case
        yours := registry.Spec.Source.YourDatasource
        if yours == nil {
            return datasource.Config{}, "", fmt.Errorf("YourDatasource configuration is nil")
        }
        config := datasource.Config{
            Host:     yours.Host,
            Port:     yours.Port,
            Username: yours.Username,
            Password: password,
            Database: yours.Database,
        }
        return config, yours.Table, nil
    default:
        return datasource.Config{}, "", fmt.Errorf("unsupported source type: %s", registry.Spec.Source.Type)
    }
}
```

If your datasource uses password secrets, update `queryDatabase()`:

```go
// Get password from Secret (MySQL/PostgreSQL/YourDatasource)
password := ""
if registry.Spec.Source.MySQL != nil && registry.Spec.Source.MySQL.PasswordRef != nil {
    // ... MySQL password logic
} else if registry.Spec.Source.YourDatasource != nil && registry.Spec.Source.YourDatasource.PasswordRef != nil {
    // Add your password logic
    secret := &corev1.Secret{}
    if err := r.Get(ctx, types.NamespacedName{
        Name:      registry.Spec.Source.YourDatasource.PasswordRef.Name,
        Namespace: registry.Namespace,
    }, secret); err != nil {
        return nil, fmt.Errorf("failed to get password secret: %w", err)
    }
    password = string(secret.Data[registry.Spec.Source.YourDatasource.PasswordRef.Key])
}
```

### Step 8: Write Tests

Create comprehensive tests in `internal/datasource/your_test.go`:

```go
package datasource

import (
    "context"
    "testing"
)

func TestYourAdapter_QueryNodes(t *testing.T) {
    // Test setup
    adapter, err := NewYourAdapter(Config{
        Host: "localhost",
        Port: 5432,
        // ... test config
    })
    if err != nil {
        t.Fatalf("Failed to create adapter: %v", err)
    }
    defer adapter.Close()

    // Test query
    nodes, err := adapter.QueryNodes(context.Background(), QueryConfig{
        Table: "nodes",
        ValueMappings: ValueMappings{
            UID:       "id",
            HostOrURL: "url",
            Activate:  "active",
        },
    })
    if err != nil {
        t.Fatalf("QueryNodes failed: %v", err)
    }

    // Verify results
    if len(nodes) == 0 {
        t.Error("Expected nodes, got none")
    }
}

func TestYourAdapter_Close(t *testing.T) {
    adapter, _ := NewYourAdapter(Config{...})
    err := adapter.Close()
    if err != nil {
        t.Errorf("Close failed: %v", err)
    }
}
```

**Test Coverage Goals:**

- ✅ Connection establishment
- ✅ Query execution
- ✅ Result mapping
- ✅ Error handling
- ✅ Filtering logic (see the table-driven sketch after this list)
- ✅ Resource cleanup
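
For the filtering item above, a table-driven test of the `isActive` helper is a quick win. A sketch to drop into `your_test.go`; extend the cases to match the values your datasource actually stores:

```go
func TestIsActive(t *testing.T) {
    cases := []struct {
        value string
        want  bool
    }{
        {"1", true},
        {"true", true},
        {"YES", true},
        {"0", false},
        {"false", false},
        {"", false},
    }
    for _, c := range cases {
        if got := isActive(c.value); got != c.want {
            t.Errorf("isActive(%q) = %v, want %v", c.value, got, c.want)
        }
    }
}
```
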
### Step 9: Add Documentation

Create documentation in `docs/datasource-[yours].md`:

````markdown
# [YourDatasource] Datasource Configuration

This guide covers configuring Lynq with [YourDatasource].

## Prerequisites

- [YourDatasource] version X.X+
- Network connectivity from operator to datasource
- Read-only credentials

## Basic Configuration

```yaml
apiVersion: operator.lynq.sh/v1
kind: LynqHub
metadata:
  name: my-hub
spec:
  source:
    type: yourdatasource
    yourdatasource:
      host: your-host.example.com
      port: 5432
      username: node_reader
      passwordRef:
        name: yourdatasource-credentials
        key: password
      database: nodes_db
      table: nodes
    syncInterval: 1m
    valueMappings:
      uid: node_id
      hostOrUrl: node_url
      activate: is_active
```

## Configuration Reference

[Document all configuration options...]
````

Update the main datasource docs (`docs/datasource.md`):

```markdown
## Supported Datasources

| Datasource | Status | Version | Guide |
|------------|--------|---------|-------|
| MySQL | ✅ Stable | v1.0+ | [MySQL Guide](#mysql-connection) |
| PostgreSQL | ✅ Stable | v1.2+ | [PostgreSQL Guide](datasource-postgresql.md) |
| [Yours] | ✅ Stable | v1.X+ | [[Yours] Guide](datasource-yours.md) |
```

### Step 10: Update Release Documentation

Update relevant documentation files:
`README.md`:

```markdown
## Supported Datasources

- ✅ MySQL 5.7+ / 8.0+
- ✅ PostgreSQL 12+
- ✅ [YourDatasource] X.X+
```

`docs/roadmap.md`:

```markdown
## v1.X

### New Features

- ✅ **[YourDatasource] Support**
  - Full [YourDatasource] integration
  - Connection pooling
  - SSL/TLS support
```

### Step 11: Create Examples

Add example manifests in config/samples/:
# Create example directory
mkdir -p config/samples/yourdatasource
# Add example files
touch config/samples/yourdatasource/hub.yaml
touch config/samples/yourdatasource/secret.yaml
touch config/samples/yourdatasource/template.yamlExample files:
`config/samples/yourdatasource/secret.yaml`:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: yourdatasource-credentials
  namespace: default
type: Opaque
stringData:
  password: "your-password-here"
```

`config/samples/yourdatasource/hub.yaml`:

```yaml
apiVersion: operator.lynq.sh/v1
kind: LynqHub
metadata:
  name: yourdatasource-hub
  namespace: default
spec:
  source:
    type: yourdatasource
    yourdatasource:
      host: your-host.example.com
      port: 5432
      username: tenant_reader
      passwordRef:
        name: yourdatasource-credentials
        key: password
      database: tenants_db
      table: nodes
    syncInterval: 1m
    valueMappings:
      uid: id
      hostOrUrl: url
      activate: active
```

### Step 12: Test End-to-End

Test your implementation thoroughly:

```bash
# 1. Build
make build

# 2. Run tests
make test

# 3. Run linter
make lint

# 4. Install CRDs
make install

# 5. Run locally
make run

# 6. Apply examples
kubectl apply -f config/samples/yourdatasource/secret.yaml
kubectl apply -f config/samples/yourdatasource/hub.yaml

# 7. Verify
kubectl get lynqhubs
kubectl describe lynqhub yourdatasource-hub
kubectl get lynqnodes
```

**Test Checklist:**

- [ ] Connection succeeds
- [ ] Query returns expected rows
- [ ] Active filtering works
- [ ] Extra mappings work
- [ ] LynqNode CRs are created
- [ ] Status updates correctly
- [ ] Errors are handled gracefully
- [ ] Resource cleanup works

## Pull Request Checklist

Before submitting your PR:

### Code

- [ ] Adapter implements the `Datasource` interface
- [ ] Factory function updated
- [ ] API types added with validation tags
- [ ] Controller logic updated
- [ ] Error handling comprehensive
- [ ] Code follows Go conventions
- [ ] Comments added for exported functions

### Tests

- [ ] Unit tests written (>80% coverage)
- [ ] Integration tests pass
- [ ] Manual testing completed
- [ ] Edge cases covered
- [ ] Error cases tested

### Documentation

- [ ] Datasource guide created
- [ ] Configuration reference complete
- [ ] Examples added
- [ ] README updated
- [ ] Roadmap updated
- [ ] CHANGELOG entry added

### Quality

- [ ] `make test` passes
- [ ] `make lint` passes
- [ ] `make build` succeeds
- [ ] No breaking changes
- [ ] Backwards compatible

## Common Issues and Solutions

### Issue: Connection Fails

**Symptoms:** Adapter returns a connection error

**Solutions:**

- Check network connectivity
- Verify credentials
- Test the connection outside the operator (see the sketch after this list)
- Check firewall rules
- Review the connection string format
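
To test the connection outside the operator, a small standalone program is often enough. A sketch assuming a `database/sql`-backed datasource; swap the driver, DSN, and credentials for your target:

```go
package main

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    _ "github.com/lib/pq" // placeholder driver; use your datasource's driver
)

func main() {
    // The DSN below is a placeholder; fill in your real host and credentials.
    db, err := sql.Open("postgres",
        "host=your-host.example.com port=5432 user=node_reader password=... dbname=nodes_db sslmode=disable")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // Fail fast if the datasource is unreachable.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := db.PingContext(ctx); err != nil {
        panic(err)
    }
    fmt.Println("connection OK")
}
```
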
### Issue: No Nodes Returned

**Symptoms:** Query succeeds but returns an empty array

**Causes:**

- All rows have `activate` = false
- Missing `hostOrUrl` values
- Wrong table name
- Wrong column mappings

**Debug:**

```go
// Add debug logging
logger.Info("Query results",
    "rowCount", len(rows),
    "activeCount", len(nodes),
    "table", config.Table)
```

### Issue: Extra Mappings Not Working

**Symptoms:** Extra fields are not populated in LynqNode annotations

**Solution:** Ensure the column mapping is stable:

```go
// Build column index map first
colIndex := make(map[string]int)
for i, col := range extraColumns {
    colIndex[col] = i
}

// Then map using stable indices
for key, col := range config.ExtraMappings {
    idx := colIndex[col]
    row.Extra[key] = values[idx]
}
```

## Getting Help

If you need assistance:
- Check Examples: Review MySQL adapter implementation
- Ask Questions: Open a GitHub Discussion
- Report Issues: File a bug report
- Contributing: See Contributing Guidelines

## Recognition

Your contribution will be:
- ✅ Listed in release notes
- ✅ Added to CONTRIBUTORS file
- ✅ Mentioned in README
- ✅ Included in documentation
- ✅ Appreciated by the community! 🎉

## Example: PostgreSQL Adapter

Here's a complete PostgreSQL adapter example for reference:

```go
package datasource

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    _ "github.com/lib/pq" // PostgreSQL driver
)

type PostgreSQLAdapter struct {
    db *sql.DB
}

func NewPostgreSQLAdapter(config Config) (*PostgreSQLAdapter, error) {
    dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
        config.Host, config.Port, config.Username, config.Password, config.Database)

    db, err := sql.Open("postgres", dsn)
    if err != nil {
        return nil, fmt.Errorf("failed to open PostgreSQL connection: %w", err)
    }

    // Configure pool
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(5)
    db.SetConnMaxLifetime(5 * time.Minute)

    // Test connection
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := db.PingContext(ctx); err != nil {
        _ = db.Close()
        return nil, fmt.Errorf("failed to ping PostgreSQL: %w", err)
    }

    return &PostgreSQLAdapter{db: db}, nil
}

func (a *PostgreSQLAdapter) QueryNodes(ctx context.Context, config QueryConfig) ([]NodeRow, error) {
    // Build column list
    columns := []string{
        config.ValueMappings.UID,
        config.ValueMappings.HostOrURL,
        config.ValueMappings.Activate,
    }
    extraColumns := make([]string, 0, len(config.ExtraMappings))
    for _, col := range config.ExtraMappings {
        columns = append(columns, col)
        extraColumns = append(extraColumns, col)
    }

    // Build query (PostgreSQL uses $1, $2 for parameters if needed)
    query := fmt.Sprintf("SELECT %s FROM %s", joinColumnsPostgres(columns), config.Table)

    // Execute
    rows, err := a.db.QueryContext(ctx, query)
    if err != nil {
        return nil, fmt.Errorf("failed to query nodes: %w", err)
    }
    defer rows.Close()

    // Scan results
    var nodes []NodeRow
    for rows.Next() {
        row := NodeRow{Extra: make(map[string]string)}
        scanDest := []interface{}{&row.UID, &row.HostOrURL, &row.Activate}
        extraValues := make([]sql.NullString, len(extraColumns))
        for i := range extraValues {
            scanDest = append(scanDest, &extraValues[i])
        }

        if err := rows.Scan(scanDest...); err != nil {
            return nil, fmt.Errorf("failed to scan row: %w", err)
        }

        // Map extra values
        colIndex := make(map[string]int)
        for i, col := range extraColumns {
            colIndex[col] = i
        }
        for key, col := range config.ExtraMappings {
            if idx, ok := colIndex[col]; ok {
                if extraValues[idx].Valid {
                    row.Extra[key] = extraValues[idx].String
                }
            }
        }

        // Filter
        if isActive(row.Activate) && row.HostOrURL != "" {
            nodes = append(nodes, row)
        }
    }

    return nodes, rows.Err()
}

func (a *PostgreSQLAdapter) Close() error {
    if a.db != nil {
        return a.db.Close()
    }
    return nil
}

func joinColumnsPostgres(columns []string) string {
    result := ""
    for i, col := range columns {
        if i > 0 {
            result += ", "
        }
        result += `"` + col + `"` // PostgreSQL uses double quotes
    }
    return result
}
```

## Next Steps

After your PR is merged:
- Announce: Share your contribution in discussions
- Blog: Consider writing a blog post about your datasource
- Maintain: Help maintain and improve your adapter
- Support: Answer questions from users
- Evolve: Propose enhancements
Thank you for contributing to Lynq! Your datasource adapter will help the community build database-driven applications with their preferred data sources. 🚀
