Overview

kSync provides flexible storage options to persist events locally. The storage layer is responsible for maintaining the event log and ensuring data durability across application restarts.

  • IndexedDB: Browser-native storage with persistence and efficient querying
  • Memory: Fast in-memory storage for servers and testing environments
  • Custom: Implement your own storage backend for specific requirements
  • Automatic: kSync automatically chooses the best storage for your environment

Storage Implementations

IndexedDB Storage (Browser)

The default storage for browser environments, providing persistent storage across sessions:
import { createKSync, IndexedDBStorage } from '@klastra/ksync';

// Automatic (recommended): uses IndexedDB in browsers
const ksync = createKSync();

// Explicit: construct the storage yourself and pass it in
const storage = new IndexedDBStorage();
const explicitKsync = createKSync({ storage });
Features:
  • Persistent: Data survives browser restarts and crashes
  • Efficient: Optimized for event querying and retrieval
  • Indexed: Fast lookups by event type, timestamp, and version
  • Transactional: ACID compliance for data integrity
  • Quota Management: Automatic cleanup of old events

Memory Storage (Server/Testing)

Fast in-memory storage ideal for servers and testing:
import { createKSync, MemoryStorage } from '@klastra/ksync';

const storage = new MemoryStorage();
const ksync = createKSync({ storage });
Features:
  • Fast: Reads and writes stay in process memory, with no disk or network I/O
  • Simple: No setup or configuration required
  • Ephemeral: Data is lost on restart (by design)
  • Testing: Perfect for unit tests and development

Storage Interface

All storage implementations follow the KSyncStorage interface:
interface KSyncStorage {
  // Store a single event
  storeEvent(event: KSyncEvent): Promise<void>;
  
  // Store multiple events (batch operation)
  storeEvents(events: KSyncEvent[]): Promise<void>;
  
  // Retrieve all events
  getEvents(fromVersion?: number): Promise<KSyncEvent[]>;
  
  // Get events by type
  getEventsByType(type: string, fromVersion?: number): Promise<KSyncEvent[]>;
  
  // Get the latest version number
  getLatestVersion(): Promise<number>;
  
  // Clear all events
  clear(): Promise<void>;
  
  // Close the storage connection
  close(): Promise<void>;
}
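
The optional `fromVersion` argument makes incremental reads possible: a consumer can remember the last version it processed and ask only for newer events. The sketch below illustrates that pattern under two assumptions that are not stated by the interface itself: that `fromVersion` is exclusive (as in the SQLite example's `version > ?` query) and that events carry the fields that example persists. `readNewEvents` and `arrayReader` are hypothetical helpers, not part of kSync.

```typescript
// Assumed event shape, inferred from the fields the SQLite example persists.
interface KSyncEvent {
  id: string;
  type: string;
  data: unknown;
  timestamp: number;
  clientId: string;
  version: number;
}

// Only the read side of KSyncStorage is needed for this sketch.
interface EventReader {
  getEvents(fromVersion?: number): Promise<KSyncEvent[]>;
}

// Hypothetical helper: fetch events newer than `cursor` and advance the cursor
// to the highest version returned (or leave it unchanged if nothing is new).
async function readNewEvents(
  storage: EventReader,
  cursor: number,
): Promise<{ events: KSyncEvent[]; cursor: number }> {
  const events = await storage.getEvents(cursor);
  const last = events[events.length - 1];
  return { events, cursor: last ? last.version : cursor };
}

// Array-backed stand-in so the sketch runs without a real backend.
// It treats fromVersion as exclusive and returns events ordered by version.
function arrayReader(log: KSyncEvent[]): EventReader {
  return {
    async getEvents(fromVersion?: number): Promise<KSyncEvent[]> {
      return log
        .filter(e => fromVersion === undefined || e.version > fromVersion)
        .sort((a, b) => a.version - b.version);
    },
  };
}
```

Calling `readNewEvents` repeatedly with the returned cursor yields each event once, provided the backend returns events ordered by version.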

Custom Storage Implementation

Create your own storage backend for specific requirements:

SQLite Storage Example

import Database from 'better-sqlite3';
import { createKSync, KSyncStorage, KSyncEvent } from '@klastra/ksync';

class SQLiteStorage implements KSyncStorage {
  private db: Database.Database;

  constructor(filename: string = 'ksync.db') {
    this.db = new Database(filename);
    this.initializeSchema();
  }

  private initializeSchema() {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS events (
        id TEXT PRIMARY KEY,
        type TEXT NOT NULL,
        data TEXT NOT NULL,
        timestamp INTEGER NOT NULL,
        clientId TEXT NOT NULL,
        version INTEGER NOT NULL UNIQUE
      );
      
      CREATE INDEX IF NOT EXISTS idx_events_type ON events(type);
      CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
      CREATE INDEX IF NOT EXISTS idx_events_version ON events(version);
    `);
  }

  async storeEvent(event: KSyncEvent): Promise<void> {
    const stmt = this.db.prepare(`
      INSERT INTO events (id, type, data, timestamp, clientId, version)
      VALUES (?, ?, ?, ?, ?, ?)
    `);
    
    stmt.run(
      event.id,
      event.type,
      JSON.stringify(event.data),
      event.timestamp,
      event.clientId,
      event.version
    );
  }

  async storeEvents(events: KSyncEvent[]): Promise<void> {
    const stmt = this.db.prepare(`
      INSERT INTO events (id, type, data, timestamp, clientId, version)
      VALUES (?, ?, ?, ?, ?, ?)
    `);

    const transaction = this.db.transaction((events: KSyncEvent[]) => {
      for (const event of events) {
        stmt.run(
          event.id,
          event.type,
          JSON.stringify(event.data),
          event.timestamp,
          event.clientId,
          event.version
        );
      }
    });

    transaction(events);
  }

  async getEvents(fromVersion?: number): Promise<KSyncEvent[]> {
    // Check against undefined so an explicit fromVersion of 0 is still applied
    const rows = fromVersion !== undefined
      ? this.db.prepare('SELECT * FROM events WHERE version > ? ORDER BY version').all(fromVersion)
      : this.db.prepare('SELECT * FROM events ORDER BY version').all();

    return rows.map(row => this.toEvent(row));
  }

  async getEventsByType(type: string, fromVersion?: number): Promise<KSyncEvent[]> {
    const rows = fromVersion !== undefined
      ? this.db.prepare('SELECT * FROM events WHERE type = ? AND version > ? ORDER BY version').all(type, fromVersion)
      : this.db.prepare('SELECT * FROM events WHERE type = ? ORDER BY version').all(type);

    return rows.map(row => this.toEvent(row));
  }

  // Convert a raw row back into an event; `data` is stored as a JSON string
  private toEvent(row: any): KSyncEvent {
    return {
      id: row.id,
      type: row.type,
      data: JSON.parse(row.data),
      timestamp: row.timestamp,
      clientId: row.clientId,
      version: row.version,
    };
  }

  async getLatestVersion(): Promise<number> {
    const result = this.db
      .prepare('SELECT MAX(version) as maxVersion FROM events')
      .get() as { maxVersion: number | null } | undefined;
    // MAX() yields NULL on an empty table, so fall back to 0
    return result?.maxVersion ?? 0;
  }

  async clear(): Promise<void> {
    this.db.exec('DELETE FROM events');
  }

  async close(): Promise<void> {
    this.db.close();
  }
}

// Usage
const storage = new SQLiteStorage('myapp.db');
const ksync = createKSync({ storage });

Redis Storage Example

import Redis from 'ioredis';
import { createKSync, KSyncStorage, KSyncEvent } from '@klastra/ksync';

class RedisStorage implements KSyncStorage {
  private redis: Redis;
  private keyPrefix: string;

  constructor(options: { host?: string; port?: number; keyPrefix?: string } = {}) {
    this.redis = new Redis({
      host: options.host || 'localhost',
      port: options.port || 6379,
    });
    this.keyPrefix = options.keyPrefix || 'ksync:';
  }

  async storeEvent(event: KSyncEvent): Promise<void> {
    const pipeline = this.redis.pipeline();
    
    // Store event data
    pipeline.hset(
      `${this.keyPrefix}events`,
      event.id,
      JSON.stringify(event)
    );
    
    // Add to sorted set for ordering
    pipeline.zadd(
      `${this.keyPrefix}versions`,
      event.version,
      event.id
    );
    
    // Add to type-specific set
    pipeline.sadd(
      `${this.keyPrefix}types:${event.type}`,
      event.id
    );
    
    await pipeline.exec();
  }

  async storeEvents(events: KSyncEvent[]): Promise<void> {
    const pipeline = this.redis.pipeline();
    
    for (const event of events) {
      pipeline.hset(
        `${this.keyPrefix}events`,
        event.id,
        JSON.stringify(event)
      );
      
      pipeline.zadd(
        `${this.keyPrefix}versions`,
        event.version,
        event.id
      );
      
      pipeline.sadd(
        `${this.keyPrefix}types:${event.type}`,
        event.id
      );
    }
    
    await pipeline.exec();
  }

  async getEvents(fromVersion?: number): Promise<KSyncEvent[]> {
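    // "(" makes the lower bound exclusive; compare against undefined so 0 still counts as a bound
    const min = fromVersion !== undefined ? `(${fromVersion}` : '-inf';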
    const eventIds = await this.redis.zrangebyscore(
      `${this.keyPrefix}versions`,
      min,
      '+inf'
    );
    
    if (eventIds.length === 0) return [];
    
    const eventData = await this.redis.hmget(
      `${this.keyPrefix}events`,
      ...eventIds
    );
    
    return eventData
      .filter(data => data !== null)
      .map(data => JSON.parse(data!));
  }

  async getEventsByType(type: string, fromVersion?: number): Promise<KSyncEvent[]> {
    const eventIds = await this.redis.smembers(`${this.keyPrefix}types:${type}`);
    
    if (eventIds.length === 0) return [];
    
    const eventData = await this.redis.hmget(
      `${this.keyPrefix}events`,
      ...eventIds
    );
    
    const events = eventData
      .filter(data => data !== null)
      .map(data => JSON.parse(data!))
      .filter(event => fromVersion === undefined || event.version > fromVersion)
      .sort((a, b) => a.version - b.version);
    
    return events;
  }

  async getLatestVersion(): Promise<number> {
    const result = await this.redis.zrevrange(
      `${this.keyPrefix}versions`,
      0,
      0,
      'WITHSCORES'
    );
    
    // With WITHSCORES the reply is [member, score], so the score sits at index 1
    return result.length > 0 ? parseInt(result[1], 10) : 0;
  }

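  async clear(): Promise<void> {
    // KEYS walks the entire keyspace and blocks Redis while it runs.
    // That is acceptable for small datasets; prefer SCAN for large ones.
    const keys = await this.redis.keys(`${this.keyPrefix}*`);
    if (keys.length > 0) {
      await this.redis.del(...keys);
    }
  }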

  async close(): Promise<void> {
    await this.redis.quit();
  }
}

// Usage
const storage = new RedisStorage({
  host: 'localhost',
  port: 6379,
  keyPrefix: 'myapp:ksync:'
});
const ksync = createKSync({ storage });
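
Before wiring a custom backend into kSync, it can help to check that it behaves like the examples above. The following is a minimal sketch of such a check, not an official test suite. It assumes the behavior shown by the SQLite and Redis examples: `fromVersion` is exclusive, results are ordered by version, and an empty store reports version 0. `checkStorageContract` and `makeEvent` are hypothetical names, and the event shape is inferred from the fields the SQLite example stores.

```typescript
// Assumed event shape, inferred from the columns the SQLite example stores.
interface KSyncEvent {
  id: string;
  type: string;
  data: unknown;
  timestamp: number;
  clientId: string;
  version: number;
}

// Copied from the Storage Interface section so the sketch is self-contained.
interface KSyncStorage {
  storeEvent(event: KSyncEvent): Promise<void>;
  storeEvents(events: KSyncEvent[]): Promise<void>;
  getEvents(fromVersion?: number): Promise<KSyncEvent[]>;
  getEventsByType(type: string, fromVersion?: number): Promise<KSyncEvent[]>;
  getLatestVersion(): Promise<number>;
  clear(): Promise<void>;
  close(): Promise<void>;
}

// Hypothetical fixture builder for the checks below.
function makeEvent(version: number, type: string): KSyncEvent {
  return {
    id: `evt-${version}`,
    type,
    data: { n: version },
    timestamp: 1000 + version,
    clientId: 'contract-check',
    version,
  };
}

// Hypothetical helper: returns human-readable failures (an empty array means every check passed).
// It calls clear() before and after, so point it at a disposable database only.
async function checkStorageContract(storage: KSyncStorage): Promise<string[]> {
  const failures: string[] = [];
  const versions = (events: KSyncEvent[]): string => events.map(e => e.version).join(',');
  const expect = (ok: boolean, message: string): void => {
    if (!ok) failures.push(message);
  };

  await storage.clear();
  expect((await storage.getLatestVersion()) === 0, 'empty storage should report version 0');

  await storage.storeEvent(makeEvent(1, 'a'));
  await storage.storeEvents([makeEvent(2, 'b'), makeEvent(3, 'a')]);

  expect(versions(await storage.getEvents()) === '1,2,3', 'getEvents() should return all events by version');
  expect(versions(await storage.getEvents(1)) === '2,3', 'getEvents(1) should exclude version 1');
  expect(versions(await storage.getEventsByType('a', 1)) === '3', 'getEventsByType should filter by type and version');
  expect((await storage.getLatestVersion()) === 3, 'getLatestVersion should return the highest version');

  await storage.clear();
  expect((await storage.getEvents()).length === 0, 'clear() should remove every event');
  return failures;
}
```

A typical use is to run `checkStorageContract(new SQLiteStorage(':memory:'))` in a test and assert that the returned array is empty.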

Storage Configuration

Automatic Storage Selection

kSync automatically selects the best storage for your environment:
const ksync = createKSync(); // Automatic selection

// Browser: Uses IndexedDB
// Node.js/Bun: Uses Memory storage
// Can be overridden with explicit storage option
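
How kSync detects its environment internally is not shown in this guide. As a rough illustration only, a selection rule matching the comments above could be based on whether an `indexedDB` global exists. `pickStorageKind` and `detectEnvironment` are hypothetical names, and kSync's real logic may differ.

```typescript
type StorageKind = 'indexeddb' | 'memory';

// Hypothetical probe: reports whether the current runtime exposes an `indexedDB` global.
function detectEnvironment(): { hasIndexedDB: boolean } {
  return { hasIndexedDB: 'indexedDB' in globalThis };
}

// Hypothetical selection rule mirroring the comments above:
// IndexedDB where it is available, in-memory storage everywhere else.
// The environment is a parameter so the rule can be tested without a browser.
function pickStorageKind(env: { hasIndexedDB: boolean } = detectEnvironment()): StorageKind {
  return env.hasIndexedDB ? 'indexeddb' : 'memory';
}
```

Passing the environment as an argument keeps the decision a pure function, which makes it straightforward to unit test both branches.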

Explicit Storage Configuration

import { createKSync, IndexedDBStorage, MemoryStorage } from '@klastra/ksync';

// Force IndexedDB (browser only)
const ksync1 = createKSync({
  storage: new IndexedDBStorage({
    dbName: 'myapp-ksync',
    version: 1,
  })
});

// Force Memory storage
const ksync2 = createKSync({
  storage: new MemoryStorage()
});

// Custom storage
const ksync3 = createKSync({
  storage: new SQLiteStorage('myapp.db')
});

Performance Considerations

Event Cleanup

Prevent memory/storage bloat by cleaning up old events:
// IndexedDB automatically cleans up old events
const storage = new IndexedDBStorage({
  maxEvents: 10000,        // Keep max 10k events
  maxAge: 30 * 24 * 60 * 60 * 1000, // Keep events for 30 days
});

// Manual cleanup
await storage.cleanup();
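
A custom backend has no built-in cleanup, so it needs its own retention rule. The sketch below shows one way to apply limits like the `maxEvents` and `maxAge` options above as a pure function: drop events older than the age limit, then keep only the newest ones by version. `selectEventsToKeep` and `RetentionPolicy` are hypothetical names, and this is not necessarily how `IndexedDBStorage` applies its limits.

```typescript
// Minimal fields the retention rule needs; real events carry more.
interface StoredEvent {
  version: number;
  timestamp: number; // milliseconds since the Unix epoch
}

// Hypothetical policy type mirroring the maxEvents / maxAge options above.
interface RetentionPolicy {
  maxEvents?: number; // keep at most this many events
  maxAge?: number;    // keep events no older than this many milliseconds
}

// Returns the events that survive the policy, ordered by version.
// The input array is not modified.
function selectEventsToKeep<T extends StoredEvent>(
  events: T[],
  policy: RetentionPolicy,
  now: number = Date.now(),
): T[] {
  let kept = [...events].sort((a, b) => a.version - b.version);

  if (policy.maxAge !== undefined) {
    const cutoff = now - policy.maxAge;
    kept = kept.filter(e => e.timestamp >= cutoff);
  }

  if (policy.maxEvents !== undefined && kept.length > policy.maxEvents) {
    // Keep the tail of the log, which holds the highest versions
    kept = kept.slice(kept.length - policy.maxEvents);
  }

  return kept;
}
```

A backend would then delete every stored event that is not in the returned list.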

Batch Operations

Use batch operations for better performance:
// ✅ Good - Batch multiple events
// (id, timestamp, clientId, and version are omitted here for brevity)
const events = [
  { type: 'user-joined', data: { userId: '1' } },
  { type: 'user-joined', data: { userId: '2' } },
  { type: 'user-joined', data: { userId: '3' } },
];

await storage.storeEvents(events);

// ❌ Bad - Individual operations
for (const event of events) {
  await storage.storeEvent(event); // Multiple round trips
}
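
When the event list is very large, a single `storeEvents` call may be more than a backend handles comfortably in one transaction or pipeline. One option is to split the list into fixed-size batches. The sketch below is an illustration only: `chunk` and `storeInBatches` are hypothetical helpers, and the default batch size of 500 is an arbitrary assumption to tune per backend.

```typescript
// Split an array into consecutive slices of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Hypothetical helper: write events batch by batch, in order.
// Returns the number of storeEvents calls that were made.
async function storeInBatches<E>(
  storage: { storeEvents(events: E[]): Promise<void> },
  events: E[],
  batchSize: number = 500,
): Promise<number> {
  let calls = 0;
  for (const batch of chunk(events, batchSize)) {
    // Await each batch so the batches are written sequentially
    await storage.storeEvents(batch);
    calls += 1;
  }
  return calls;
}
```

Note that each batch is a separate write, so a failure partway through leaves the earlier batches stored.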

Indexing Strategy

Design your storage schema for efficient queries:
-- SQLite example
CREATE INDEX idx_events_type_version ON events(type, version);
CREATE INDEX idx_events_timestamp ON events(timestamp);
CREATE INDEX idx_events_client ON events(clientId);

-- For time-based queries
CREATE INDEX idx_events_type_timestamp ON events(type, timestamp);

Storage Migration

Handle storage schema changes gracefully:
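// Base class for storages that track a schema version.
// Concrete subclasses also implement the KSyncStorage methods.
abstract class VersionedStorage {
  private readonly currentVersion = 2;

  // Read the persisted schema version (for example, from a metadata table)
  protected abstract getSchemaVersion(): Promise<number>;

  async initialize(): Promise<void> {
    const version = await this.getSchemaVersion();

    if (version < this.currentVersion) {
      await this.migrate(version, this.currentVersion);
    }
  }

  private async migrate(from: number, to: number): Promise<void> {
    console.log(`Migrating storage from v${from} to v${to}`);

    if (from === 1 && to === 2) {
      // Add new columns, transform data, etc.
      await this.migrateV1ToV2();
    }
  }

  private async migrateV1ToV2(): Promise<void> {
    // Migration logic here
  }
}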
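
The `from === 1 && to === 2` check above covers a single hop. Once a schema has gone through several versions, a store that is more than one version behind needs each step applied in order. A minimal sketch of that idea follows, assuming each migration upgrades exactly one version. `runMigrations` and `Migration` are hypothetical names.

```typescript
// A migration upgrades the schema by exactly one version.
type Migration = () => Promise<void>;

// Hypothetical runner: `steps` maps a version N to the migration that
// upgrades N to N + 1. Steps are applied one at a time until `target`
// is reached. Returns the version the schema ends at.
async function runMigrations(
  current: number,
  target: number,
  steps: Map<number, Migration>,
): Promise<number> {
  let version = current;
  while (version < target) {
    const step = steps.get(version);
    if (!step) {
      // Fail loudly rather than silently skipping a schema change
      throw new Error(`No migration registered from v${version}`);
    }
    await step();
    version += 1;
  }
  return version;
}
```

In a real backend, each step would typically also persist the new schema version so that an interrupted upgrade can resume from where it stopped.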

Best Practices

Next Steps