import { randomBytes } from '@noble/ciphers/webcrypto';
import * as nostrTools from '@nostr/tools';
import { Database } from 'jsr:@db/sqlite';
import { NSchema as n } from 'jsr:@nostrify/nostrify';
import type {
  NostrClientREQ,
  NostrEvent,
  NostrFilter,
} from 'jsr:@nostrify/types';
import { encodeBase64 } from 'jsr:@std/encoding@0.224/base64';
import { CHUNK_CLEANUP_INTERVAL, CHUNK_MAX_AGE } from './consts.ts';
import {
  ChunkedEventReceived,
  EventAlreadyExistsException,
  createEncryptedEvent,
  decryptEvent,
} from './eventEncryptionDecryption.ts';
import {
  getCCNPrivateKey,
  getCCNPubkey,
  isAddressableEvent,
  isArray,
  isCCNReplaceableEvent,
  isLocalhost,
  isReplaceableEvent,
  isValidJSON,
  parseATagQuery,
} from './utils.ts';
import { getEveFilePath } from './utils/files.ts';
import { log, setupLogger } from './utils/logs.ts';
import { mixQuery, sql, sqlPartial } from './utils/queries.ts';

await setupLogger();

if (!Deno.env.has('ENCRYPTION_KEY')) {
  log.error(
    `Missing ENCRYPTION_KEY. Please set it in your env.\nA new one has been generated for you: ENCRYPTION_KEY="${encodeBase64(
      randomBytes(32),
    )}"`,
  );
  Deno.exit(1);
}

const db = new Database(await getEveFilePath('db'));
const pool = new nostrTools.SimplePool();
const relays = [
  'wss://relay.arx-ccn.com/',
  'wss://relay.dannymorabito.com/',
  'wss://nos.lol/',
  'wss://nostr.einundzwanzig.space/',
  'wss://nostr.massmux.com/',
  'wss://nostr.mom/',
  'wss://nostr.wine/',
  'wss://purplerelay.com/',
  'wss://relay.damus.io/',
  'wss://relay.goodmorningbitcoin.com/',
  'wss://relay.lexingtonbitcoin.org/',
  'wss://relay.nostr.band/',
  'wss://relay.primal.net/',
  'wss://relay.snort.social/',
  'wss://strfry.iris.to/',
  'wss://cache2.primal.net/v1',
];

// Runs every migration file whose numeric prefix is greater than
// `latestVersion`, in ascending order, recording each outcome in
// `migration_history`.
export function runMigrations(db: Database, latestVersion: number) {
  const migrations = [...Deno.readDirSync(`${import.meta.dirname}/migrations`)];
  migrations.sort((a, b) => {
    const aVersion = Number.parseInt(a.name.split('-')[0], 10);
    const bVersion = Number.parseInt(b.name.split('-')[0], 10);
    return aVersion - bVersion;
  });
  for (const migrationFile of migrations) {
    const migrationVersion = Number.parseInt(
      migrationFile.name.split('-')[0],
      10,
    );

    if (migrationVersion > latestVersion) {
      log.info(
        `Running migration ${migrationFile.name} (version ${migrationVersion})`,
      );
      const start = Date.now();
      const migrationSql = Deno.readTextFileSync(
        `${import.meta.dirname}/migrations/${migrationFile.name}`,
      );
      db.run('BEGIN TRANSACTION');
      try {
        db.run(migrationSql);
        const end = Date.now();
        const durationMs = end - start;
        sql`
          INSERT INTO migration_history (migration_version, migration_name, executed_at, duration_ms, status)
          VALUES (${migrationVersion}, ${migrationFile.name}, ${new Date().toISOString()}, ${durationMs}, 'success');
        `(db);
        // Commit only after both the migration and its history row succeeded.
        // No END TRANSACTION follows: in SQLite END is a synonym for COMMIT
        // and fails when no transaction is active.
        db.run('COMMIT TRANSACTION');
      } catch (e) {
        db.run('ROLLBACK TRANSACTION');
        const error =
          e instanceof Error
            ? e
            : typeof e === 'string'
              ? new Error(e)
              : new Error(JSON.stringify(e));
        const end = Date.now();
        const durationMs = end - start;
        // Recorded outside the rolled-back transaction so the failure persists.
        sql`
          INSERT INTO migration_history (migration_version, migration_name, executed_at, duration_ms, status, error_message)
          VALUES (${migrationVersion}, ${migrationFile.name}, ${new Date().toISOString()}, ${durationMs}, 'failed', ${error.message});
        `(db);
        throw e;
      }
    }
  }
}

