diff --git a/bun.lockb b/bun.lockb index 57b31f4..af68f95 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index 1f7d950..582753b 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "", "main": "src/index.ts", "scripts": { - "dev": "DEBUG='ndk:*' bun --watch src/index.ts", + "start": "DEBUG='ndk:*' bun --watch src/index.ts", "db:generate": "prisma generate", "db:migrate": "prisma migrate dev" }, @@ -14,21 +14,21 @@ "@elysiajs/server-timing": "^1.1.0", "@elysiajs/swagger": "^1.1.6", "@libsql/client": "^0.14.0", - "@nostr-dev-kit/ndk": "^2.10.6", + "@nostr-dev-kit/ndk": "^2.10.7", "@prisma/adapter-libsql": "^5.22.0", "@prisma/client": "5.22.0", "elysia": "^1.1.25", "node-forge": "^1.3.1", - "smtp-server": "^3.13.0", + "smtp-server": "^3.13.6", "websocket-polyfill": "^1.0.0", "winston": "^3.17.0" }, "devDependencies": { - "@types/node-forge": "^1.3.9", + "@types/node-forge": "^1.3.11", "@types/smtp-server": "^3.5.10", "bun-types": "latest", "prisma": "5.22.0", - "typescript": "^5.3.2" + "typescript": "^5.7.2" }, "private": true } diff --git a/src/smtpServer.ts b/src/smtpServer.ts index 36cb852..933d387 100644 --- a/src/smtpServer.ts +++ b/src/smtpServer.ts @@ -1,101 +1,251 @@ -import {SMTPServer} from "smtp-server"; -import {deriveNsecForEmail, getNDK} from "./utils"; +import {SMTPServer, SMTPServerAddress, SMTPServerDataStream, SMTPServerSession} from "smtp-server"; +import {deriveNsecForEmail, getNDK, logger} from "./utils"; import {NDKEvent, NDKKind, NDKPrivateKeySigner} from "@nostr-dev-kit/ndk"; import {PrismaClient} from "@prisma/client"; -import {logger} from "./utils/logs"; import {encryptEventForRecipient, parseEmail} from "@arx/utils"; +import * as path from "node:path"; +import fs from 'node:fs/promises'; + +interface QueuedEmail { + id: string; + mailData: string; + session: SMTPServerSession; + attempts: number; + createdAt: number; +} export class NostrSmtpServer { private server: SMTPServer; + private emailQueue: QueuedEmail[] = []; + private isProcessing: boolean = false; + private readonly MAX_RETRIES = 3; + private readonly BACKUP_DIR = path.join(process.cwd(), 'email-backups'); - constructor(db: PrismaClient, port: number) { + constructor(private db: PrismaClient, port: number) { this.server = new SMTPServer({ authOptional: true, logger: false, - - onData: (stream, session, callback) => { - let mailData = ''; - - stream.on('data', (chunk: Buffer) => { - mailData += chunk.toString(); - }); - - stream.on('end', async () => { - if (!session.envelope.mailFrom) { - logger.warn('Ignoring email without sender'); - callback(); - return; - } - try { - const parsedEmail = parseEmail(mailData); - for (let recipientEmail of session.envelope.rcptTo) { - const address = recipientEmail.address; - const parts = address.split('@'); - if (parts[1] !== process.env.BASE_DOMAIN) { - logger.warn('Not sending email to', address, 'because it is not in the allowed domain'); - continue; - } - const alias = parts[0]; - const user = await db.alias.findUnique({ - where: { - alias - }, - include: { - user: true - } - }); - if (!user) { - logger.warn('No user found for', alias, 'skipping'); - continue; - } - const timeRemainingInSubscription = user.user.subscriptionDuration === null ? Infinity : (user.user.subscriptionDuration * 1000) - Date.now() + user.user.lastPayment.getTime(); - if (timeRemainingInSubscription <= 0) { - logger.warn(`Subscription has expired for ${alias}`); - continue; - } - const recipient = user.npub; - const randomKeySinger = new NDKPrivateKeySigner(deriveNsecForEmail( - process.env.MASTER_NSEC!, - session.envelope.mailFrom?.address - )); - const ndk = getNDK(); - ndk.signer = randomKeySinger; - await ndk.connect(); - const ndkUser = ndk.getUser({ - npub: recipient - }); - const randomKeyUser = await randomKeySinger.user(); - const event = new NDKEvent(); - event.kind = NDKKind.Article; - event.content = parsedEmail.body; - event.created_at = Math.floor(Date.now() / 1000); - event.pubkey = randomKeyUser.pubkey; - event.tags.push(['p', ndkUser.pubkey]) - event.tags.push(['subject', parsedEmail.subject]); - event.tags.push(['email:localIP', session.localAddress]); - event.tags.push(['email:remoteIP', session.remoteAddress]); - event.tags.push(['email:isEmail', 'true']); - for (let to of session.envelope.rcptTo) - event.tags.push(['email:to', to.address]); - for (let header of Object.keys(parsedEmail.headers)) - event.tags.push([`email:header:${header}`, parsedEmail.headers[header]]); - event.tags.push(['email:session', session.id]); - event.tags.push(['email:from', session.envelope.mailFrom?.address ?? '']); - - await event.sign(randomKeySinger); - const encryptedEvent = await encryptEventForRecipient(ndk, event, ndkUser); - await encryptedEvent.publish(); - } - } catch (e) { - logger.error(JSON.stringify(e)); - } finally { - callback(); - } - }); - } + onData: (stream, session, callback) => this.handleEmailData(stream, session, callback, db) }); this.server.listen(port, '0.0.0.0'); logger.info(`SMTP Server running on port ${port}`); + + fs.mkdir(this.BACKUP_DIR, {recursive: true}).catch(err => { + logger.error('Failed to create backup directory:', err); + }); + + this.setupGracefulShutdown(); + this.recoverFromBackups(); } -} \ No newline at end of file + + private async recoverFromBackups() { + const files = await fs.readdir(this.BACKUP_DIR); + for (const file of files) { + try { + const data = JSON.parse(await fs.readFile(path.join(this.BACKUP_DIR, file), 'utf-8')); + this.emailQueue.push(data); + } catch (error) { + logger.error(`Failed to recover backup ${file}:`, error); + } + } + await this.processQueue(); + } + + private setupGracefulShutdown(): void { + const shutdown = async () => { + logger.info('Graceful shutdown initiated'); + + this.server.close(); + + while (this.isProcessing) + await new Promise(resolve => setTimeout(resolve, 100)); + + for (const email of this.emailQueue) + await this.backupEmail(email); + + logger.info('Graceful shutdown completed'); + process.exit(0); + }; + + process.on('SIGTERM', shutdown); + process.on('SIGINT', shutdown); + } + + private async handleEmailData(stream: SMTPServerDataStream, session: SMTPServerSession, callback: () => void, db: PrismaClient) { + const chunks: Buffer[] = []; + + stream.on('data', (chunk: Buffer) => { + chunks.push(chunk); + }); + + stream.on('end', async () => { + if (!this.validateSender(session)) { + callback(); + return; + } + + const mailData = Buffer.concat(chunks).toString(); + + try { + const queuedEmail: QueuedEmail = { + id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, + mailData, + session, + attempts: 0, + createdAt: Date.now() + }; + + this.emailQueue.push(queuedEmail); + await this.backupEmail(queuedEmail); + this.processQueue(); + } catch (e) { + logger.error(`Error processing recipients: ${e}`, e); + } + + callback(); + }); + } + + private async backupEmail(email: QueuedEmail): Promise { + try { + const backupPath = path.join(this.BACKUP_DIR, `${email.id}.json`); + await fs.writeFile(backupPath, JSON.stringify(email)); + } catch (error) { + logger.error('Failed to backup email:', error); + } + } + + private async processQueue(): Promise { + if (this.isProcessing) + return; + this.isProcessing = true; + while (this.emailQueue.length > 0) { + const email = this.emailQueue[0]; + if (email.attempts >= this.MAX_RETRIES) { + this.emailQueue.shift(); + continue; + } + + try { + const parsedEmail: ReturnType = parseEmail(email.mailData); + await this.processRecipients(email.session, parsedEmail, this.db); + + // Remove from queue and delete backup if successful + this.emailQueue.shift(); + await this.deleteBackup(email.id); + } catch (error) { + logger.error(`Failed to process email ${email.id}:`, error); + email.attempts++; + + this.emailQueue.push(this.emailQueue.shift()!); + } + } + this.isProcessing = false; + } + + private async deleteBackup(id: string): Promise { + try { + const backupPath = path.join(this.BACKUP_DIR, `${id}.json`); + await fs.unlink(backupPath); + } catch (error) { + logger.error(`[TOXIC DATA!!!] Failed to delete email backup ${id}.json:`, error); + } + } + + private validateSender(session: SMTPServerSession): boolean { + if (!session.envelope.mailFrom) { + logger.warn('Ignoring email without sender'); + return false; + } + return true; + } + + private async processRecipients(session: SMTPServerSession, parsedEmail: ReturnType, db: PrismaClient) { + for (const recipientEmail of session.envelope.rcptTo) { + const address = recipientEmail.address; + const [alias, domain] = address.split('@'); + + if (domain !== process.env.BASE_DOMAIN) { + logger.warn(`Not sending email to ${address} because it is not in the allowed domain`); + continue; + } + + const user = await this.getUser(alias, db); + if (!user || !this.isSubscriptionValid(user)) continue; + + await this.sendNostrLetter(session, parsedEmail, user.npub); + } + } + + private async getUser(alias: string, db: PrismaClient) { + const user = await db.alias.findUnique({ + where: {alias}, + include: {user: true} + }); + + if (!user) { + logger.warn('No user found for', alias, 'skipping'); + return null; + } + return user; + } + + private isSubscriptionValid(user: NonNullable>>): boolean { + // If there's no duration set, it's an unlimited subscription + if (user.user.subscriptionDuration === null) + return true; + + const subscriptionDurationMs = user.user.subscriptionDuration * 1000; + const lastPaymentTimestamp = user.user.lastPayment.getTime(); + const currentTimestamp = Date.now(); + + const subscriptionEndTime = lastPaymentTimestamp + subscriptionDurationMs; + const timeRemaining = subscriptionEndTime - currentTimestamp; + + if (timeRemaining <= 0) { + logger.warn(`Subscription has expired for ${user.alias}`); + return false; + } + return true; + } + + private async sendNostrLetter(session: SMTPServerSession, parsedEmail: ReturnType, recipient: string) { + const randomKeySinger = new NDKPrivateKeySigner( + deriveNsecForEmail(process.env.MASTER_NSEC!, (session.envelope.mailFrom as SMTPServerAddress).address) + ); + + const ndk = getNDK(); + ndk.signer = randomKeySinger; + await ndk.connect(); + + const ndkUser = ndk.getUser({npub: recipient}); + const randomKeyUser = await randomKeySinger.user(); + const event = new NDKEvent(); + event.kind = NDKKind.Article; + event.content = parsedEmail.body; + event.created_at = Math.floor(Date.now() / 1000); + event.pubkey = randomKeyUser.pubkey; + event.tags.push( + ['p', ndkUser.pubkey], + ['subject', parsedEmail.subject], + ['email:localIP', session.localAddress], + ['email:remoteIP', session.remoteAddress], + ['email:isEmail', 'true'], + ['email:session', session.id], + ['email:from', (session.envelope.mailFrom as SMTPServerAddress).address] + ); + + for (const to of session.envelope.rcptTo) { + event.tags.push(['email:to', to.address]); + } + + for (const header of Object.keys(parsedEmail.headers)) { + event.tags.push([`email:header:${header}`, parsedEmail.headers[header]]); + } + + await event.sign(randomKeySinger); + const encryptedEvent = await encryptEventForRecipient(ndk, event, ndkUser); + await encryptedEvent.publish(); + } +}