import Database from 'better-sqlite3';
import { KSyncStorage, KSyncEvent, createKSync } from '@klastra/ksync';
/** Row shape of the `events` table; `data` holds the JSON-serialized event payload. */
interface SQLiteEventRow {
  id: string;
  type: string;
  data: string;
  timestamp: number;
  clientId: string;
  version: number;
}

/**
 * `KSyncStorage` implementation that persists events in a SQLite database
 * through better-sqlite3.
 *
 * The underlying driver calls are synchronous; the methods are declared
 * `async` to satisfy the storage interface, so any driver error (for example
 * a duplicate `id` or `version`) surfaces as a rejected promise.
 */
class SQLiteStorage implements KSyncStorage {
  private readonly db: Database.Database;
  /** INSERT statement prepared once and reused by `storeEvent` / `storeEvents`. */
  private readonly insertStmt: Database.Statement;

  /**
   * Opens the database file and makes sure the `events` table exists.
   *
   * @param filename Path of the SQLite database file.
   */
  constructor(filename: string = 'ksync.db') {
    this.db = new Database(filename);
    this.initializeSchema();
    // Must be prepared after the schema exists, otherwise the table is unknown.
    this.insertStmt = this.db.prepare(`
      INSERT INTO events (id, type, data, timestamp, clientId, version)
      VALUES (?, ?, ?, ?, ?, ?)
    `);
  }

  /** Creates the `events` table and its indexes if they do not exist yet. */
  private initializeSchema(): void {
    // NOTE: the UNIQUE constraint on `version` already gives SQLite an implicit
    // index, so idx_events_version is redundant; it is kept so the schema stays
    // identical to databases created by earlier versions of this class.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS events (
        id TEXT PRIMARY KEY,
        type TEXT NOT NULL,
        data TEXT NOT NULL,
        timestamp INTEGER NOT NULL,
        clientId TEXT NOT NULL,
        version INTEGER NOT NULL UNIQUE
      );
      CREATE INDEX IF NOT EXISTS idx_events_type ON events(type);
      CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
      CREATE INDEX IF NOT EXISTS idx_events_version ON events(version);
    `);
  }

  /** Binds one event to the prepared INSERT; `data` is stored as JSON text. */
  private insert(event: KSyncEvent): void {
    this.insertStmt.run(
      event.id,
      event.type,
      JSON.stringify(event.data),
      event.timestamp,
      event.clientId,
      event.version
    );
  }

  /** Converts a database row back into an event, parsing the JSON payload. */
  private static toEvent(row: SQLiteEventRow): KSyncEvent {
    return {
      id: row.id,
      type: row.type,
      data: JSON.parse(row.data),
      timestamp: row.timestamp,
      clientId: row.clientId,
      version: row.version,
    };
  }

  /**
   * Runs the shared SELECT used by `getEvents` and `getEventsByType`.
   * Each filter is applied only when it is provided; results are ordered by
   * ascending `version`.
   */
  private selectEvents(type?: string, fromVersion?: number): KSyncEvent[] {
    const conditions: string[] = [];
    const params: Array<string | number> = [];
    if (type !== undefined) {
      conditions.push('type = ?');
      params.push(type);
    }
    // Explicit nullish check: a plain truthiness test would treat
    // `fromVersion = 0` as "no filter".
    if (fromVersion != null) {
      conditions.push('version > ?');
      params.push(fromVersion);
    }
    const where = conditions.length > 0 ? ` WHERE ${conditions.join(' AND ')}` : '';
    const rows = this.db
      .prepare(`SELECT * FROM events${where} ORDER BY version`)
      .all(...params) as SQLiteEventRow[];
    return rows.map(row => SQLiteStorage.toEvent(row));
  }

  /**
   * Inserts a single event.
   *
   * @param event Event to persist.
   */
  async storeEvent(event: KSyncEvent): Promise<void> {
    this.insert(event);
  }

  /**
   * Inserts a batch of events inside one transaction, so either every event
   * is stored or none is.
   *
   * @param events Events to persist; an empty array is a no-op.
   */
  async storeEvents(events: KSyncEvent[]): Promise<void> {
    if (events.length === 0) {
      return;
    }
    const insertAll = this.db.transaction((batch: KSyncEvent[]) => {
      for (const event of batch) {
        this.insert(event);
      }
    });
    insertAll(events);
  }

  /**
   * Returns stored events ordered by ascending version.
   *
   * @param fromVersion Exclusive lower bound on `version`; omit to return all
   *   events. `0` is honored as a real bound.
   */
  async getEvents(fromVersion?: number): Promise<KSyncEvent[]> {
    return this.selectEvents(undefined, fromVersion);
  }

  /**
   * Returns stored events of one type ordered by ascending version.
   *
   * @param type Event type to match exactly.
   * @param fromVersion Exclusive lower bound on `version`; omit to return all
   *   events of the type. `0` is honored as a real bound.
   */
  async getEventsByType(type: string, fromVersion?: number): Promise<KSyncEvent[]> {
    return this.selectEvents(type, fromVersion);
  }

  /** Returns the highest stored version, or `0` when the table is empty. */
  async getLatestVersion(): Promise<number> {
    // MAX() yields NULL on an empty table, hence the nullish fallback.
    const row = this.db
      .prepare('SELECT MAX(version) as maxVersion FROM events')
      .get() as { maxVersion: number | null } | undefined;
    return row?.maxVersion ?? 0;
  }

  /** Deletes every stored event; the table and its indexes are kept. */
  async clear(): Promise<void> {
    this.db.exec('DELETE FROM events');
  }

  /** Closes the underlying database connection. */
  async close(): Promise<void> {
    this.db.close();
  }
}
// Usage: open (or create) the SQLite file 'myapp.db' and hand the adapter to
// KSync through the `storage` option.
const storage = new SQLiteStorage('myapp.db');
const ksync = createKSync({ storage });