function addEventToDb(
  decryptedEvent: nostrTools.VerifiedEvent,
  encryptedEvent: nostrTools.VerifiedEvent,
) {
  const existingEvent = sql`
    SELECT * FROM events WHERE id = ${decryptedEvent.id}
  `(db)[0];

  if (existingEvent) throw new EventAlreadyExistsException();
  try {
    db.run('BEGIN TRANSACTION');

    if (isReplaceableEvent(decryptedEvent.kind)) {
      sql`
        UPDATE events
        SET replaced = 1
        WHERE kind = ${decryptedEvent.kind}
          AND pubkey = ${decryptedEvent.pubkey}
          AND created_at < ${decryptedEvent.created_at}
      `(db);
    }

    if (isAddressableEvent(decryptedEvent.kind)) {
      const dTag = decryptedEvent.tags.find((tag) => tag[0] === 'd')?.[1];
      if (dTag) {
        sql`
          UPDATE events
          SET replaced = 1
          WHERE kind = ${decryptedEvent.kind}
            AND pubkey =
            ${decryptedEvent.pubkey}
            AND created_at < ${decryptedEvent.created_at}
            AND id IN (
              SELECT event_id
              FROM event_tags
              WHERE tag_name = 'd'
                AND tag_id IN (
                  SELECT tag_id
                  FROM event_tags_values
                  WHERE value_position = 1
                    AND value = ${dTag}
                )
            )
        `(db);
      }
    }

    if (isCCNReplaceableEvent(decryptedEvent.kind)) {
      const dTag = decryptedEvent.tags.find((tag) => tag[0] === 'd')?.[1];
      sql`
        UPDATE events
        SET replaced = 1
        WHERE kind = ${decryptedEvent.kind}
          AND created_at < ${decryptedEvent.created_at}
          AND id IN (
            SELECT event_id
            FROM event_tags
            WHERE tag_name = 'd'
              AND tag_id IN (
                SELECT tag_id
                FROM event_tags_values
                WHERE value_position = 1
                  AND value = ${dTag}
              )
          )
      `(db);
    }

    sql`
      INSERT INTO events (id, original_id, pubkey, created_at, kind, content, sig, first_seen)
      VALUES (
        ${decryptedEvent.id},
        ${encryptedEvent.id},
        ${decryptedEvent.pubkey},
        ${decryptedEvent.created_at},
        ${decryptedEvent.kind},
        ${decryptedEvent.content},
        ${decryptedEvent.sig},
        unixepoch()
      )
    `(db);

    if (decryptedEvent.tags) {
      for (let i = 0; i < decryptedEvent.tags.length; i++) {
        const tag = sql`
          INSERT INTO event_tags(event_id, tag_name, tag_index)
          VALUES (
            ${decryptedEvent.id},
            ${decryptedEvent.tags[i][0]},
            ${i}
          )
          RETURNING tag_id
        `(db)[0];
        for (let j = 1; j < decryptedEvent.tags[i].length; j++) {
          sql`
            INSERT INTO event_tags_values(tag_id, value_position, value)
            VALUES (
              ${tag.tag_id},
              ${j},
              ${decryptedEvent.tags[i][j]}
            )
          `(db);
        }
      }
    }

    db.run('COMMIT TRANSACTION');
  } catch (e) {
    db.run('ROLLBACK TRANSACTION');
    throw e;
  }
}

function encryptedEventIsInDb(event: nostrTools.VerifiedEvent) {
  return sql`
    SELECT * FROM events WHERE original_id = ${event.id}
  `(db)[0];
}

function cleanupOldChunks() {
  const cutoffTime = Math.floor((Date.now() - CHUNK_MAX_AGE) / 1000);
  sql`DELETE FROM event_chunks WHERE created_at < ${cutoffTime}`(db);
}

async function setupAndSubscribeToExternalEvents() {
  const ccnPubkey = await getCCNPubkey();

  const isInitialized = sql`
    SELECT name FROM sqlite_master WHERE type='table' AND
    name='migration_history'
  `(db)[0];

  if (!isInitialized) runMigrations(db, -1);

  const latestVersion =
    sql`
      SELECT migration_version FROM migration_history WHERE status = 'success' ORDER BY migration_version DESC LIMIT 1
    `(db)[0]?.migration_version ?? -1;

  runMigrations(db, latestVersion);

  pool.subscribeMany(
    relays,
    [
      {
        '#p': [ccnPubkey],
        kinds: [1059],
      },
    ],
    {
      // `timer`, `timerCleaned` and `knownOriginalEvents` are declared below;
      // this callback only runs after they have been initialized.
      async onevent(event: nostrTools.Event) {
        if (timer) {
          timerCleaned = true;
          clearTimeout(timer);
        }
        if (knownOriginalEvents.indexOf(event.id) >= 0) return;
        if (!nostrTools.verifyEvent(event)) {
          log.warn('Invalid event received');
          return;
        }
        if (encryptedEventIsInDb(event)) return;
        try {
          const decryptedEvent = await decryptEvent(db, event);
          addEventToDb(decryptedEvent, event);
        } catch (e) {
          if (e instanceof EventAlreadyExistsException) return;
          if (e instanceof ChunkedEventReceived) {
            return;
          }
          // Any other error is silently dropped here.
        }
      },
    },
  );

  let timerCleaned = false;

  const knownOriginalEvents = sql`SELECT original_id FROM events`(db).flatMap(
    (row) => row.original_id,
  );

  const timer = setTimeout(async () => {
    // if nothing is found in 10 seconds, create a new CCN, TODO: change logic
    const ccnCreationEventTemplate = {
      kind: 0,
      content: JSON.stringify({
        display_name: 'New CCN',
        name: 'New CCN',
        bot: true,
      }),
      created_at: Math.floor(Date.now() / 1000),
      tags: [['p', ccnPubkey]],
    };
    const ccnCreationEvent = nostrTools.finalizeEvent(
      ccnCreationEventTemplate,
      await getCCNPrivateKey(),
    );
    const encryptedCCNCreationEvent =
      await createEncryptedEvent(ccnCreationEvent);
    if (timerCleaned) return; // in case we get an event before the timer is cleaned
    if (Array.isArray(encryptedCCNCreationEvent)) {
      for (const event of encryptedCCNCreationEvent)
        await Promise.any(pool.publish(relays, event));
    } else {
      await Promise.any(pool.publish(relays, encryptedCCNCreationEvent));
    }
  }, 10000);

  setInterval(cleanupOldChunks, CHUNK_CLEANUP_INTERVAL);
}

await setupAndSubscribeToExternalEvents();

class UserConnection {
  public socket: WebSocket;
  // Maps each subscription ID to the filters sent with its REQ.
  public subscriptions: Map<string, NostrFilter[]>;
  public db: Database;

  constructor(
    socket: WebSocket,
    subscriptions: Map<string, NostrFilter[]>,
    db: Database,
  ) {
    this.socket = socket;
    this.subscriptions = subscriptions;
    this.db = db;
  }
}

// Returns the IDs of the connection's subscriptions that match `event`.
function filtersMatchingEvent(
  event: NostrEvent,
  connection: UserConnection,
): string[] {
  const matching: string[] = [];
  for (const subscription of connection.subscriptions.keys()) {
    const filters = connection.subscriptions.get(subscription);
    if (!filters) continue;
    // A subscription matches when at least one of its filters matches (OR);
    // within a filter every condition must hold (AND). A subscription with no
    // filters matches every event, mirroring the stored-event query.
    const isMatching =
      filters.length === 0 ||
      filters.some((filter) =>
        Object.entries(filter).every(([type, value]) => {
          if (type === 'ids') return value.includes(event.id);
          if (type === 'kinds') return value.includes(event.kind);
          if (type === 'authors') return value.includes(event.pubkey);
          if (type === 'since') return event.created_at >= value;
          if (type === 'until') return event.created_at <= value;
          // `limit` only caps the initial stored-event query, so it never
          // excludes a live event.
          if (type === 'limit') return true;
          if (type.startsWith('#')) {
            const tagName = type.slice(1);
            return event.tags.some(
              (tag: string[]) => tag[0] === tagName && value.includes(tag[1]),
            );
          }
          return false;
        }),
      );
    if (isMatching) matching.push(subscription);
  }
  return matching;
}

