Data Connectors enable agents to read from and write to databases, data warehouses, and various data sources, making it easy to work with structured and unstructured data across your organization.

Overview

The Data Connectors primitive provides agents with direct access to your data infrastructure. Whether you’re working with SQL databases, NoSQL stores, data warehouses, or cloud storage, agents can query, analyze, and manipulate data using natural language. Data Connectors are essential for:
  • Database Access: Query and update SQL and NoSQL databases
  • Data Analysis: Analyze data across multiple sources
  • Data Synchronization: Keep data in sync across systems
  • Reporting: Generate reports from live data
  • ETL Operations: Extract, transform, and load data
  • Data Validation: Verify data integrity and quality

SQL Databases

Connect to PostgreSQL, MySQL, SQL Server, and more

NoSQL Stores

Access MongoDB, Redis, DynamoDB, and other NoSQL databases

Data Warehouses

Query Snowflake, BigQuery, Redshift, and analytics platforms

Cloud Storage

Read and write to S3, GCS, Azure Blob, and object storage

How Data Connectors Work

When you configure data connectors for an agent:
  1. Connection: Establishes a secure connection to the data source
  2. Schema Discovery: Automatically detects tables, columns, and relationships
  3. Query Generation: Converts natural language to appropriate queries (SQL, NoSQL, etc.)
  4. Execution: Runs queries with proper security and access controls
  5. Result Processing: Formats and returns results in a structured format
  6. Connection Pooling: Reuses connections efficiently across requests
Security First: All database credentials are encrypted at rest and in transit. Connections use SSL/TLS when available.
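A minimal sketch of this lifecycle, assuming a PostgreSQL connector; the numbered comments map to the steps above:
import { Agentbase } from '@agentbase/sdk';

const agentbase = new Agentbase({
  apiKey: process.env.AGENTBASE_API_KEY
});

const dataConnectors = {
  postgres: { enabled: true, connectionString: process.env.DATABASE_URL }
};

// Steps 1-2: connection and schema discovery happen on first use
// Steps 3-5: the message is converted to SQL, executed, and returned as structured data
const first = await agentbase.runAgent({
  message: "How many orders shipped this week?",
  dataConnectors
});

// Step 6: a later call with the same connector reuses the pooled connection
const second = await agentbase.runAgent({
  message: "Which regions had the most shipments this week?",
  dataConnectors
});

console.log(first.data, second.data);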

Supported Data Sources

SQL Databases

{
  dataConnectors: {
    postgres: { enabled: true },
    mysql: { enabled: true },
    sqlserver: { enabled: true },
    oracle: { enabled: true },
    sqlite: { enabled: true }
  }
}

NoSQL Databases

{
  dataConnectors: {
    mongodb: { enabled: true },
    redis: { enabled: true },
    dynamodb: { enabled: true },
    cassandra: { enabled: true },
    firebase: { enabled: true }
  }
}

Data Warehouses

{
  dataConnectors: {
    snowflake: { enabled: true },
    bigquery: { enabled: true },
    redshift: { enabled: true },
    databricks: { enabled: true }
  }
}

Cloud Storage

{
  dataConnectors: {
    s3: { enabled: true },
    gcs: { enabled: true },
    azureBlob: { enabled: true }
  }
}

Code Examples

Basic SQL Query

import { Agentbase } from '@agentbase/sdk';

const agentbase = new Agentbase({
  apiKey: process.env.AGENTBASE_API_KEY
});

// Query PostgreSQL database
const result = await agentbase.runAgent({
  message: "Show me all customers who signed up in the last 30 days",
  dataConnectors: {
    postgres: {
      enabled: true,
      connection: {
        host: process.env.DB_HOST,
        port: 5432,
        database: "production",
        user: process.env.DB_USER,
        password: process.env.DB_PASSWORD,
        ssl: true
      }
    }
  }
});

console.log('Results:', result.data);
// Agent generates and executes the SQL, e.g.:
// SELECT * FROM customers WHERE created_at >= NOW() - INTERVAL '30 days';

Connection String

// Use connection string format
const result = await agentbase.runAgent({
  message: "Count total orders by status",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL
      // Format: postgresql://user:password@host:port/database
    }
  }
});
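If you assemble the connection string yourself, percent-encode the credentials so special characters don’t break the URL. A minimal sketch (buildPostgresUrl is a hypothetical helper, not part of the SDK):
// Hypothetical helper -- not part of the SDK
function buildPostgresUrl({ user, password, host, port = 5432, database }) {
  // encodeURIComponent guards against characters like @ or : in passwords
  return `postgresql://${encodeURIComponent(user)}:${encodeURIComponent(password)}@${host}:${port}/${database}`;
}

dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: buildPostgresUrl({
      user: process.env.DB_USER,
      password: process.env.DB_PASSWORD,
      host: process.env.DB_HOST,
      database: "production"
    })
  }
}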

NoSQL Query

// Query MongoDB
const result = await agentbase.runAgent({
  message: "Find all products with rating above 4.5 stars",
  dataConnectors: {
    mongodb: {
      enabled: true,
      connection: {
        uri: process.env.MONGODB_URI,
        database: "ecommerce",
        collection: "products"
      }
    }
  }
});

// Agent generates the MongoDB query, e.g.:
// db.products.find({ rating: { $gt: 4.5 } })
console.log('Products:', result.data);

Data Warehouse Query

// Query Snowflake data warehouse
const result = await agentbase.runAgent({
  message: "Show me total sales by region for Q4 2024",
  dataConnectors: {
    snowflake: {
      enabled: true,
      connection: {
        account: process.env.SNOWFLAKE_ACCOUNT,
        username: process.env.SNOWFLAKE_USER,
        password: process.env.SNOWFLAKE_PASSWORD,
        warehouse: "COMPUTE_WH",
        database: "ANALYTICS",
        schema: "SALES"
      }
    }
  }
});

console.log('Sales data:', result.data);

Multiple Data Sources

// Query across multiple databases
const result = await agentbase.runAgent({
  message: "Compare customer data between production DB and analytics warehouse",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.PRODUCTION_DB_URL,
      alias: "production"
    },
    snowflake: {
      enabled: true,
      connection: snowflakeConfig,
      alias: "analytics"
    }
  },
  system: `You have access to two databases:
  - production: Live PostgreSQL database
  - analytics: Snowflake data warehouse

  Compare customer counts and identify any discrepancies.`
});

// Agent queries both sources and compares

Write Operations

// Insert and update data
const result = await agentbase.runAgent({
  message: "Create a new customer record for John Doe ([email protected])",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL,
      permissions: {
        read: true,
        write: true // Enable write operations
      }
    }
  }
});

// Agent generates the INSERT statement, e.g.:
// INSERT INTO customers (name, email) VALUES ('John Doe', 'john.doe@example.com');
console.log('Customer created:', result.data);

Cloud Storage Access

// Read from S3
const result = await agentbase.runAgent({
  message: "Read the latest sales report from S3",
  dataConnectors: {
    s3: {
      enabled: true,
      connection: {
        region: "us-east-1",
        accessKeyId: process.env.AWS_ACCESS_KEY_ID,
        secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
        bucket: "company-reports"
      }
    }
  }
});

// Agent lists and reads files from S3
console.log('Report data:', result.data);

Caching Queries

// Cache expensive queries
const result = await agentbase.runAgent({
  message: "Show me top 10 selling products this month",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL,
      caching: {
        enabled: true,
        ttl: 3600 // Cache for 1 hour
      }
    }
  }
});

// Subsequent identical queries use cached results

Use Cases

1. Analytics Dashboard

Generate real-time analytics from your data warehouse:
const analyticsAgent = await agentbase.runAgent({
  message: "Create a summary of key business metrics for this week",
  dataConnectors: {
    snowflake: {
      enabled: true,
      connection: snowflakeConfig
    }
  },
  system: `Generate analytics report including:

  1. Total revenue this week vs last week
  2. New customer signups
  3. Top 5 products by sales
  4. Average order value
  5. Customer churn rate

  Present results in a structured format with percentage changes.`
});

// Agent queries warehouse and generates comprehensive report
console.log('Analytics:', analyticsAgent.report);

2. Data Quality Validation

Validate data integrity across systems:
const validationAgent = await agentbase.runAgent({
  message: "Check data quality in customer table",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL
    }
  },
  system: `Perform data quality checks:

  1. Find duplicate email addresses
  2. Identify missing required fields
  3. Check for invalid phone number formats
  4. Find customers with future birth dates
  5. Detect orphaned records (references to non-existent data)

  Report issues with counts and examples.`
});

// Agent runs validation queries and reports issues
if (validationAgent.issues.length > 0) {
  console.log('Data quality issues:', validationAgent.issues);
}

3. ETL Pipeline

Extract, transform, and load data:
const etlAgent = await agentbase.runAgent({
  message: "Sync customer data from production to analytics warehouse",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.PRODUCTION_DB_URL,
      alias: "source"
    },
    snowflake: {
      enabled: true,
      connection: snowflakeConfig,
      alias: "destination",
      permissions: {
        read: true,
        write: true
      }
    }
  },
  system: `ETL Process:

  1. Extract: Get customers modified in last 24 hours from source
  2. Transform: Clean data, standardize formats, enrich with metadata
  3. Load: Upsert into destination warehouse
  4. Verify: Validate row counts match
  5. Log: Record sync statistics`
});

console.log('ETL completed:', etlAgent.stats);

4. Customer Lookup

Build customer service tools:
const customerLookup = await agentbase.runAgent({
  message: "Find customer information for email: [email protected]",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL
    }
  },
  system: `Look up customer and return:

  - Basic info (name, email, phone)
  - Account status and tier
  - Recent orders (last 5)
  - Support tickets (open and recent closed)
  - Lifetime value
  - Last interaction date

  Format as a customer profile card.`
});

// Returns comprehensive customer data
console.log('Customer profile:', customerLookup.profile);

5. Report Generation

Generate custom reports from data:
const reportAgent = await agentbase.runAgent({
  message: "Generate monthly sales report for January 2024",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL
    }
  },
  system: `Generate comprehensive sales report:

  1. Total sales by product category
  2. Sales by region
  3. Top 10 customers by revenue
  4. Sales rep performance
  5. Month-over-month growth
  6. Forecast for next month based on trends

  Include visualizations and insights.`
});

// Agent queries data and generates formatted report

6. Database Migration

Migrate data between databases:
const migrationAgent = await agentbase.runAgent({
  message: "Migrate users table from MySQL to PostgreSQL",
  dataConnectors: {
    mysql: {
      enabled: true,
      connection: mysqlConfig,
      alias: "source"
    },
    postgres: {
      enabled: true,
      connection: postgresConfig,
      alias: "destination",
      permissions: {
        read: true,
        write: true
      }
    }
  },
  system: `Migration process:

  1. Read schema from source users table
  2. Create equivalent table in destination if not exists
  3. Copy data in batches of 1000
  4. Validate data integrity
  5. Create indexes
  6. Report migration statistics`
});

Best Practices

Security

Read-Only by Default: Data connectors are read-only by default. Explicitly enable write permissions only when needed.
// Good: Read-only database user
dataConnectors: {
  postgres: {
    enabled: true,
    connection: {
      user: "readonly_user", // User with SELECT only
      password: process.env.DB_READONLY_PASSWORD
    }
  }
}

// For write operations, use a dedicated user
dataConnectors: {
  postgres: {
    enabled: true,
    connection: {
      user: "app_writer",
      password: process.env.DB_WRITER_PASSWORD
    },
    permissions: {
      read: true,
      write: true
    }
  }
}
Scope Access: Limit the agent to the schemas and tables it actually needs.
// Restrict to specific schemas or tables
dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: process.env.DATABASE_URL,
    scope: {
      schemas: ["public", "analytics"],
      tables: ["customers", "orders", "products"],
      excludeTables: ["admin_users", "api_keys"]
    }
  }
}
Connection Pooling: Reuse connections instead of opening a new one per query.
// Configure connection pool
dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: process.env.DATABASE_URL,
    pool: {
      min: 2, // minimum idle connections to keep open
      max: 10, // maximum open connections
      acquireTimeout: 30000, // ms to wait for a free connection
      idleTimeout: 10000 // ms before an idle connection is closed
    }
  }
}
Audit Logging: Record every query the agent runs without capturing sensitive results.
// Log all queries for audit trail
dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: process.env.DATABASE_URL,
    logging: {
      enabled: true,
      logQueries: true,
      logResults: false, // Don't log sensitive data
      logErrors: true
    }
  }
}

Performance

Cache Frequently Accessed Data: Enable caching for queries that run frequently and don’t need real-time data (see Caching Queries above).
Set Query Timeouts: Bound how long connections and statements may run so one slow query can’t stall the agent.
dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: process.env.DATABASE_URL,
    timeout: 30000, // 30 second timeout
    statementTimeout: 20000 // 20 second query timeout
  }
}
Limit Result Sets: Use the system prompt to keep queries bounded.
system: `When querying data:
- Always use LIMIT clause for large tables
- Default to 100 rows unless more are specifically needed
- Use pagination for large result sets
- Warn if query would return more than 1000 rows`
Optimize Slow Queries: Have the agent analyze query plans and surface missing indexes.
message: `Analyze query performance and suggest indexes`,
system: `When running queries:
- Use EXPLAIN to analyze query plans
- Identify missing indexes
- Suggest index creation for slow queries
- Avoid table scans on large tables`
Batch Large Operations: Split bulk writes into transactional batches.
// Process data in batches
const result = await agentbase.runAgent({
  message: "Update all customer records to add loyalty_points field",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL,
      permissions: { write: true }
    }
  },
  system: `Update customers in batches:
  - Process 1000 records at a time
  - Use transactions for consistency
  - Commit after each batch
  - Log progress`
});

Data Integrity

Use Transactions: Keep write operations atomic.
system: `For write operations:
- Always use transactions
- Validate data before commit
- Rollback on any error
- Log transaction details`
Validate Before Writing: Catch bad data before it reaches the database.
system: `Before inserting/updating data:
- Validate required fields are present
- Check data types match schema
- Verify foreign key references exist
- Ensure unique constraints won't be violated
- Confirm data is within valid ranges`
Handle Errors Gracefully: Retry transient connection failures automatically.
dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: process.env.DATABASE_URL,
    errorHandling: {
      retryOnConnectionError: true,
      maxRetries: 3,
      retryDelay: 1000,
      logErrors: true
    }
  }
}

Integration with Other Primitives

With RAG

Combine database queries with semantic search:
const result = await agentbase.runAgent({
  message: "Find similar customer support cases to this one",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL
    }
  },
  datastores: [{
    id: "ds_support_cases",
    name: "Support Cases Knowledge Base"
  }]
});

// Agent queries database and semantic knowledge base
Learn more: RAG Primitive

With Workflow

Automate data processing workflows:
const dataWorkflow = {
  name: "daily_data_sync",
  steps: [
    {
      id: "extract",
      type: "agent_task",
      config: {
        message: "Extract new records from source database",
        dataConnectors: { mysql: { enabled: true } }
      }
    },
    {
      id: "transform",
      type: "agent_task",
      config: {
        message: "Transform and clean data"
      }
    },
    {
      id: "load",
      type: "agent_task",
      config: {
        message: "Load into destination warehouse",
        dataConnectors: { snowflake: { enabled: true } }
      }
    }
  ]
};
Learn more: Workflow Primitive

With Memory

Remember query patterns and preferences:
const result = await agentbase.runAgent({
  message: "Show me the usual sales report",
  memory: {
    namespace: `user_${userId}`,
    enabled: true
  },
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL
    }
  }
});

// Agent remembers user's preferred report format and filters
Learn more: Memory Primitive

Performance Considerations

Query Optimization

  • Query Planning: Analyze query plans before execution
  • Index Usage: Ensure queries use appropriate indexes
  • Result Limiting: Always limit result sets to needed rows
  • Caching: Cache frequently accessed, slowly changing data
// Monitor query performance
const metrics = await agentbase.getDataConnectorMetrics({
  connector: "postgres",
  timeRange: "1h"
});

console.log('Avg query time:', metrics.avgQueryTime);
console.log('Slow queries:', metrics.slowQueries);
console.log('Cache hit rate:', metrics.cacheHitRate);

Connection Management

  • Pool Size: Configure appropriate connection pool sizes
  • Connection Reuse: Reuse connections across queries
  • Timeout Management: Set appropriate timeouts
  • Cleanup: Close idle connections
dataConnectors: {
  postgres: {
    enabled: true,
    pool: {
      min: 2, // minimum idle connections to keep open
      max: 10, // maximum open connections
      acquireTimeout: 30000, // ms to wait for a free connection
      idleTimeout: 10000, // ms before an idle connection may be closed
      evictionRunInterval: 10000 // ms between idle-connection sweeps
    }
  }
}

Cost Optimization

Monitor Warehouse Usage: Data warehouse queries can be expensive. Monitor usage and optimize costly queries.
// Set query cost limits
dataConnectors: {
  snowflake: {
    enabled: true,
    connection: snowflakeConfig,
    costLimits: {
      maxQueryCost: 10.00, // USD
      alertThreshold: 0.8, // alert at 80% of the limit
      blockExpensiveQueries: true
    }
  }
}

Troubleshooting

Problem: Cannot connect to database
Solutions:
  • Verify connection credentials are correct
  • Check network connectivity and firewall rules
  • Ensure database server is running
  • Verify SSL/TLS settings match requirements
  • Check IP whitelist if applicable
// Test connection
const test = await agentbase.testDataConnector({
  type: "postgres",
  connectionString: process.env.DATABASE_URL
});

if (!test.success) {
  console.error('Connection error:', test.error);
  console.log('Suggestion:', test.suggestion);
}
Problem: Queries timing out
Solutions:
  • Increase timeout limits
  • Optimize slow queries with indexes
  • Reduce result set size
  • Use query caching
  • Consider breaking into smaller queries
dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: process.env.DATABASE_URL,
    timeout: 60000, // Increase to 60 seconds
    statementTimeout: 45000
  }
}
Problem: Access denied errors
Solutions:
  • Verify user has required permissions
  • Check table/schema access rights
  • Enable write permissions if needed
  • Review database user grants
  • Check row-level security policies
// Grant appropriate permissions
dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: process.env.DATABASE_URL,
    permissions: {
      read: true,
      write: true, // Enable if needed
      delete: false // Explicitly disable dangerous operations
    }
  }
}
Problem: Agent can’t see tables or columns
Solutions:
  • Verify user has permissions to read schema
  • Check search_path for PostgreSQL
  • Specify schemas explicitly
  • Refresh schema cache
// Explicitly specify schemas
dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: process.env.DATABASE_URL,
    scope: {
      schemas: ["public", "app_schema"],
      refreshSchema: true
    }
  }
}

Advanced Patterns

Query Result Streaming

Stream large result sets:
dataConnectors: {
  postgres: {
    enabled: true,
    connectionString: process.env.DATABASE_URL,
    streaming: {
      enabled: true,
      batchSize: 1000
    }
  }
}

// Agent streams results in batches

Multi-tenancy

Isolate data by tenant:
const result = await agentbase.runAgent({
  message: "Show customer orders",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.DATABASE_URL,
      tenantIsolation: {
        enabled: true,
        tenantId: currentTenant.id,
        tenantColumn: "tenant_id"
      }
    }
  }
});

// Agent automatically filters all queries by tenant_id

Change Data Capture

Monitor database changes:
const cdc = await agentbase.createDataConnectorListener({
  connector: "postgres",
  tables: ["orders", "customers"],
  events: ["insert", "update", "delete"],
  callback: async (change) => {
    await agentbase.runAgent({
      message: `Process database change: ${change.operation} on ${change.table}`,
      context: { change }
    });
  }
});

Additional Resources

API Reference

Complete data connectors API documentation

Supported Databases

Full list of supported data sources

Security Guide

Best practices for secure database access
Pro Tip: Use read-only replicas for analytics queries to avoid impacting production database performance. The agent can automatically route queries to the appropriate database.
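A minimal sketch of that routing pattern, assuming the replica is exposed through its own connection string (REPLICA_DATABASE_URL is illustrative):
// Point analytics traffic at a read-only replica instead of the primary
const analytics = await agentbase.runAgent({
  message: "Summarize signups by week for the last quarter",
  dataConnectors: {
    postgres: {
      enabled: true,
      connectionString: process.env.REPLICA_DATABASE_URL
    }
  }
});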