# Multi-Database Workflow
This tutorial demonstrates how to work with multiple databases in a single KDeps workflow, using named connections, connection pooling, and cross-database operations.
## Prerequisites
- KDeps installed (see Installation)
- PostgreSQL and/or MySQL databases set up
- Basic understanding of SQL
## Overview
KDeps supports multiple database connections in a single workflow. You can:
- Define named connections for different databases
- Use connection pooling for performance
- Query multiple databases in the same workflow
- Perform cross-database operations
## Step 1: Configure Multiple Connections
Create `workflow.yaml` with multiple database connections:

```yaml
apiVersion: kdeps.io/v1
kind: Workflow
metadata:
  name: multi-database
  description: Workflow with multiple database connections
  version: "1.0.0"
  targetActionId: results
settings:
  apiServerMode: true
  apiServer:
    hostIp: "127.0.0.1"
    portNum: 3000
    routes:
      - path: /api/v1/data
        methods: [GET, POST]
  agentSettings:
    timezone: Etc/UTC
    pythonVersion: "3.12"
  sqlConnections:
    # PostgreSQL connection for analytics
    analytics:
      connection: "postgres://user:pass@localhost:5432/analytics"
      pool:
        maxConnections: 10
        minConnections: 2
    # MySQL connection for inventory
    inventory:
      connection: "mysql://user:pass@localhost:3306/inventory"
      pool:
        maxConnections: 5
        minConnections: 1
    # SQLite for local cache
    cache:
      connection: "sqlite://./cache.db"
```

## Step 2: Using Named Connections
Reference connections by name in SQL resources:
```yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: analyticsQuery
  name: Analytics Query
run:
  sql:
    connectionName: analytics  # Use named connection
    query: |
      SELECT
        date,
        COUNT(*) as total_users,
        COUNT(DISTINCT email) as unique_emails,
        AVG(age) as avg_age
      FROM users
      WHERE created_at >= NOW() - INTERVAL '7 days'
      GROUP BY date
      ORDER BY date DESC
    format: csv
```

## Step 3: Querying Multiple Databases
Query different databases in sequence:
```yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: userData
  name: User Data
  requires:
    - analyticsQuery
    - inventoryQuery
run:
  apiResponse:
    success: true
    response:
      analytics: get('analyticsQuery')
      inventory: get('inventoryQuery')
```

With separate resources:
```yaml
# resources/analytics.yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: analyticsQuery
  name: Analytics Query
run:
  sql:
    connectionName: analytics
    query: "SELECT * FROM user_stats WHERE date = {{ get('date') }}"
---
# resources/inventory.yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: inventoryQuery
  name: Inventory Query
run:
  sql:
    connectionName: inventory
    query: "SELECT * FROM products WHERE status = 'active'"
```

## Step 4: Cross-Database Operations
Combine data from multiple databases:
```yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: combinedData
  name: Combined Data
  requires:
    - analyticsQuery
    - inventoryQuery
run:
  python:
    script: |
      import json

      # Get data from both databases
      analytics = get('analyticsQuery')
      inventory = get('inventoryQuery')

      # Combine and process
      result = {
          'user_count': len(analytics),
          'product_count': len(inventory),
          'analytics': analytics,
          'inventory': inventory
      }
      return result
```

## Step 5: Batch Operations Across Databases
Perform batch updates on multiple databases:
```yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: batchUpdate
  name: Batch Update
run:
  sql:
    connectionName: analytics
    query: |
      UPDATE users
      SET status = $1, updated_at = NOW()
      WHERE id = $2
    paramsBatch:
      - ["active", 123]
      - ["inactive", 456]
      - ["pending", 789]
    transaction: true
```

## Step 6: Transaction Management
Use transactions for atomic operations:
```yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: transactionalUpdate
  name: Transactional Update
run:
  sql:
    connectionName: analytics
    transaction: true
    queries:
      - query: |
          INSERT INTO orders (user_id, total)
          VALUES ($1, $2)
        params: [get('user_id'), get('total')]
      - query: |
          UPDATE users
          SET last_order_at = NOW()
          WHERE id = $1
        params: [get('user_id')]
```

## Complete Example
Here's a complete workflow that demonstrates multi-database operations:
```yaml
apiVersion: kdeps.io/v1
kind: Workflow
metadata:
  name: multi-database-demo
  version: "1.0.0"
  targetActionId: results
settings:
  apiServerMode: true
  apiServer:
    hostIp: "127.0.0.1"
    portNum: 3000
    routes:
      - path: /api/v1/data
        methods: [GET, POST]
  sqlConnections:
    analytics:
      connection: "postgres://user:pass@localhost:5432/analytics"
      pool:
        maxConnections: 10
    inventory:
      connection: "mysql://user:pass@localhost:3306/inventory"
      pool:
        maxConnections: 5
---
# resources/analytics.yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: analyticsQuery
  name: Analytics Query
run:
  sql:
    connectionName: analytics
    query: |
      SELECT date, COUNT(*) as users
      FROM users
      GROUP BY date
      ORDER BY date DESC
    format: csv
---
# resources/inventory.yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: inventoryQuery
  name: Inventory Query
run:
  sql:
    connectionName: inventory
    query: |
      SELECT name, quantity, price
      FROM products
      WHERE status = 'active'
    format: json
---
# resources/results.yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: results
  name: Results
  requires:
    - analyticsQuery
    - inventoryQuery
run:
  apiResponse:
    success: true
    response:
      analytics: get('analyticsQuery')
      inventory: get('inventoryQuery')
      timestamp: info('now')
```

## Connection Pooling
Configure connection pools for better performance:
```yaml
sqlConnections:
  analytics:
    connection: "postgres://user:pass@localhost:5432/analytics"
    pool:
      maxConnections: 20  # Maximum connections
      minConnections: 5   # Minimum connections
      maxIdleTime: "30m"  # Close idle connections
      maxLifetime: "1h"   # Recycle connections
```

### Pool Settings
- `maxConnections`: Maximum number of connections in the pool
- `minConnections`: Minimum number of connections to maintain
- `maxIdleTime`: Close connections idle for this duration
- `maxLifetime`: Recycle connections after this duration
## Supported Databases
KDeps supports multiple database types:
| Database | Connection String Format |
|---|---|
| PostgreSQL | `postgres://user:pass@host:port/db` |
| MySQL | `mysql://user:pass@host:port/db` |
| SQLite | `sqlite://./path/to/db.db` |
| SQL Server | `sqlserver://user:pass@host:port?database=db` |
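
As a quick sketch, here is how these formats might look together in one `sqlConnections` block. The hosts, ports, and credentials are placeholders, and the `reporting` connection name is hypothetical, not part of this tutorial's workflow:

```yaml
sqlConnections:
  analytics:
    connection: "postgres://user:pass@localhost:5432/analytics"
  inventory:
    connection: "mysql://user:pass@localhost:3306/inventory"
  cache:
    connection: "sqlite://./cache.db"
  reporting:
    # Hypothetical SQL Server connection; 1433 is the default port
    connection: "sqlserver://user:pass@localhost:1433?database=reporting"
```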
## Best Practices
- **Use Named Connections**: Name connections for clarity and reusability
- **Configure Pools**: Set appropriate pool sizes based on workload
- **Use Transactions**: Enable transactions for atomic operations
- **Handle Errors**: Add error handling for database operations
- **Optimize Queries**: Use indexes and efficient queries
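
As a rough sketch, a single resource that follows most of these practices might look like the following. The `safeUpdate` action is a hypothetical example; the fields are the ones used earlier in this tutorial:

```yaml
apiVersion: kdeps.io/v1
kind: Resource
metadata:
  actionId: safeUpdate  # hypothetical example resource
  name: Safe Update
run:
  sql:
    connectionName: analytics  # named connection
    transaction: true          # atomic operation
    query: |
      UPDATE users
      SET status = $1
      WHERE id = $2
    params: [get('status'), get('user_id')]
  onError:                     # graceful error handling
    apiResponse:
      success: false
      response:
        error: "Update failed"
```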
## Error Handling
Handle database errors gracefully:
```yaml
run:
  sql:
    connectionName: analytics
    query: "SELECT * FROM users WHERE id = $1"
    params: [get('user_id')]
  onError:
    apiResponse:
      success: false
      response:
        error: "Database query failed"
        message: get('error')
```

## Performance Tips
- **Connection Pooling**: Use connection pools to reuse connections
- **Batch Operations**: Use `paramsBatch` for multiple similar queries
- **Transactions**: Group related queries in transactions
- **Indexes**: Ensure proper database indexes
- **Query Optimization**: Use efficient SQL queries
## Troubleshooting
### Connection Errors
- Verify connection strings are correct
- Check that the database is running and accessible
- Ensure credentials are valid
- Check network connectivity
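
One frequent culprit is a password containing URL-reserved characters such as `@`, `/`, or `:`. In URL-style connection strings these must be percent-encoded; this is a general property of URL connection strings, not a KDeps-specific rule. For example:

```yaml
sqlConnections:
  analytics:
    # Wrong: the raw '@' in the password "p@ss" breaks URL parsing
    # connection: "postgres://user:p@ss@localhost:5432/analytics"
    # Right: percent-encode '@' as %40
    connection: "postgres://user:p%40ss@localhost:5432/analytics"
```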
### Pool Exhaustion
- Increase `maxConnections` if needed
- Check for connection leaks
- Monitor connection usage
### Transaction Errors
- Ensure `transaction: true` is set
- Check for deadlocks
- Verify the transaction isolation level
## Next Steps
- **Advanced SQL**: Learn about SQL resource features
- **Batch Processing**: Use items iteration for large datasets
- **Error Handling**: Implement comprehensive error handling
- **Performance**: Optimize queries and connection pools
## Related Documentation
- SQL Resource - Complete SQL configuration reference
- Workflow Configuration - Connection settings
- Python Resource - Data processing with Python
- Unified API - Accessing data across resources