function handleRequest(connection: UserConnection, request: NostrClientREQ) {
  const [, subscriptionId, ...filters] = request;
  if (connection.subscriptions.has(subscriptionId)) {
    return log.warn('Duplicate subscription ID');
  }

  log.info(
    `New subscription: ${subscriptionId} with filters: ${JSON.stringify(
      filters,
    )}`,
  );

  let query = sqlPartial`SELECT * FROM events WHERE replaced = 0`;

  // A filter counts as non-empty when it has a non-empty array condition or a
  // positive `since`/`until`; these are the conditions translated to SQL below.
  const filtersAreNotEmpty = filters.some((filter) =>
    Object.entries(filter).some(([type, value]) =>
      Array.isArray(value)
        ? value.length > 0
        : (type === 'since' || type === 'until') && value > 0,
    ),
  );

  if (filtersAreNotEmpty) {
    query = mixQuery(query, sqlPartial`AND`);

    for (let i = 0; i < filters.length; i++) {
      // filters act as OR, filter groups act as AND
      query = mixQuery(query, sqlPartial`(`);

      const filter = Object.entries(filters[i]).filter(([type, value]) => {
        if (type === 'ids') return value.length > 0;
        if (type === 'authors') return value.length > 0;
        if (type === 'kinds')
          return value.length > 0;
        if (type.startsWith('#')) return value.length > 0;
        if (type === 'since') return value > 0;
        if (type === 'until') return value > 0;
        return false;
      });

      for (let j = 0; j < filter.length; j++) {
        const [type, value] = filter[j];

        if (type === 'ids') {
          const uniqueIds = [...new Set(value)];
          query = mixQuery(query, sqlPartial`id IN (`);
          for (let k = 0; k < uniqueIds.length; k++) {
            const id = uniqueIds[k] as string;
            query = mixQuery(query, sqlPartial`${id}`);
            if (k < uniqueIds.length - 1) {
              query = mixQuery(query, sqlPartial`,`);
            }
          }
          query = mixQuery(query, sqlPartial`)`);
        }

        if (type === 'authors') {
          const uniqueAuthors = [...new Set(value)];
          query = mixQuery(query, sqlPartial`pubkey IN (`);
          for (let k = 0; k < uniqueAuthors.length; k++) {
            const author = uniqueAuthors[k] as string;
            query = mixQuery(query, sqlPartial`${author}`);
            if (k < uniqueAuthors.length - 1) {
              query = mixQuery(query, sqlPartial`,`);
            }
          }
          query = mixQuery(query, sqlPartial`)`);
        }

        if (type === 'kinds') {
          const uniqueKinds = [...new Set(value)];
          query = mixQuery(query, sqlPartial`kind IN (`);
          for (let k = 0; k < uniqueKinds.length; k++) {
            const kind = uniqueKinds[k] as number;
            query = mixQuery(query, sqlPartial`${kind}`);
            if (k < uniqueKinds.length - 1) {
              query = mixQuery(query, sqlPartial`,`);
            }
          }
          query = mixQuery(query, sqlPartial`)`);
        }

        if (type.startsWith('#')) {
          const tag = type.slice(1);
          const uniqueValues = [...new Set(value)];
          query = mixQuery(query, sqlPartial`(`);
          for (let k = 0; k < uniqueValues.length; k++) {
            const tagValue = uniqueValues[k] as string;
            if (tag === 'a') {
              const aTagInfo = parseATagQuery(tagValue);

              if (aTagInfo.dTag && aTagInfo.dTag !== '') {
                if (isCCNReplaceableEvent(aTagInfo.kind)) {
                  // CCN replaceable event reference
                  query = mixQuery(
                    query,
                    sqlPartial`id IN (
                      SELECT e.id
                      FROM events e
                      JOIN event_tags t ON e.id = t.event_id
                      JOIN event_tags_values v ON t.tag_id = v.tag_id
                      WHERE e.kind = ${aTagInfo.kind}
                        AND t.tag_name = 'd'
                        AND v.value_position = 1
                        AND
                        v.value = ${aTagInfo.dTag}
                    )`,
                  );
                } else {
                  // Addressable event reference
                  query = mixQuery(
                    query,
                    sqlPartial`id IN (
                      SELECT e.id
                      FROM events e
                      JOIN event_tags t ON e.id = t.event_id
                      JOIN event_tags_values v ON t.tag_id = v.tag_id
                      WHERE e.kind = ${aTagInfo.kind}
                        AND e.pubkey = ${aTagInfo.pubkey}
                        AND t.tag_name = 'd'
                        AND v.value_position = 1
                        AND v.value = ${aTagInfo.dTag}
                    )`,
                  );
                }
              } else {
                // Replaceable event reference
                query = mixQuery(
                  query,
                  sqlPartial`id IN (
                    SELECT id
                    FROM events
                    WHERE kind = ${aTagInfo.kind}
                      AND pubkey = ${aTagInfo.pubkey}
                  )`,
                );
              }
            } else {
              // Regular tag handling
              query = mixQuery(
                query,
                sqlPartial`id IN (
                  SELECT t.event_id
                  FROM event_tags t
                  WHERE t.tag_name = ${tag}
                    AND t.tag_id IN (
                      SELECT v.tag_id
                      FROM event_tags_values v
                      WHERE v.value_position = 1
                        AND v.value = ${tagValue}
                    )
                )`,
              );
            }

            if (k < uniqueValues.length - 1) {
              query = mixQuery(query, sqlPartial`OR`);
            }
          }
          query = mixQuery(query, sqlPartial`)`);
        }

        if (type === 'since') {
          query = mixQuery(query, sqlPartial`created_at >= ${value}`);
        }

        if (type === 'until') {
          query = mixQuery(query, sqlPartial`created_at <= ${value}`);
        }

        if (j < filter.length - 1) query = mixQuery(query, sqlPartial`AND`);
      }

      query = mixQuery(query, sqlPartial`)`);

      if (i < filters.length - 1) query = mixQuery(query, sqlPartial`OR`);
    }
  }

  query = mixQuery(query, sqlPartial`ORDER BY created_at ASC`);

  log.debug(query.query, ...query.values);

  const events = connection.db.prepare(query.query).all(...query.values);

  for (let i = 0; i < events.length; i++) {
    const rawTags = sql`SELECT * FROM event_tags_view WHERE event_id = ${
      events[i].id
    }`(connection.db);
    // Group tag values by tag index, keyed by their position within the tag.
    const tagsByIndex = new Map<
      number,
      {
        name: string;
        values: Map<number, string>;
      }
    >();

    for (const tag of rawTags) {
      let tagData = tagsByIndex.get(tag.tag_index);
      if (!tagData) {
        tagData = {
          name: tag.tag_name,
          values: new Map(),
        };
        tagsByIndex.set(tag.tag_index, tagData);
      }

      tagData.values.set(tag.tag_value_position, tag.tag_value);
    }

    const tagsArray =
      Array.from(tagsByIndex.entries())
        .sort(([indexA], [indexB]) => indexA - indexB)
        .map(([_, tagData]) => {
          const { name, values } = tagData;

          return [
            name,
            ...Array.from(values.entries())
              .sort(([posA], [posB]) => posA - posB)
              .map(([_, value]) => value),
          ];
        });

    const event = {
      id: events[i].id,
      pubkey: events[i].pubkey,
      created_at: events[i].created_at,
      kind: events[i].kind,
      tags: tagsArray,
      content: events[i].content,
      sig: events[i].sig,
    };

    connection.socket.send(JSON.stringify(['EVENT', subscriptionId, event]));
  }
  connection.socket.send(JSON.stringify(['EOSE', subscriptionId]));

  connection.subscriptions.set(subscriptionId, filters);
}

