✨ Feat: Implement support for multiple CCNs
This commit is contained in:
parent
097f02938d
commit
a8ffce918e
7 changed files with 778 additions and 169 deletions
367
index.ts
367
index.ts
|
@ -16,8 +16,9 @@ import {
|
|||
decryptEvent,
|
||||
} from './eventEncryptionDecryption.ts';
|
||||
import {
|
||||
getCCNPrivateKey,
|
||||
getCCNPubkey,
|
||||
createNewCCN,
|
||||
getActiveCCN,
|
||||
getAllCCNs,
|
||||
isAddressableEvent,
|
||||
isArray,
|
||||
isCCNReplaceableEvent,
|
||||
|
@ -115,6 +116,7 @@ export function runMigrations(db: Database, latestVersion: number) {
|
|||
function addEventToDb(
|
||||
decryptedEvent: nostrTools.VerifiedEvent,
|
||||
encryptedEvent: nostrTools.VerifiedEvent,
|
||||
ccnPubkey: string,
|
||||
) {
|
||||
const existingEvent = sql`
|
||||
SELECT * FROM events WHERE id = ${decryptedEvent.id}
|
||||
|
@ -131,6 +133,7 @@ function addEventToDb(
|
|||
WHERE kind = ${decryptedEvent.kind}
|
||||
AND pubkey = ${decryptedEvent.pubkey}
|
||||
AND created_at < ${decryptedEvent.created_at}
|
||||
AND ccn_pubkey = ${ccnPubkey}
|
||||
`(db);
|
||||
}
|
||||
|
||||
|
@ -143,6 +146,7 @@ function addEventToDb(
|
|||
WHERE kind = ${decryptedEvent.kind}
|
||||
AND pubkey = ${decryptedEvent.pubkey}
|
||||
AND created_at < ${decryptedEvent.created_at}
|
||||
AND ccn_pubkey = ${ccnPubkey}
|
||||
AND id IN (
|
||||
SELECT event_id FROM event_tags
|
||||
WHERE tag_name = 'd'
|
||||
|
@ -163,6 +167,7 @@ function addEventToDb(
|
|||
SET replaced = 1
|
||||
WHERE kind = ${decryptedEvent.kind}
|
||||
AND created_at < ${decryptedEvent.created_at}
|
||||
AND ccn_pubkey = ${ccnPubkey}
|
||||
AND id IN (
|
||||
SELECT event_id FROM event_tags
|
||||
WHERE tag_name = 'd'
|
||||
|
@ -176,7 +181,7 @@ function addEventToDb(
|
|||
}
|
||||
|
||||
sql`
|
||||
INSERT INTO events (id, original_id, pubkey, created_at, kind, content, sig, first_seen) VALUES (
|
||||
INSERT INTO events (id, original_id, pubkey, created_at, kind, content, sig, first_seen, ccn_pubkey) VALUES (
|
||||
${decryptedEvent.id},
|
||||
${encryptedEvent.id},
|
||||
${decryptedEvent.pubkey},
|
||||
|
@ -184,7 +189,8 @@ function addEventToDb(
|
|||
${decryptedEvent.kind},
|
||||
${decryptedEvent.content},
|
||||
${decryptedEvent.sig},
|
||||
unixepoch()
|
||||
unixepoch(),
|
||||
${ccnPubkey}
|
||||
)
|
||||
`(db);
|
||||
if (decryptedEvent.tags) {
|
||||
|
@ -225,9 +231,58 @@ function cleanupOldChunks() {
|
|||
sql`DELETE FROM event_chunks WHERE created_at < ${cutoffTime}`(db);
|
||||
}
|
||||
|
||||
async function setupAndSubscribeToExternalEvents() {
|
||||
const ccnPubkey = await getCCNPubkey();
|
||||
let knownOriginalEventsCache: string[] = [];
|
||||
|
||||
function updateKnownEventsCache() {
|
||||
knownOriginalEventsCache = sql`SELECT original_id FROM events`(db).flatMap(
|
||||
(row) => row.original_id,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a subscription event handler for processing encrypted events.
|
||||
* This handler decrypts and adds valid events to the database.
|
||||
* @param database The database instance to use
|
||||
* @returns An event handler function
|
||||
*/
|
||||
function createSubscriptionEventHandler(database: Database) {
|
||||
return async (event: nostrTools.Event) => {
|
||||
if (knownOriginalEventsCache.indexOf(event.id) >= 0) return;
|
||||
if (!nostrTools.verifyEvent(event)) {
|
||||
log.warn('Invalid event received');
|
||||
return;
|
||||
}
|
||||
if (encryptedEventIsInDb(event)) return;
|
||||
try {
|
||||
const decryptedEvent = await decryptEvent(database, event);
|
||||
addEventToDb(decryptedEvent, event, decryptedEvent.ccnPubkey);
|
||||
updateKnownEventsCache();
|
||||
} catch (e) {
|
||||
if (e instanceof EventAlreadyExistsException) return;
|
||||
if (e instanceof ChunkedEventReceived) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Publishes an event to relays, handling both single events and chunked events
|
||||
* @param encryptedEvent The encrypted event or array of chunked events
|
||||
*/
|
||||
async function publishToRelays(
|
||||
encryptedEvent: nostrTools.Event | nostrTools.Event[],
|
||||
): Promise<void> {
|
||||
if (Array.isArray(encryptedEvent)) {
|
||||
for (const chunk of encryptedEvent) {
|
||||
await Promise.any(pool.publish(relays, chunk));
|
||||
}
|
||||
} else {
|
||||
await Promise.any(pool.publish(relays, encryptedEvent));
|
||||
}
|
||||
}
|
||||
|
||||
function setupAndSubscribeToExternalEvents() {
|
||||
const isInitialized = sql`
|
||||
SELECT name FROM sqlite_master WHERE type='table' AND name='migration_history'
|
||||
`(db)[0];
|
||||
|
@ -241,72 +296,23 @@ async function setupAndSubscribeToExternalEvents() {
|
|||
|
||||
runMigrations(db, latestVersion);
|
||||
|
||||
const allCCNs = sql`SELECT pubkey FROM ccns`(db);
|
||||
const ccnPubkeys = allCCNs.map((ccn) => ccn.pubkey);
|
||||
|
||||
pool.subscribeMany(
|
||||
relays,
|
||||
[
|
||||
{
|
||||
'#p': [ccnPubkey],
|
||||
'#p': ccnPubkeys,
|
||||
kinds: [1059],
|
||||
},
|
||||
],
|
||||
{
|
||||
async onevent(event: nostrTools.Event) {
|
||||
if (timer) {
|
||||
timerCleaned = true;
|
||||
clearTimeout(timer);
|
||||
}
|
||||
if (knownOriginalEvents.indexOf(event.id) >= 0) return;
|
||||
if (!nostrTools.verifyEvent(event)) {
|
||||
log.warn('Invalid event received');
|
||||
return;
|
||||
}
|
||||
if (encryptedEventIsInDb(event)) return;
|
||||
try {
|
||||
const decryptedEvent = await decryptEvent(db, event);
|
||||
addEventToDb(decryptedEvent, event);
|
||||
} catch (e) {
|
||||
if (e instanceof EventAlreadyExistsException) return;
|
||||
if (e instanceof ChunkedEventReceived) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
},
|
||||
onevent: createSubscriptionEventHandler(db),
|
||||
},
|
||||
);
|
||||
|
||||
let timerCleaned = false;
|
||||
|
||||
const knownOriginalEvents = sql`SELECT original_id FROM events`(db).flatMap(
|
||||
(row) => row.original_id,
|
||||
);
|
||||
|
||||
const timer = setTimeout(async () => {
|
||||
// if nothing is found in 10 seconds, create a new CCN, TODO: change logic
|
||||
const ccnCreationEventTemplate = {
|
||||
kind: 0,
|
||||
content: JSON.stringify({
|
||||
display_name: 'New CCN',
|
||||
name: 'New CCN',
|
||||
bot: true,
|
||||
}),
|
||||
created_at: Math.floor(Date.now() / 1000),
|
||||
tags: [['p', ccnPubkey]],
|
||||
};
|
||||
const ccnCreationEvent = nostrTools.finalizeEvent(
|
||||
ccnCreationEventTemplate,
|
||||
await getCCNPrivateKey(),
|
||||
);
|
||||
const encryptedCCNCreationEvent =
|
||||
await createEncryptedEvent(ccnCreationEvent);
|
||||
if (timerCleaned) return; // in case we get an event before the timer is cleaned
|
||||
if (Array.isArray(encryptedCCNCreationEvent)) {
|
||||
for (const event of encryptedCCNCreationEvent)
|
||||
await Promise.any(pool.publish(relays, event));
|
||||
} else {
|
||||
await Promise.any(pool.publish(relays, encryptedCCNCreationEvent));
|
||||
}
|
||||
}, 10000);
|
||||
|
||||
updateKnownEventsCache();
|
||||
setInterval(cleanupOldChunks, CHUNK_CLEANUP_INTERVAL);
|
||||
}
|
||||
|
||||
|
@ -326,6 +332,49 @@ class UserConnection {
|
|||
this.subscriptions = subscriptions;
|
||||
this.db = db;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a response to the client
|
||||
* @param responseArray The response array to send
|
||||
*/
|
||||
sendResponse(responseArray: unknown[]): void {
|
||||
this.socket.send(JSON.stringify(responseArray));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a notice to the client
|
||||
* @param message The message to send
|
||||
*/
|
||||
sendNotice(message: string): void {
|
||||
this.sendResponse(['NOTICE', message]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends an event to the client
|
||||
* @param subscriptionId The subscription ID
|
||||
* @param event The event to send
|
||||
*/
|
||||
sendEvent(subscriptionId: string, event: NostrEvent): void {
|
||||
this.sendResponse(['EVENT', subscriptionId, event]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends an end of stored events message
|
||||
* @param subscriptionId The subscription ID
|
||||
*/
|
||||
sendEOSE(subscriptionId: string): void {
|
||||
this.sendResponse(['EOSE', subscriptionId]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends an OK response
|
||||
* @param eventId The event ID
|
||||
* @param success Whether the operation was successful
|
||||
* @param message The message to send
|
||||
*/
|
||||
sendOK(eventId: string, success: boolean, message: string): void {
|
||||
this.sendResponse(['OK', eventId, success, message]);
|
||||
}
|
||||
}
|
||||
|
||||
function filtersMatchingEvent(
|
||||
|
@ -370,7 +419,13 @@ function handleRequest(connection: UserConnection, request: NostrClientREQ) {
|
|||
)}`,
|
||||
);
|
||||
|
||||
let query = sqlPartial`SELECT * FROM events WHERE replaced = 0`;
|
||||
const activeCCN = getActiveCCN(connection.db);
|
||||
if (!activeCCN) {
|
||||
connection.sendNotice('No active CCN found');
|
||||
return log.warn('No active CCN found');
|
||||
}
|
||||
|
||||
let query = sqlPartial`SELECT * FROM events WHERE replaced = 0 AND ccn_pubkey = ${activeCCN.pubkey}`;
|
||||
|
||||
const filtersAreNotEmpty = filters.some((filter) => {
|
||||
return Object.values(filter).some((value) => {
|
||||
|
@ -593,9 +648,9 @@ function handleRequest(connection: UserConnection, request: NostrClientREQ) {
|
|||
sig: events[i].sig,
|
||||
};
|
||||
|
||||
connection.socket.send(JSON.stringify(['EVENT', subscriptionId, event]));
|
||||
connection.sendEvent(subscriptionId, event);
|
||||
}
|
||||
connection.socket.send(JSON.stringify(['EOSE', subscriptionId]));
|
||||
connection.sendEOSE(subscriptionId);
|
||||
|
||||
connection.subscriptions.set(subscriptionId, filters);
|
||||
}
|
||||
|
@ -606,20 +661,24 @@ async function handleEvent(
|
|||
) {
|
||||
const valid = nostrTools.verifyEvent(event);
|
||||
if (!valid) {
|
||||
connection.socket.send(JSON.stringify(['NOTICE', 'Invalid event']));
|
||||
connection.sendNotice('Invalid event');
|
||||
return log.warn('Invalid event');
|
||||
}
|
||||
|
||||
const encryptedEvent = await createEncryptedEvent(event);
|
||||
const activeCCN = getActiveCCN(connection.db);
|
||||
if (!activeCCN) {
|
||||
connection.sendNotice('No active CCN found');
|
||||
return log.warn('No active CCN found');
|
||||
}
|
||||
|
||||
const encryptedEvent = await createEncryptedEvent(event, connection.db);
|
||||
try {
|
||||
if (Array.isArray(encryptedEvent)) {
|
||||
await Promise.all(
|
||||
encryptedEvent.map((chunk) => Promise.any(pool.publish(relays, chunk))),
|
||||
);
|
||||
addEventToDb(event, encryptedEvent[0]);
|
||||
await publishToRelays(encryptedEvent);
|
||||
addEventToDb(event, encryptedEvent[0], activeCCN.pubkey);
|
||||
} else {
|
||||
addEventToDb(event, encryptedEvent);
|
||||
await Promise.any(pool.publish(relays, encryptedEvent));
|
||||
addEventToDb(event, encryptedEvent, activeCCN.pubkey);
|
||||
await publishToRelays(encryptedEvent);
|
||||
}
|
||||
} catch (e) {
|
||||
if (e instanceof EventAlreadyExistsException) {
|
||||
|
@ -628,13 +687,13 @@ async function handleEvent(
|
|||
}
|
||||
}
|
||||
|
||||
connection.socket.send(JSON.stringify(['OK', event.id, true, 'Event added']));
|
||||
connection.sendOK(event.id, true, 'Event added');
|
||||
|
||||
const filtersThatMatchEvent = filtersMatchingEvent(event, connection);
|
||||
|
||||
for (let i = 0; i < filtersThatMatchEvent.length; i++) {
|
||||
const filter = filtersThatMatchEvent[i];
|
||||
connection.socket.send(JSON.stringify(['EVENT', filter, event]));
|
||||
connection.sendEvent(filter, event);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -648,6 +707,153 @@ function handleClose(connection: UserConnection, subscriptionId: string) {
|
|||
connection.subscriptions.delete(subscriptionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Activates a CCN by setting it as the active one in the database
|
||||
* @param database The database instance to use
|
||||
* @param pubkey The public key of the CCN to activate
|
||||
*/
|
||||
function activateCCN(database: Database, pubkey: string): void {
|
||||
sql`UPDATE ccns SET is_active = 0`(database);
|
||||
sql`UPDATE ccns SET is_active = 1 WHERE pubkey = ${pubkey}`(database);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles errors in socket operations, logs them and sends a notification to the client
|
||||
* @param connection The WebSocket connection
|
||||
* @param operation The operation that failed
|
||||
* @param error The error that occurred
|
||||
*/
|
||||
function handleSocketError(
|
||||
connection: UserConnection,
|
||||
operation: string,
|
||||
error: unknown,
|
||||
): void {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
log.error(`Error ${operation}: ${errorMessage}`);
|
||||
connection.sendNotice(`Failed to ${operation}`);
|
||||
}
|
||||
|
||||
async function handleCreateCCN(
|
||||
connection: UserConnection,
|
||||
data: { name: string; seed?: string },
|
||||
): Promise<void> {
|
||||
try {
|
||||
if (!data.name || typeof data.name !== 'string') {
|
||||
connection.sendNotice('Name is required');
|
||||
return;
|
||||
}
|
||||
|
||||
const newCcn = await createNewCCN(connection.db, data.name, data.seed);
|
||||
|
||||
activateCCN(connection.db, newCcn.pubkey);
|
||||
|
||||
pool.subscribeMany(
|
||||
relays,
|
||||
[
|
||||
{
|
||||
'#p': [newCcn.pubkey],
|
||||
kinds: [1059],
|
||||
},
|
||||
],
|
||||
{
|
||||
onevent: createSubscriptionEventHandler(connection.db),
|
||||
},
|
||||
);
|
||||
|
||||
connection.sendResponse([
|
||||
'OK',
|
||||
'CCN CREATED',
|
||||
true,
|
||||
JSON.stringify({
|
||||
pubkey: newCcn.pubkey,
|
||||
name: data.name,
|
||||
}),
|
||||
]);
|
||||
|
||||
log.info(`CCN created: ${data.name}`);
|
||||
} catch (error: unknown) {
|
||||
handleSocketError(connection, 'create CCN', error);
|
||||
}
|
||||
}
|
||||
|
||||
function handleGetCCNs(connection: UserConnection): void {
|
||||
try {
|
||||
const ccns = getAllCCNs(connection.db);
|
||||
connection.sendResponse(['OK', 'CCN LIST', true, JSON.stringify(ccns)]);
|
||||
} catch (error: unknown) {
|
||||
handleSocketError(connection, 'get CCNs', error);
|
||||
}
|
||||
}
|
||||
|
||||
function handleActivateCCN(
|
||||
connection: UserConnection,
|
||||
data: { pubkey: string },
|
||||
): void {
|
||||
try {
|
||||
if (!data.pubkey || typeof data.pubkey !== 'string') {
|
||||
connection.sendNotice('CCN pubkey is required');
|
||||
return;
|
||||
}
|
||||
|
||||
const ccnExists = sql`
|
||||
SELECT COUNT(*) as count FROM ccns WHERE pubkey = ${data.pubkey}
|
||||
`(connection.db)[0].count;
|
||||
|
||||
if (ccnExists === 0) {
|
||||
connection.sendNotice('CCN not found');
|
||||
return;
|
||||
}
|
||||
|
||||
for (const subscriptionId of connection.subscriptions.keys()) {
|
||||
connection.sendResponse([
|
||||
'CLOSED',
|
||||
subscriptionId,
|
||||
'Subscription closed due to CCN activation',
|
||||
]);
|
||||
}
|
||||
|
||||
connection.subscriptions.clear();
|
||||
log.info('All subscriptions cleared due to CCN activation');
|
||||
|
||||
activateCCN(connection.db, data.pubkey);
|
||||
|
||||
const activatedCCN = sql`
|
||||
SELECT pubkey, name FROM ccns WHERE pubkey = ${data.pubkey}
|
||||
`(connection.db)[0];
|
||||
|
||||
connection.sendResponse([
|
||||
'OK',
|
||||
'CCN ACTIVATED',
|
||||
true,
|
||||
JSON.stringify(activatedCCN),
|
||||
]);
|
||||
|
||||
log.info(`CCN activated: ${activatedCCN.name}`);
|
||||
} catch (error: unknown) {
|
||||
handleSocketError(connection, 'activate CCN', error);
|
||||
}
|
||||
}
|
||||
|
||||
function handleCCNCommands(
|
||||
connection: UserConnection,
|
||||
command: string,
|
||||
data: unknown,
|
||||
) {
|
||||
switch (command) {
|
||||
case 'CREATE':
|
||||
return handleCreateCCN(
|
||||
connection,
|
||||
data as { name: string; seed?: string },
|
||||
);
|
||||
case 'LIST':
|
||||
return handleGetCCNs(connection);
|
||||
case 'ACTIVATE':
|
||||
return handleActivateCCN(connection, data as { pubkey: string });
|
||||
default:
|
||||
return log.warn('Invalid CCN command');
|
||||
}
|
||||
}
|
||||
|
||||
Deno.serve({
|
||||
port: 6942,
|
||||
handler: (request) => {
|
||||
|
@ -672,14 +878,16 @@ Deno.serve({
|
|||
const data = JSON.parse(event.data);
|
||||
if (!isArray(data)) return log.warn('Invalid request');
|
||||
|
||||
const msg = n.clientMsg().parse(data);
|
||||
switch (msg[0]) {
|
||||
const msgType = data[0];
|
||||
switch (msgType) {
|
||||
case 'REQ':
|
||||
return handleRequest(connection, n.clientREQ().parse(data));
|
||||
case 'EVENT':
|
||||
return handleEvent(connection, n.clientEVENT().parse(data)[1]);
|
||||
case 'CLOSE':
|
||||
return handleClose(connection, n.clientCLOSE().parse(data)[1]);
|
||||
case 'CCN':
|
||||
return handleCCNCommands(connection, data[1] as string, data[2]);
|
||||
default:
|
||||
return log.warn('Invalid request');
|
||||
}
|
||||
|
@ -688,6 +896,11 @@ Deno.serve({
|
|||
|
||||
return response;
|
||||
}
|
||||
return new Response('Eve Relay');
|
||||
return new Response(
|
||||
Deno.readTextFileSync(`${import.meta.dirname}/public/landing.html`),
|
||||
{
|
||||
headers: { 'Content-Type': 'text/html' },
|
||||
},
|
||||
);
|
||||
},
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue