RTDB Overview
Cloudillo’s RTDB (Real-Time Database) provides Firebase-like functionality for structured JSON data with queries, subscriptions, and real-time synchronization. It integrates seamlessly with Cloudillo’s federated architecture while maintaining privacy and user control.
Overview
The RTDB system provides:
- Real-time synchronization: Changes propagate to all connected clients instantly
- Offline support: Works offline, syncs when connection returns
- Collaborative editing: Multiple users can edit the same data concurrently
- Query capabilities: Filter, sort, and paginate data
- WebSocket-based: Efficient, bidirectional communication
- Privacy-focused: Data stored on user’s chosen node
Real-Time Database (RTDB)
The RTDB system provides Firebase-like functionality for structured data:
Technology: redb - Lightweight embedded database (171 KiB package size)
Features:
- JSON document storage
- Query filters (equals, greater than, less than, in, not-in, array-contains, array-contains-any, array-contains-all)
- Sorting and pagination
- Computed values (increment, decrement, multiply, concat, min, max, aggregate, functions)
- Atomic transactions with temporary references
- Real-time subscriptions via WebSocket
- Document locking (soft/advisory and hard/enforced)
- Aggregate queries with groupBy (sum, avg, min, max)
Use Cases:
- User profiles and settings
- Task lists and project management
- E-commerce catalogs
- Analytics and reporting
- Structured forms and surveys
Learn more: RTDB with redb
CRDT Collaborative Editing (Separate System)
Cloudillo also provides a separate CRDT API for collaborative editing:
Technology: Yrs - Rust implementation of Yjs CRDT
Features:
- Conflict-free replicated data types (CRDTs)
- Rich data structures (Text, Map, Array, XML)
- Automatic conflict resolution
- Time-travel and versioning
- Awareness (presence, cursors)
- Yjs ecosystem compatibility
Use Cases:
- Collaborative text editors (Google Docs-like)
- Shared whiteboards
- Real-time collaborative forms
- Collaborative spreadsheets
- Multiplayer game state
Learn more: CRDT Collaborative Editing
Comparison: RTDB vs CRDT
| Feature | RTDB (redb) | CRDT (Yrs) |
|---|---|---|
| Purpose | Structured data storage | Concurrent editing |
| Queries | Rich (filter, sort, paginate, aggregate) | Limited (document-based) |
| Conflict Resolution | Last-write-wins + document locking | Automatic merge (CRDT) |
| Locking | Soft (advisory) and hard (enforced) | Not applicable |
| Aggregations | Server-side (sum, avg, min, max, groupBy) | Not applicable |
| Best For | Traditional database needs | Collaborative editing |
| API Style | Firebase-like | Yjs-compatible |
Note: These are separate, complementary systems. Use RTDB for structured data with queries, and CRDT for collaborative editing scenarios.
Core Concept: Database-as-File
Both systems use the same foundational concept: databases/documents are special files in the Cloudillo file system.
How It Works
- File Metadata (MetaAdapter) stores:
  - Database ID, name, owner
  - Creation timestamp, last accessed
  - Permission rules
  - Configuration (max size, retention policy)
- Database Content (RtdbAdapter or CrdtAdapter) stores:
  - Actual data (documents, CRDT state)
  - Indexes (for query performance)
  - Snapshots (for fast loading)
- File ID serves as the database identifier:
  /ws/rtdb/:fileId // WebSocket connection endpoint
Benefits
✅ Natural Integration: Databases managed like files
✅ Permission Reuse: File permissions apply to databases
✅ Federation Ready: Databases can be shared across instances
✅ Content Addressing: Database snapshots are tamper-proof
✅ Discoverable: Find databases through file APIs
Example
// Create database file
const response = await fetch('/api/db', {
  method: 'POST',
  headers: {
    'Authorization': `Bearer ${token}`,
    'Content-Type': 'application/json' // the body is JSON
  },
  body: JSON.stringify({
    name: 'My Tasks',
    type: 'redb', // or 'yrs' for CRDT
    permissions: {
      public_read: false,
      readers: ['bob.example.com'],
      writers: ['bob.example.com']
    }
  })
});
const { fileId } = await response.json();
// fileId: "f1~abc123..."
// Connect to database via WebSocket
const ws = new WebSocket(`wss://cl-o.alice.example.com/ws/rtdb/${fileId}`);
Architecture Overview
Components
┌─────────────────────────────────────────────────────┐
│ Client Application │
│ - JavaScript/TypeScript │
│ - React hooks / Vue composables │
│ - WebSocket connection │
└─────────────────────────────────────────────────────┘
↓ WebSocket
┌─────────────────────────────────────────────────────┐
│ Cloudillo Server │
│ ┌─────────────────────────────────────────────┐ │
│ │ WebSocket Handler │ │
│ │ - Authentication │ │
│ │ - Message routing │ │
│ │ - Subscription management │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Database Manager │ │
│ │ - Instance lifecycle (load/evict) │ │
│ │ - Snapshot management │ │
│ │ - Memory limits │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ ↓ │
│ ┌─────────────┐ ┌──────────────┐ │
│ │ RtdbAdapter │ │ CrdtAdapter │ │
│ │ (redb) │ │ (Yrs) │ │
│ └─────────────┘ └──────────────┘ │
│ ↓ ↓ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Storage Layer │ │
│ │ - MetaAdapter (metadata) │ │
│ │ - BlobAdapter (snapshots, data) │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
Request Flow
Query-Based (redb):
Client → WebSocket message (query/subscribe)
↓
WebSocket Handler → Authenticate
↓
Database Manager → Get or load instance
↓
RtdbAdapter → Execute query
↓
Return results + subscribe to changes
↓
Client receives data + real-time updates
CRDT-Based (Yrs):
Client → WebSocket connection
↓
WebSocket Handler → Authenticate
↓
Database Manager → Get or load instance
↓
Yrs Sync Protocol → Exchange state vectors
↓
Bidirectional updates → Merge with CRDT algorithm
↓
Both clients stay in sync
Permission Model
Connection-Time Access Check
Permissions are checked once at WebSocket connection time using file_access::check_file_access_with_scope(). This function evaluates multiple access sources:
- Scoped tokens: Share links with restricted access
- Ownership: File owner has full access
- Tenant roles: Role-based access within the tenant
- FSHR action tokens: Federation-based file sharing permissions
The result determines whether the connection operates in read_only or read_write mode. Clients can also request a specific access level via the ?access=read or ?access=write query parameter.
Info: There is no per-operation permission check; the access level is determined at connection time and applies for the duration of the WebSocket session.
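As a rough illustration of requesting an access level at connection time, the sketch below builds the endpoint URL with the optional access query parameter. The helper name buildRtdbUrl and its parameters are hypothetical and not part of any Cloudillo SDK; only the /ws/rtdb/:fileId path and the access=read / access=write values come from the text above.

```typescript
// Hypothetical helper, not part of the Cloudillo SDK.
type AccessLevel = "read" | "write";

function buildRtdbUrl(host: string, fileId: string, access?: AccessLevel): string {
  // The file ID doubles as the database identifier in /ws/rtdb/:fileId.
  const base = `wss://${host}/ws/rtdb/${encodeURIComponent(fileId)}`;
  // Without the parameter, the server decides the level from its access check alone.
  return access === undefined ? base : `${base}?access=${access}`;
}

// Example: ask for a read-only session even if write access would be granted.
const readOnlyUrl = buildRtdbUrl("cl-o.alice.example.com", "f1~abc123", "read");
```

Because the level is fixed for the whole session, a client that later needs write access would have to open a new connection.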
Future: Fine-Grained Permissions
Planned for future releases:
- Per-collection permissions: Different access per table
- Per-document permissions: Filter queries by ownership
- Runtime rules: JavaScript-like expressions evaluated at runtime
- Attribute-based: Permissions based on user attributes
WebSocket Protocol
Both systems (RTDB and CRDT) use WebSocket for real-time communication, though with different protocols:
Connection
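// Node.js client using the `ws` package, which accepts custom headers.
// The browser WebSocket constructor cannot set request headers.
import WebSocket from 'ws';

const ws = new WebSocket(
  `wss://cl-o.example.com/ws/rtdb/${fileId}`,
  {
    headers: {
      'Authorization': `Bearer ${accessToken}`
    }
  }
);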
JSON messages with type field:
// Client → Server
{
  "type": "query", // or "subscribe", "get", "transaction", "lock", "unlock", "createIndex", "ping"
  "id": 123, // Request ID for correlation
  // ... type-specific fields
}
// Server → Client
{
  "type": "queryResult", // or "getResult", "subscribeResult", "transactionResult", "lockResult", "change", "pong", "error"
  "id": 123, // Matches request ID
  // ... response data
}
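The id field lets a client match each reply to the request that caused it. A minimal sketch of that bookkeeping follows; the RequestTracker class and its method names are illustrative assumptions, and only the type and id fields are taken from the message format above.

```typescript
// Only `type` and `id` come from the protocol; everything else is illustrative.
interface ClientMessage { type: string; id: number; [key: string]: unknown }
interface ServerMessage { type: string; id?: number; [key: string]: unknown }

class RequestTracker {
  private nextId = 1;
  private pending = new Map<number, (reply: ServerMessage) => void>();

  // Assigns a fresh id, remembers the callback, and returns the message to send.
  prepare(
    type: string,
    fields: Record<string, unknown>,
    onReply: (reply: ServerMessage) => void
  ): ClientMessage {
    const id = this.nextId++;
    this.pending.set(id, onReply);
    return { ...fields, type, id };
  }

  // Routes a reply to its waiting callback. Returns false for messages that
  // answer no pending request (for example, "change" notifications).
  dispatch(reply: ServerMessage): boolean {
    if (reply.id === undefined) return false;
    const callback = this.pending.get(reply.id);
    if (!callback) return false;
    this.pending.delete(reply.id);
    callback(reply);
    return true;
  }
}
```

In use, the message returned by prepare would be serialized with JSON.stringify and sent over the socket, and every parsed incoming message would be passed to dispatch first.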
Lifecycle
Client connects
↓
Server authenticates
↓
Server loads database instance
↓
Client sends queries/subscriptions
↓
Server sends results + change notifications
↓
Client disconnects
↓
Server cleans up subscriptions
Query-Based RTDB (redb)
- Package size: ~171 KiB
- Database file size: ~50 KiB minimum
- Query speed: ~10,000 queries/second (in-memory)
- Write speed: ~1,000 writes/second
- Connection capacity: ~1,000 concurrent per database
- Memory usage: ~10 MB per active database instance
CRDT-Based (Yrs)
- Sync speed: ~50 ms for typical documents
- Conflict resolution: Automatic, deterministic
- Memory usage: ~5-20 MB per active document
- Update latency: <10 ms for local network
- Scalability: Tested with 100+ concurrent editors
Optimization Strategies
- Snapshots: Periodic full-state saves reduce sync time
- Compression: zstd compression for storage
- Eviction: LRU eviction for inactive databases
- Indexing: Secondary indexes for fast queries
- Batching: Batch updates for efficiency
Storage Strategy
RTDB data is stored directly in per-tenant redb database files. The RtdbAdapter handles persistence through ACID transactions — each write operation is committed atomically.
For details on the storage layout, see RTDB with redb.
Federation Support
Databases can be shared across Cloudillo instances through the file sharing mechanism (FSHR action tokens). Access from remote users is granted via the same check_file_access_with_scope() system used for local access control.
Note: Full database replication (read-only replicas, bidirectional sync) is planned for a future release. Currently, remote users connect directly to the origin instance via WebSocket.
Security Considerations
Authentication
- WebSocket connections require valid access token
- Token validated on connection establishment
- If the token expires during a session, the client is disconnected
Authorization
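- Permissions are checked once, at connection time
- The resulting access level (read_only or read_write) governs every read and write in the session
- Subscriptions run under the same connection-level access; per-document filtering is planned for a future release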
Data Validation
- Schema validation (optional)
- Size limits per database
- Rate limiting per user
- Malicious update detection
Encryption
- TLS/WSS for all connections
- Optional client-side encryption (future)
- Content-addressed snapshots prevent tampering
Choosing Between RTDB and CRDT
Use RTDB (redb) When:
✅ You need structured data with schemas
✅ Complex queries are important (filters, sorts, aggregates)
✅ Computed values and aggregations are needed
✅ Document locking for exclusive editing is required
✅ Traditional database patterns fit your use case
✅ Atomic transactions are required
✅ You want minimal package size
Use CRDT (Yrs) When:
✅ Multiple users edit the same data concurrently
✅ Conflict-free merging is critical
✅ Rich text editing is needed
✅ Offline-first design is important
✅ You want Yjs ecosystem compatibility
✅ Time-travel/versioning is valuable
Can You Use Both?
Yes! Many applications benefit from both:
- Yrs for collaborative document editing
- redb for user profiles, settings, and structured data
Example: A collaborative task management app might use:
- Yrs for the task description (rich text, concurrent editing)
- redb for task metadata (assignee, due date, status)
API Overview
Database Management
POST /api/db # Create database
GET /api/db # List databases
GET /api/db/:fileId # Get metadata
PATCH /api/db/:fileId # Update metadata
DELETE /api/db/:fileId # Delete database
WebSocket Connection
GET /ws/rtdb/:fileId
Upgrade: websocket
Authorization: Bearer <token>
Export/Import
GET /api/db/:fileId/export?format=json
POST /api/db/:fileId/import
redb Implementation
The query-based RTDB uses redb, a lightweight embedded database, to provide Firebase-like functionality with minimal overhead. This approach is ideal for structured data, complex queries, and traditional database operations.
Why redb?
redb is chosen for its exceptional characteristics:
- Tiny footprint: 171 KiB package size (vs. 1+ MB for most databases)
- Pure Rust: Memory-safe, no unsafe code
- ACID transactions: Full transactional guarantees
- Zero-copy reads: Excellent performance
- Embedded: No separate database server
- LMDB-inspired: Proven B-tree architecture
Architecture
Layered Design
┌──────────────────────────────────────┐
│ Client Application (JavaScript) │
│ - Query API │
│ - Subscriptions │
│ - Transactions │
└──────────────────────────────────────┘
↓ WebSocket
┌──────────────────────────────────────┐
│ WebSocket Handler │
│ - Message parsing │
│ - Authentication │
│ - Subscription tracking │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ RtdbAdapter Trait │
│ - transaction() → Transaction │
│ - query(), get() │
│ - subscribe() │
│ - acquire_lock(), release_lock() │
│ - create_index(), stats() │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ redb Implementation │
│ - Key-value storage │
│ - Secondary indexes │
│ - Query execution │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ Real-Time Layer │
│ - tokio::broadcast channels │
│ - Change event propagation │
│ - Subscription filtering │
└──────────────────────────────────────┘
RtdbAdapter Trait
The core interface for database operations. All methods are tenant-aware (tn_id parameter). Write operations go through the separate Transaction trait.
#[async_trait]
pub trait RtdbAdapter: Debug + Send + Sync {
/// Begin a new transaction for write operations
async fn transaction(&self, tn_id: TnId, db_id: &str) -> ClResult<Box<dyn Transaction>>;
/// Close a database instance, flushing pending changes
async fn close_db(&self, tn_id: TnId, db_id: &str) -> ClResult<()>;
/// Query documents with optional filtering, sorting, and pagination
async fn query(&self, tn_id: TnId, db_id: &str, path: &str, opts: QueryOptions)
-> ClResult<Vec<Value>>;
/// Get a single document at a specific path
async fn get(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<Value>>;
/// Subscribe to real-time changes (returns a stream of ChangeEvents)
async fn subscribe(&self, tn_id: TnId, db_id: &str, opts: SubscriptionOptions)
-> ClResult<Pin<Box<dyn Stream<Item = ChangeEvent> + Send>>>;
/// Create an index on a field for query performance
async fn create_index(&self, tn_id: TnId, db_id: &str, path: &str, field: &str)
-> ClResult<()>;
/// Get database statistics (size, record count, table count)
async fn stats(&self, tn_id: TnId, db_id: &str) -> ClResult<DbStats>;
/// Export all documents from a database
async fn export_all(&self, tn_id: TnId, db_id: &str) -> ClResult<Vec<(Box<str>, Value)>>;
/// Acquire a lock on a document path
async fn acquire_lock(&self, tn_id: TnId, db_id: &str, path: &str,
user_id: &str, mode: LockMode, conn_id: &str) -> ClResult<Option<LockInfo>>;
/// Release a lock on a document path
async fn release_lock(&self, tn_id: TnId, db_id: &str, path: &str,
user_id: &str, conn_id: &str) -> ClResult<()>;
/// Check if a path has an active lock
async fn check_lock(&self, tn_id: TnId, db_id: &str, path: &str)
-> ClResult<Option<LockInfo>>;
/// Release all locks held by a specific user (on disconnect)
async fn release_all_locks(&self, tn_id: TnId, db_id: &str,
user_id: &str, conn_id: &str) -> ClResult<()>;
}
Transaction Trait
All write operations (create, update, delete) are performed within a transaction:
#[async_trait]
pub trait Transaction: Send + Sync {
/// Create a new document with auto-generated ID
async fn create(&mut self, path: &str, data: Value) -> ClResult<Box<str>>;
/// Update an existing document (full replacement)
async fn update(&mut self, path: &str, data: Value) -> ClResult<()>;
/// Delete a document at a path
async fn delete(&mut self, path: &str) -> ClResult<()>;
/// Read a document (with read-your-own-writes semantics)
async fn get(&self, path: &str) -> ClResult<Option<Value>>;
/// Commit all changes atomically
async fn commit(&mut self) -> ClResult<()>;
/// Rollback all changes
async fn rollback(&mut self) -> ClResult<()>;
}
Data Model
Collections and Documents
Data is organized into collections containing JSON documents:
database/
├── users/
│ ├── user_001
│ ├── user_002
│ └── ...
├── posts/
│ ├── post_abc
│ ├── post_def
│ └── ...
└── comments/
└── ...
Document Structure
Documents are JSON objects with auto-generated IDs:
{
"_id": "user_001",
"_createdAt": 1738483200,
"_updatedAt": 1738486800,
"name": "Alice",
"email": "alice@example.com",
"age": 30,
"active": true
}
System Fields (auto-managed):
- _id: Unique document identifier
- _createdAt: Creation timestamp (Unix)
- _updatedAt: Last modification timestamp (Unix)
Path Syntax
Paths use slash-separated segments:
users // Collection
users/user_001 // Specific document
posts/post_abc/comments // Sub-collection
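To make the path convention concrete, here is a small sketch that classifies a path as a collection or a document. It assumes that segments strictly alternate between collection names and document IDs, which is inferred from the three examples above rather than stated by the source; parsePath and ParsedPath are hypothetical names.

```typescript
interface ParsedPath {
  kind: "collection" | "document";
  collection: string; // path of the (sub-)collection
  docId?: string;     // set only for document paths
}

function parsePath(path: string): ParsedPath {
  const segments = path.split("/").filter((s) => s.length > 0);
  if (segments.length === 0) throw new Error("empty path");
  // Assumption: collection/document segments alternate, so an odd
  // number of segments names a collection and an even number a document.
  if (segments.length % 2 === 1) {
    return { kind: "collection", collection: segments.join("/") };
  }
  return {
    kind: "document",
    collection: segments.slice(0, -1).join("/"),
    docId: segments[segments.length - 1],
  };
}
```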
Query System
QueryOptions
pub struct QueryOptions {
pub filter: Option<QueryFilter>,
pub sort: Option<Vec<SortField>>, // Multiple sort fields supported
pub limit: Option<u32>,
pub offset: Option<u32>,
pub aggregate: Option<AggregateOptions>,
}
pub struct SortField {
pub field: String,
pub ascending: bool, // true for ascending, false for descending
}
QueryFilter
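QueryFilter is a flat struct (not an enum) where each field is a HashMap keyed by document field name. Scalar comparisons map to a single Value, while list-based conditions (in_array, not_in_array, array_contains_any, array_contains_all) map to a Vec<Value>. Multiple conditions within the struct are ANDed implicitly: a document must satisfy all specified constraints. All field names use camelCase serialization.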
#[serde(rename_all = "camelCase")]
pub struct QueryFilter {
pub equals: HashMap<String, Value>,
pub not_equals: HashMap<String, Value>,
pub greater_than: HashMap<String, Value>,
pub greater_than_or_equal: HashMap<String, Value>,
pub less_than: HashMap<String, Value>,
pub less_than_or_equal: HashMap<String, Value>,
pub in_array: HashMap<String, Vec<Value>>,
pub array_contains: HashMap<String, Value>,
pub not_in_array: HashMap<String, Vec<Value>>,
pub array_contains_any: HashMap<String, Vec<Value>>,
pub array_contains_all: HashMap<String, Vec<Value>>,
}
Info: There are no And/Or combinators; multiple conditions are ANDed implicitly. Each HashMap maps field names to expected values.
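The implicit AND can be pictured as a predicate that a document must pass for every populated condition map. The following is a client-side sketch of that idea, not the server's implementation: the comparison rules (strict equality, ordering only for number/number and string/string pairs) are assumptions, and matchesFilter is a hypothetical name.

```typescript
type Value = unknown;
type Doc = Record<string, Value>;

// Field names follow the camelCase serialization of QueryFilter.
interface QueryFilter {
  equals?: Record<string, Value>;
  notEquals?: Record<string, Value>;
  greaterThan?: Record<string, Value>;
  greaterThanOrEqual?: Record<string, Value>;
  lessThan?: Record<string, Value>;
  lessThanOrEqual?: Record<string, Value>;
  inArray?: Record<string, Value[]>;
  notInArray?: Record<string, Value[]>;
  arrayContains?: Record<string, Value>;
  arrayContainsAny?: Record<string, Value[]>;
  arrayContainsAll?: Record<string, Value[]>;
}

// Assumption: only number/number and string/string pairs are ordered.
function compare(a: Value, b: Value): number | undefined {
  if (typeof a === "number" && typeof b === "number") return a - b;
  if (typeof a === "string" && typeof b === "string") return a < b ? -1 : a > b ? 1 : 0;
  return undefined;
}

function ordered(check: (c: number) => boolean) {
  return (actual: Value, expected: Value): boolean => {
    const c = compare(actual, expected);
    return c !== undefined && check(c);
  };
}

function asArray(v: Value): Value[] {
  return Array.isArray(v) ? v : [];
}

function has(list: Value[], item: Value): boolean {
  return list.indexOf(item) !== -1;
}

// True when every entry of one condition map holds; an absent map adds no constraint.
function all<T>(doc: Doc, map: Record<string, T> | undefined, test: (actual: Value, expected: T) => boolean): boolean {
  if (!map) return true;
  const conditions = map;
  return Object.keys(conditions).every((field) => test(doc[field], conditions[field] as T));
}

function matchesFilter(doc: Doc, f: QueryFilter): boolean {
  return (
    all(doc, f.equals, (a, e) => a === e) &&
    all(doc, f.notEquals, (a, e) => a !== e) &&
    all(doc, f.greaterThan, ordered((c) => c > 0)) &&
    all(doc, f.greaterThanOrEqual, ordered((c) => c >= 0)) &&
    all(doc, f.lessThan, ordered((c) => c < 0)) &&
    all(doc, f.lessThanOrEqual, ordered((c) => c <= 0)) &&
    all(doc, f.inArray, (a, list) => has(list, a)) &&
    all(doc, f.notInArray, (a, list) => !has(list, a)) &&
    all(doc, f.arrayContains, (a, e) => has(asArray(a), e)) &&
    all(doc, f.arrayContainsAny, (a, list) => list.some((e) => has(asArray(a), e))) &&
    all(doc, f.arrayContainsAll, (a, list) => list.every((e) => has(asArray(a), e)))
  );
}
```

Since there is no Or combinator, a disjunction across different fields would have to be expressed as separate queries whose results are merged on the client.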
Query Examples
Simple query:
{
"type": "query",
"id": 1,
"path": "users",
"filter": {
"equals": { "active": true }
},
"sort": [{ "field": "name", "ascending": true }],
"limit": 50
}
Complex query (multiple conditions are ANDed):
{
"type": "query",
"id": 2,
"path": "posts",
"filter": {
"equals": { "published": true },
"greaterThan": { "views": 100 }
},
"sort": [{ "field": "createdAt", "ascending": false }],
"limit": 20
}
WebSocket Protocol
Message Types
Client → Server
1. Query
{
"type": "query",
"id": 1,
"path": "users",
"filter": { "equals": { "active": true } },
"limit": 50
}
2. Subscribe
{
"type": "subscribe",
"id": 2,
"path": "posts",
"filter": { "equals": { "published": true } }
}
3. Get (single document)
{
"type": "get",
"id": 3,
"path": "users/user_001"
}
4. Transaction (wraps create/update/delete operations)
{
"type": "transaction",
"id": 4,
"operations": [
{
"type": "create",
"path": "posts",
"ref": "$post",
"data": { "title": "Hello", "author": "alice" }
},
{
"type": "update",
"path": "users/alice",
"data": { "postCount": { "$op": "increment", "by": 1 } }
},
{
"type": "replace",
"path": "users/alice",
"data": { "name": "Alice", "role": "admin" }
},
{
"type": "delete",
"path": "posts/post_old"
}
]
}
Info: All write operations (create, update, replace, delete) must be wrapped in a transaction message. There are no standalone write message types. The update operation merges fields into the existing document, while replace does a full document replacement.
5. Lock
{
"type": "lock",
"id": 5,
"path": "todos/task_123",
"mode": "hard"
}
6. Unlock
{
"type": "unlock",
"id": 6,
"path": "todos/task_123"
}
7. Create Index
{
"type": "createIndex",
"id": 7,
"path": "users",
"field": "email"
}
8. Ping
{
"type": "ping",
"id": 8
}
Server → Client
1. Query Result
{
"type": "queryResult",
"id": 1,
"data": [
{ "_id": "user_001", "name": "Alice", "active": true },
{ "_id": "user_002", "name": "Bob", "active": true }
],
"total": 2
}
2. Get Result
{
"type": "getResult",
"id": 3,
"data": { "_id": "user_001", "name": "Alice", "active": true }
}
3. Subscribe Result
{
"type": "subscribeResult",
"id": 2,
"subscriptionId": "sub_abc123",
"data": [
{ "_id": "post_001", "title": "Hello", "published": true }
]
}
4. Change Event
Change events use a single event object with an action field, not a changes array:
{
"type": "change",
"subscriptionId": "sub_abc123",
"event": {
"action": "create",
"path": "posts",
"data": { "_id": "post_002", "title": "New Post", "published": true }
}
}
Possible action values: create, update, delete, lock, unlock, ready
The ready action is sent after the initial subscription data has been delivered.
5. Transaction Result
{
"type": "transactionResult",
"id": 4,
"results": [
{ "ref": "$post", "id": "post_new_001" },
{ "id": "users/alice" },
{ "id": "posts/post_old" }
]
}
6. Lock Result
{
"type": "lockResult",
"id": 5,
"locked": true
}
If the lock is denied:
{
"type": "lockResult",
"id": 5,
"locked": false,
"holder": "bob@example.com",
"mode": "hard"
}
7. Error
{
"type": "error",
"id": 4,
"code": "permission_denied",
"message": "Insufficient permissions to update this document"
}
8. Pong
{
"type": "pong",
"id": 8
}
Real-Time Subscriptions
Subscription Flow
Client sends subscribe message
↓
Server validates permissions
↓
Server creates broadcast channel
↓
Server executes initial query
↓
Server sends subscribeResult with data
↓
Server watches for changes matching filter
↓
On change: Server sends change event
↓
Client updates local state
Implementation
Subscription Structure:
- id: Unique subscription identifier
- path: Collection path being subscribed to
- filter: Optional query filter to match changes
- sender: Broadcast channel for sending change events
Notification Algorithm:
Algorithm: Notify Subscribers on Change
Input: db_id, path, change_event
Output: None (side effect: sends to all matching subscribers)
1. For each active subscription:
   a. Check if subscription.path matches change path
   b. If path matches:
      - Evaluate if change data matches subscription filter
      - If matches: send change_event through subscriber's broadcast channel
      - If no match: skip this subscriber
2. Return
This ensures:
- Only subscribed collections receive notifications
- Filter conditions prevent unnecessary updates
- All matching subscribers notified in parallel via broadcast channels
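The notification steps above can be sketched in a few lines. This is a simplified model rather than the server code: the filter is reduced to equality conditions, a plain callback stands in for the broadcast channel, and the path rule (exact match or a document beneath the subscribed collection) is an assumption.

```typescript
interface ChangeEvent {
  action: string;
  path: string;
  data?: Record<string, unknown>;
}

interface Subscription {
  id: string;
  path: string;
  equals?: Record<string, unknown>;      // simplified filter: equality only
  send: (event: ChangeEvent) => void;    // stands in for the broadcast sender
}

// Assumption: a subscription matches its own path and any document below it.
function pathMatches(subPath: string, changePath: string): boolean {
  return changePath === subPath || changePath.startsWith(subPath + "/");
}

function dataMatches(sub: Subscription, event: ChangeEvent): boolean {
  const equals = sub.equals;
  if (!equals) return true; // no filter: every change on the path is relevant
  const data = event.data ?? {};
  return Object.keys(equals).every((field) => data[field] === equals[field]);
}

// Returns how many subscribers received the event.
function notifySubscribers(subs: Subscription[], event: ChangeEvent): number {
  let delivered = 0;
  for (const sub of subs) {
    if (!pathMatches(sub.path, event.path)) continue; // step 1a
    if (!dataMatches(sub, event)) continue;           // step 1b, "no match" branch
    sub.send(event);                                  // step 1b, "matches" branch
    delivered++;
  }
  return delivered;
}
```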
Change Event Types
ChangeEvent is a tagged enum with #[serde(tag = "action")] serialization:
#[serde(tag = "action", rename_all = "camelCase")]
pub enum ChangeEvent {
Create { path: Box<str>, data: Value },
Update { path: Box<str>, data: Value, old_data: Option<Value> },
Delete { path: Box<str>, old_data: Option<Value> },
Lock { path: Box<str>, data: Value },
Unlock { path: Box<str>, data: Value },
Ready { path: Box<str>, data: Option<Value> },
}
This serializes as {"action": "create", "path": "...", "data": {...}} — the action field determines the variant.
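On the client, the action tag maps naturally onto a TypeScript discriminated union. The sketch below applies change events to a local cache of documents. It assumes that create events carry the new ID in data._id (as in the example change message) and that update and delete events use the document path, so the last path segment is the ID; that second point is an assumption, and old_data is omitted for brevity.

```typescript
type Json = Record<string, unknown>;

// Mirrors the "action"-tagged serialization of ChangeEvent.
type ChangeEvent =
  | { action: "create"; path: string; data: Json }
  | { action: "update"; path: string; data: Json }
  | { action: "delete"; path: string }
  | { action: "lock" | "unlock"; path: string; data: Json }
  | { action: "ready"; path: string; data?: Json };

function lastSegment(path: string): string {
  const parts = path.split("/");
  return parts[parts.length - 1] ?? "";
}

// Returns a new cache (keyed by document ID) with the event applied.
function applyChange(state: Map<string, Json>, event: ChangeEvent): Map<string, Json> {
  const next = new Map(state);
  switch (event.action) {
    case "create":
      next.set(String(event.data["_id"]), event.data);
      break;
    case "update":
      next.set(lastSegment(event.path), event.data); // assumed: path ends with the ID
      break;
    case "delete":
      next.delete(lastSegment(event.path));
      break;
    default:
      // lock, unlock, and ready leave document contents unchanged in this sketch
      break;
  }
  return next;
}
```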
Transactions
Atomic Operations
Transactions ensure multiple operations execute atomically:
{
"type": "transaction",
"id": 10,
"operations": [
{
"type": "update",
"path": "accounts/alice",
"data": { "balance": { "$op": "increment", "by": -100 } }
},
{
"type": "update",
"path": "accounts/bob",
"data": { "balance": { "$op": "increment", "by": 100 } }
}
]
}
Guarantees:
- All operations succeed or all fail
- Intermediate states never visible
- Sequential consistency
Temporary References
Reference documents created within the same transaction:
{
"type": "transaction",
"id": 11,
"operations": [
{
"type": "create",
"path": "posts",
"ref": "$post",
"data": { "title": "My Post", "author": "alice" }
},
{
"type": "create",
"path": "comments",
"data": {
"postId": { "$ref": "$post" },
"text": "First comment!",
"author": "alice"
}
}
]
}
How it works:
- The first operation creates the post and saves its ID as $post
- The second operation references $post, which is replaced with the actual ID
- The comment gets the correct post ID even though it wasn’t known initially
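The substitution step can be sketched as a recursive walk over an operation's data that swaps every { "$ref": ... } object for the recorded ID. The resolveRefs function is a hypothetical illustration; how the server actually resolves references is not described in the source.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// `refs` maps reference names (e.g. "$post") to IDs generated earlier
// in the same transaction.
function resolveRefs(value: Json, refs: Map<string, string>): Json {
  if (Array.isArray(value)) {
    return value.map((item) => resolveRefs(item, refs));
  }
  if (value !== null && typeof value === "object") {
    const keys = Object.keys(value);
    const target = value["$ref"];
    // An object of the exact form { "$ref": "$name" } is replaced by the ID.
    if (keys.length === 1 && typeof target === "string") {
      const id = refs.get(target);
      if (id === undefined) throw new Error(`unknown reference ${target}`);
      return id;
    }
    const out: { [key: string]: Json } = {};
    for (const key of keys) {
      out[key] = resolveRefs(value[key] as Json, refs);
    }
    return out;
  }
  return value; // primitives pass through unchanged
}
```

Throwing on an unknown reference is a design choice of this sketch; it fits the all-or-nothing behavior of transactions, since a dangling reference should abort the whole batch.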
Document Locking
The RTDB supports document-level locking for exclusive or advisory editing access.
Lock Modes
- Soft lock (advisory): Other clients can still write but receive a notification that the document is locked. Useful for signaling editing intent.
- Hard lock (enforced): The server rejects writes from other clients while the lock is held. Only the lock holder (identified by conn_id) can modify the document.
Lock/Unlock Messages
Client → Server:
{ "type": "lock", "id": 1, "path": "todos/task_123", "mode": "soft" }
{ "type": "unlock", "id": 2, "path": "todos/task_123" }
Server → Client:
{ "type": "lockResult", "id": 1, "locked": true }
{ "type": "lockResult", "id": 1, "locked": false, "holder": "bob@example.com", "mode": "hard" }
TTL-Based Expiration
Locks expire automatically after a TTL (time-to-live) period. This prevents permanently locked documents when clients disconnect unexpectedly or crash without releasing their locks. The server cleans up expired locks during its periodic maintenance cycle.
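A minimal in-memory model of TTL-based locks is sketched below. The LockTable class, its method names, and the choice to pass the current time explicitly are all assumptions made for illustration; the source does not specify the TTL length or the server's data structures.

```typescript
type LockMode = "soft" | "hard";

interface LockInfo {
  holder: string;
  connId: string;
  mode: LockMode;
  expiresAt: number; // milliseconds, same clock as `now`
}

class LockTable {
  private locks = new Map<string, LockInfo>();
  private ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  // Returns undefined when the lock is granted, or the current holder when denied.
  acquire(path: string, holder: string, connId: string, mode: LockMode, now: number): LockInfo | undefined {
    const existing = this.locks.get(path);
    if (existing && existing.expiresAt > now && existing.connId !== connId) {
      return existing; // still held by another connection
    }
    this.locks.set(path, { holder, connId, mode, expiresAt: now + this.ttlMs });
    return undefined;
  }

  // Hard locks block writes from other connections; soft locks are advisory.
  canWrite(path: string, connId: string, now: number): boolean {
    const lock = this.locks.get(path);
    if (!lock || lock.expiresAt <= now) return true;
    return lock.mode === "soft" || lock.connId === connId;
  }

  // Periodic maintenance: drops expired locks and reports how many were removed.
  sweep(now: number): number {
    let removed = 0;
    this.locks.forEach((lock, path) => {
      if (lock.expiresAt <= now) {
        this.locks.delete(path);
        removed++;
      }
    });
    return removed;
  }
}
```

Note that canWrite treats an expired lock as absent even before sweep runs, so a crashed client cannot block writers past the TTL.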
Connection-Based Echo Suppression
The server tracks lock ownership by conn_id. When a lock change event is broadcast to subscribers, the originating connection is excluded from the notification (echo suppression), similar to how write operations suppress echoes. This prevents the client that acquired the lock from receiving its own lock notification.
Lock Status in Change Events
Active subscriptions receive lock/unlock events as part of the change stream:
{
"type": "change",
"subscriptionId": "sub_abc123",
"event": {
"action": "lock",
"path": "todos/task_123",
"data": { "holder": "alice@example.com", "mode": "hard" }
}
}
Aggregate Queries
The RTDB supports server-side aggregate computations on query results.
Aggregate Request
Add the aggregate option to a query or subscribe message:
{
"type": "query",
"id": 1,
"path": "tasks",
"filter": { "equals": { "completed": false } },
"aggregate": {
"groupBy": "status",
"ops": [
{ "op": "sum", "field": "hours" },
{ "op": "avg", "field": "hours" }
]
}
}
Aggregate Operations
| Operation | Description |
|---|---|
| sum | Sum of a numeric field |
| avg | Average of a numeric field |
| min | Minimum value of a field |
| max | Maximum value of a field |
Each group always includes a count of matching documents.
Aggregate Response
{
"type": "queryResult",
"id": 1,
"aggregate": {
"groups": [
{ "group": "todo", "count": 12, "sum_hours": 36, "avg_hours": 3.0 },
{ "group": "in_progress", "count": 5, "sum_hours": 20, "avg_hours": 4.0 }
]
}
}
Incremental Aggregate Subscriptions
When aggregate is used with a subscribe message, the server computes aggregates incrementally. On each change event that affects the subscribed path and filter, the server recalculates the affected groups and sends an updated aggregate snapshot rather than the full document set. This keeps aggregate subscriptions efficient even for large collections.
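To show what a grouped aggregate computes, the sketch below recomputes groups from a full document list. It is a from-scratch illustration, not the server's incremental algorithm. The op_field result keys (for example sum_hours) follow the aggregate response example; skipping non-numeric values is an assumption.

```typescript
type Doc = Record<string, unknown>;
type AggOp = { op: "sum" | "avg" | "min" | "max"; field: string };
type Group = Record<string, unknown>;

function aggregate(docs: Doc[], groupBy: string, ops: AggOp[]): Group[] {
  // Bucket documents by the value of the groupBy field.
  const buckets = new Map<string, Doc[]>();
  for (const doc of docs) {
    const key = String(doc[groupBy]);
    const bucket = buckets.get(key);
    if (bucket) bucket.push(doc);
    else buckets.set(key, [doc]);
  }

  const groups: Group[] = [];
  buckets.forEach((members, key) => {
    // Every group carries a count of matching documents.
    const group: Group = { group: key, count: members.length };
    for (const { op, field } of ops) {
      const nums = members
        .map((d) => d[field])
        .filter((v): v is number => typeof v === "number");
      if (nums.length === 0) continue; // assumption: omit results with no numeric input
      const sum = nums.reduce((a, b) => a + b, 0);
      let value: number;
      if (op === "sum") value = sum;
      else if (op === "avg") value = sum / nums.length;
      else if (op === "min") value = nums.reduce((a, b) => Math.min(a, b));
      else value = nums.reduce((a, b) => Math.max(a, b));
      group[`${op}_${field}`] = value;
    }
    groups.push(group);
  });
  return groups;
}
```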
Computed Values
Field Operations
Modify field values with special operations:
Increment:
{
"views": { "$op": "increment", "by": 1 }
}
Append (to array):
{
"tags": { "$op": "append", "value": "javascript" }
}
Remove (from array):
{
"tags": { "$op": "remove", "value": "draft" }
}
Decrement:
{
"stock": { "$op": "decrement", "by": 1 }
}
Multiply:
{
"price": { "$op": "multiply", "by": 1.1 }
}
Concat (string concatenation):
{
"fullName": { "$op": "concat", "values": ["firstName", " ", "lastName"] }
}
Min (set to minimum of current and given value):
{
"lowestScore": { "$op": "min", "value": 42 }
}
Max (set to maximum of current and given value):
{
"highScore": { "$op": "max", "value": 99 }
}
Set if not exists:
{
"createdAt": { "$op": "setIfNotExists", "value": 1738483200 }
}
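The effect of these operations on a document can be sketched as a merge that evaluates each $op against the current field value. This is an illustrative model only: treating a missing numeric field as 0 and a missing array as empty are assumptions, concat is left out because its argument semantics are not spelled out above, and applyUpdate is a hypothetical name.

```typescript
type Doc = Record<string, unknown>;
interface FieldOp { $op: string; by?: number; value?: unknown }

function isFieldOp(v: unknown): v is FieldOp {
  return typeof v === "object" && v !== null && typeof (v as { $op?: unknown }).$op === "string";
}

// Assumptions: a missing or non-numeric field counts as 0, a missing array as [].
function num(v: unknown): number { return typeof v === "number" ? v : 0; }
function arr(v: unknown): unknown[] { return Array.isArray(v) ? v : []; }

function applyOp(current: unknown, op: FieldOp): unknown {
  switch (op.$op) {
    case "increment": return num(current) + num(op.by);
    case "decrement": return num(current) - num(op.by);
    case "multiply": return num(current) * num(op.by);
    case "append": return arr(current).concat([op.value]);
    case "remove": return arr(current).filter((item) => item !== op.value);
    case "min": return current === undefined ? op.value : Math.min(num(current), num(op.value));
    case "max": return current === undefined ? op.value : Math.max(num(current), num(op.value));
    case "setIfNotExists": return current === undefined ? op.value : current;
    default: throw new Error(`unsupported $op: ${op.$op}`);
  }
}

// Merges `patch` into a copy of `doc`, evaluating any field operations.
function applyUpdate(doc: Doc, patch: Doc): Doc {
  const next: Doc = { ...doc };
  for (const field of Object.keys(patch)) {
    const value = patch[field];
    next[field] = isFieldOp(value) ? applyOp(doc[field], value) : value;
  }
  return next;
}
```

Expressing a change as an operation rather than a literal value lets the server compute the result against the stored value, so concurrent increments need not overwrite one another.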
Query Operations
Aggregate data within queries:
Count:
{
"type": "query",
"id": 12,
"path": "posts",
"query": { "$query": "count" },
"filter": { "equals": { "published": true } }
}
Sum:
{
"type": "query",
"id": 13,
"path": "orders",
"query": { "$query": "sum", "field": "total" }
}
Average:
{
"type": "query",
"id": 14,
"path": "reviews",
"query": { "$query": "avg", "field": "rating" }
}
Function Operations
Server-side functions for computed values:
Now (current timestamp):
{
"createdAt": { "$fn": "now" }
}
Slugify (URL-safe string):
{
"slug": { "$fn": "slugify", "input": "Hello World!" }
// Results in: "hello-world"
}
Lowercase (convert string to lowercase):
{
"emailNormalized": { "$fn": "lowercase", "input": "Alice@Example.COM" }
// Results in: "alice@example.com"
}
Hash (SHA-256):
{
"passwordHash": { "$fn": "hash", "input": "password123" }
}
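For intuition, three of these functions can be approximated on the client as shown below. The exact server rules for slugify are not documented here, so the version in this sketch (lowercase, runs of non-alphanumeric ASCII collapsed to one hyphen, hyphens trimmed from the ends) is an assumption chosen to reproduce the "hello-world" example. The hash function is omitted, and the clock is injected so the result is deterministic.

```typescript
interface FnCall { $fn: string; input?: string }

// Assumed rules: lowercase, collapse other characters to "-", trim the ends.
function slugify(input: string): string {
  return input
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, "-")
    .replace(/^-+|-+$/g, "");
}

// `nowSeconds` supplies the current Unix time in seconds, matching the
// timestamps used elsewhere in this document.
function evalFn(call: FnCall, nowSeconds: () => number): string | number {
  switch (call.$fn) {
    case "now": return nowSeconds();
    case "slugify": return slugify(call.input ?? "");
    case "lowercase": return (call.input ?? "").toLowerCase();
    default: throw new Error(`unsupported $fn: ${call.$fn}`);
  }
}
```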
Indexing
Secondary Indexes
Improve query performance:
Index Definition:
- name: Unique name for the index (e.g., “idx_email”)
- fields: Vector of field names to index (single or compound)
- unique: Boolean flag ensuring no duplicate values (for unique constraints)
Create Index Algorithm:
Algorithm: Create Index
Input: db_id, collection_path, index_definition
Output: Result<()>
1. Validate index definition:
   - Check name uniqueness (not duplicate of existing index)
   - Verify all fields exist in collection schema
   - If unique=true: verify collection has no duplicate values for these fields
2. Build index structure:
   - Scan existing documents in collection
   - For each document, extract values for indexed fields
   - Build index data structure (B-tree for efficient lookups)
3. Store index metadata:
   - Save index definition in metadata adapter
   - Record index name, fields, and unique flag
4. Return success
This ensures:
- Efficient lookups on indexed fields
- Query optimizer can use index automatically
- Unique constraints enforced at index level
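The build step of the algorithm above can be sketched as follows. This simplified model uses a Map keyed by the serialized field values instead of a B-tree, and it folds the unique-constraint check into the scan; buildIndex, lookup, and IndexDef are hypothetical names, not the adapter's API.

```typescript
type Doc = Record<string, unknown>;
interface IndexDef { name: string; fields: string[]; unique: boolean }

// Serializes the indexed field values into one composite lookup key.
function indexKey(doc: Doc, fields: string[]): string {
  return JSON.stringify(fields.map((f) => doc[f] ?? null));
}

// Scans existing documents and maps each composite key to matching document IDs.
function buildIndex(docs: Map<string, Doc>, def: IndexDef): Map<string, string[]> {
  const index = new Map<string, string[]>();
  docs.forEach((doc, id) => {
    const key = indexKey(doc, def.fields);
    const ids = index.get(key) ?? [];
    if (def.unique && ids.length > 0) {
      // Unique index: a second document with the same values is rejected.
      throw new Error(`duplicate value for unique index ${def.name}`);
    }
    ids.push(id);
    index.set(key, ids);
  });
  return index;
}

// Equality lookup: `values` must be given in the same order as def.fields.
function lookup(index: Map<string, string[]>, values: unknown[]): string[] {
  return index.get(JSON.stringify(values)) ?? [];
}
```

A hash-style Map only serves equality lookups; range filters and sorted scans are the reason the algorithm above names a B-tree.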
Index Usage
Queries automatically use indexes when available:
{
"type": "query",
"path": "users",
"filter": { "equals": { "email": "alice@example.com" } }
}
// Uses idx_email if available, otherwise full scan
Index Strategies
Single-field indexes:
fields: vec!["email"] // For email lookups
fields: vec!["createdAt"] // For sorting by date
Compound indexes:
fields: vec!["category", "price"] // For category + price queries
Unique indexes:
unique: true // Ensures no duplicates (e.g., email, username)
Client SDK Example
JavaScript/TypeScript
import { RtdbClient } from '@cloudillo/rtdb'
import { getRtdbUrl } from '@cloudillo/core'
// Create RTDB client
const rtdb = new RtdbClient({
dbId: 'my-database-id',
auth: {
getToken: () => bus.accessToken
},
serverUrl: getRtdbUrl(bus.idTag!, 'my-database-id', bus.accessToken!)
})
// Connect
await rtdb.connect()
// Query data
const users = await rtdb.collection('users')
.where('active', '==', true)
.get()
console.log(users.docs.map(doc => doc.data()))
// Subscribe to changes
const unsubscribe = rtdb.collection('posts')
.where('published', '==', true)
.onSnapshot((snapshot) => {
snapshot.docChanges().forEach((change) => {
if (change.type === 'added') {
console.log('New post:', change.doc.data())
}
if (change.type === 'modified') {
console.log('Modified post:', change.doc.data())
}
if (change.type === 'removed') {
console.log('Removed post:', change.doc.id)
}
})
})
// Create document via batch
const batch = rtdb.batch()
batch.create(rtdb.collection('users'), {
name: 'Charlie',
email: 'charlie@example.com',
age: 28
})
const results = await batch.commit()
// Update document via batch
const batch2 = rtdb.batch()
batch2.update(rtdb.ref('users/' + results[0].id), {
age: 29
})
await batch2.commit()
// Cleanup
unsubscribe()
await rtdb.disconnect()
React Example
import { useEffect, useState } from 'react'
import { useAuth } from '@cloudillo/react'
import { RtdbClient } from '@cloudillo/rtdb'
import { getRtdbUrl } from '@cloudillo/core'
interface Task {
title: string
completed: boolean
priority: number
}
function TaskList({ dbId }: { dbId: string }) {
const [auth] = useAuth()
const [tasks, setTasks] = useState<(Task & { id: string })[]>([])
const [rtdb, setRtdb] = useState<RtdbClient | null>(null)
// Initialize RTDB client
useEffect(() => {
if (!auth?.token || !auth?.idTag) return
const client = new RtdbClient({
dbId,
auth: { getToken: () => auth.token },
serverUrl: getRtdbUrl(auth.idTag, dbId, auth.token!)
})
client.connect()
setRtdb(client)
return () => { client.disconnect() }
}, [auth?.token, auth?.idTag, dbId])
// Subscribe to incomplete tasks
useEffect(() => {
if (!rtdb) return
const unsubscribe = rtdb.collection<Task>('tasks')
.where('completed', '==', false)
.onSnapshot((snapshot) => {
setTasks(snapshot.docs.map(doc => ({
id: doc.id,
...doc.data()
})))
})
return () => unsubscribe()
}, [rtdb])
return (
<ul>
{tasks.map(task => (
<li key={task.id}>{task.title} (priority: {task.priority})</li>
))}
</ul>
)
}
Query Optimization
Use indexes:
// Create index for frequently queried fields
await db.createIndex('users', { fields: ['email'], unique: true });
Limit results:
// Always use limit for large collections
const recent = await db.collection('posts')
.orderBy('createdAt', 'desc')
.limit(20)
.get();
Use pagination:
const page1 = await db.collection('posts').limit(20).get();
const page2 = await db.collection('posts').limit(20).offset(20).get();
Subscription Optimization
Filter subscriptions:
// Only subscribe to relevant data
db.collection('messages')
.where('conversationId', '==', conversationId)
.onSnapshot(handler); // Not all messages
Cleanup subscriptions:
// Always unsubscribe when component unmounts
useEffect(() => {
const unsubscribe = db.collection('...').onSnapshot(handler);
return () => unsubscribe();
}, []);
Memory Management
Database instances are managed by the adapter implementation. The RTDB adapter provides a close_db() method to flush pending changes and release resources. Database instances are closed automatically when no more WebSocket connections reference them.
Error Handling
Errors use the common ClResult/ClError system shared across all Cloudillo adapters. The server translates these into WebSocket error messages with appropriate error codes.
Client Handling
try {
await db.collection('users').doc(id).update(data);
} catch (error) {
if (error.code === 'permission_denied') {
console.error('No permission to update');
} else if (error.code === 'document_not_found') {
console.error('Document does not exist');
} else {
console.error('Unknown error:', error);
}
}
Security Best Practices
Permission Rules
// Create database with strict permissions
await fetch('/api/db', {
  method: 'POST',
  headers: {
    'Authorization': `Bearer ${token}`,
    'Content-Type': 'application/json' // the body is JSON
  },
  body: JSON.stringify({
    name: 'Private Data',
    type: 'redb',
    permissions: {
      public_read: false,
      public_write: false,
      readers: [], // Only owner can read
      writers: [] // Only owner can write
    }
  })
});
// Validate on client before sending
function validateUser(data) {
if (!data.email || !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.email)) {
throw new Error('Invalid email');
}
if (!data.name || data.name.length < 2) {
throw new Error('Name too short');
}
return true;
}
if (validateUser(userData)) {
await db.collection('users').add(userData);
}
Data Integrity
Write operations enforce several integrity guarantees:
- Hard locks: When a document has a hard lock, only the lock holder (identified by conn_id) can modify it
- ACID transactions: All operations within a transaction succeed or fail atomically
- Read-your-own-writes: Transaction-local reads see uncommitted changes from the same transaction