async function handleEvent(
  connection: UserConnection,
  event: nostrTools.Event,
) {
  const valid = nostrTools.verifyEvent(event);
  if (!valid) {
    connection.socket.send(JSON.stringify(['NOTICE', 'Invalid event']));
    return log.warn('Invalid event');
  }

  const encryptedEvent = await createEncryptedEvent(event);
  try {
    if (Array.isArray(encryptedEvent)) {
      // Chunked event: publish every chunk, then store it under the first chunk's ID.
      await Promise.all(
        encryptedEvent.map((chunk) => Promise.any(pool.publish(relays, chunk))),
      );
      addEventToDb(event, encryptedEvent[0]);
    } else {
      addEventToDb(event, encryptedEvent);
      await Promise.any(pool.publish(relays, encryptedEvent));
    }
  } catch (e) {
    if (e instanceof EventAlreadyExistsException) {
      log.warn('Event already exists');
      return;
    }
    // Any other failure means the event was not fully stored and published,
    // so report it instead of acknowledging success.
    log.error(`Failed to store or publish event ${event.id}: ${e}`);
    connection.socket.send(
      JSON.stringify(['OK', event.id, false, 'error: could not process event']),
    );
    return;
  }

  connection.socket.send(JSON.stringify(['OK', event.id, true, 'Event added']));

  const filtersThatMatchEvent = filtersMatchingEvent(event, connection);

  for (let i = 0; i < filtersThatMatchEvent.length; i++) {
    const filter = filtersThatMatchEvent[i];
    connection.socket.send(JSON.stringify(['EVENT', filter, event]));
  }
}

function handleClose(connection: UserConnection, subscriptionId: string) {
  if (!connection.subscriptions.has(subscriptionId)) {
    return log.warn(
      `Closing unknown subscription? That's weird. Subscription ID: ${subscriptionId}`,
    );
  }

  connection.subscriptions.delete(subscriptionId);
}

Deno.serve({
  port: 6942,
  handler: (request) => {
    if (request.headers.get('upgrade') === 'websocket') {
      // Only local clients may open a WebSocket to the relay.
      if (!isLocalhost(request)) {
        return new Response(
          'Forbidden. Please read the Arx-CCN documentation for more information on how to interact with the relay.',
          { status: 403 },
        );
      }

      const { socket, response } = Deno.upgradeWebSocket(request);

      const connection = new UserConnection(socket, new Map(), db);

      socket.onopen = () => log.info('User connected');
      socket.onmessage = (event) => {
        log.debug(`Received: ${event.data}`);
        if (typeof event.data !== 'string' || !isValidJSON(event.data)) {
          return log.warn('Invalid request');
        }
        const data = JSON.parse(event.data);
        if (!isArray(data)) return log.warn('Invalid request');

        const msg = n.clientMsg().parse(data);
        switch (msg[0]) {
          case 'REQ':
            return handleRequest(connection, n.clientREQ().parse(data));
          case 'EVENT':
            return handleEvent(connection, n.clientEVENT().parse(data)[1]);
          case 'CLOSE':
            return handleClose(connection, n.clientCLOSE().parse(data)[1]);
          default:
            return log.warn('Invalid request');
        }
      };
      socket.onclose = () => log.info('User disconnected');

      return response;
    }
    return new Response('Eve Relay');
  },
});