From 095791f44f9151913df33815d6471a8be194f3d4 Mon Sep 17 00:00:00 2001 From: Danny Morabito Date: Tue, 3 Dec 2024 13:17:18 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=80=20feat(smtp):=20Refactor=20mail=20?= =?UTF-8?q?server?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Split email pipeline into async receive/process queues for max throughput - Implement persistent queue storage - no more lost emails on crashes! - Fix memory leaks when handling chunky payloads - Update all dependencies - Clean up code --- bun.lockb | Bin 56363 -> 56363 bytes package.json | 10 +- src/smtpServer.ts | 322 +++++++++++++++++++++++++++++++++------------- 3 files changed, 241 insertions(+), 91 deletions(-) diff --git a/bun.lockb b/bun.lockb index 57b31f4bfa68883de975a692381065d4421a52f5..af68f95ea97d559766114681b03bd81e5bc482df 100755 GIT binary patch delta 171 zcmV;c09600xC5)W1CTBt=!ddLwX_mly_7Ue(?|QwyhVWq9t)zyAQ!13mQZFLu}%st z0Vb2d2`96FEoD1EiU~p5-*eP=It+`j!pAULKisn&wn9*u_oN$gq#uhdaGK_K^2*RG zkKYYh1sn=C_@eu0mPS=!ew+oXN26nkvn6RNF#<6$lTn&v0XLJjnw|kSlU1960XCDd Zr4+O4n@K(aIFq5pAd|qr6SHXA!vQG%OAr76 delta 169 zcmV;a09OC2xC5)W1CTBtv%K-7s!4YWvMm@#8PEqy!KN|PphByft|Rj)mVv^7u}%st z0VI>b2`96FEoD1E;t&g^U^7IlJH>Fv?8Ub*@n_|tEFH`P { - let mailData = ''; - - stream.on('data', (chunk: Buffer) => { - mailData += chunk.toString(); - }); - - stream.on('end', async () => { - if (!session.envelope.mailFrom) { - logger.warn('Ignoring email without sender'); - callback(); - return; - } - try { - const parsedEmail = parseEmail(mailData); - for (let recipientEmail of session.envelope.rcptTo) { - const address = recipientEmail.address; - const parts = address.split('@'); - if (parts[1] !== process.env.BASE_DOMAIN) { - logger.warn('Not sending email to', address, 'because it is not in the allowed domain'); - continue; - } - const alias = parts[0]; - const user = await db.alias.findUnique({ - where: { - alias - }, - include: { - user: true - } - }); - if (!user) { - logger.warn('No user found for', alias, 'skipping'); - continue; - } - const timeRemainingInSubscription = user.user.subscriptionDuration === null ? Infinity : (user.user.subscriptionDuration * 1000) - Date.now() + user.user.lastPayment.getTime(); - if (timeRemainingInSubscription <= 0) { - logger.warn(`Subscription has expired for ${alias}`); - continue; - } - const recipient = user.npub; - const randomKeySinger = new NDKPrivateKeySigner(deriveNsecForEmail( - process.env.MASTER_NSEC!, - session.envelope.mailFrom?.address - )); - const ndk = getNDK(); - ndk.signer = randomKeySinger; - await ndk.connect(); - const ndkUser = ndk.getUser({ - npub: recipient - }); - const randomKeyUser = await randomKeySinger.user(); - const event = new NDKEvent(); - event.kind = NDKKind.Article; - event.content = parsedEmail.body; - event.created_at = Math.floor(Date.now() / 1000); - event.pubkey = randomKeyUser.pubkey; - event.tags.push(['p', ndkUser.pubkey]) - event.tags.push(['subject', parsedEmail.subject]); - event.tags.push(['email:localIP', session.localAddress]); - event.tags.push(['email:remoteIP', session.remoteAddress]); - event.tags.push(['email:isEmail', 'true']); - for (let to of session.envelope.rcptTo) - event.tags.push(['email:to', to.address]); - for (let header of Object.keys(parsedEmail.headers)) - event.tags.push([`email:header:${header}`, parsedEmail.headers[header]]); - event.tags.push(['email:session', session.id]); - event.tags.push(['email:from', session.envelope.mailFrom?.address ?? '']); - - await event.sign(randomKeySinger); - const encryptedEvent = await encryptEventForRecipient(ndk, event, ndkUser); - await encryptedEvent.publish(); - } - } catch (e) { - logger.error(JSON.stringify(e)); - } finally { - callback(); - } - }); - } + onData: (stream, session, callback) => this.handleEmailData(stream, session, callback, db) }); this.server.listen(port, '0.0.0.0'); logger.info(`SMTP Server running on port ${port}`); + + fs.mkdir(this.BACKUP_DIR, {recursive: true}).catch(err => { + logger.error('Failed to create backup directory:', err); + }); + + this.setupGracefulShutdown(); + this.recoverFromBackups(); } -} \ No newline at end of file + + private async recoverFromBackups() { + const files = await fs.readdir(this.BACKUP_DIR); + for (const file of files) { + try { + const data = JSON.parse(await fs.readFile(path.join(this.BACKUP_DIR, file), 'utf-8')); + this.emailQueue.push(data); + } catch (error) { + logger.error(`Failed to recover backup ${file}:`, error); + } + } + await this.processQueue(); + } + + private setupGracefulShutdown(): void { + const shutdown = async () => { + logger.info('Graceful shutdown initiated'); + + this.server.close(); + + while (this.isProcessing) + await new Promise(resolve => setTimeout(resolve, 100)); + + for (const email of this.emailQueue) + await this.backupEmail(email); + + logger.info('Graceful shutdown completed'); + process.exit(0); + }; + + process.on('SIGTERM', shutdown); + process.on('SIGINT', shutdown); + } + + private async handleEmailData(stream: SMTPServerDataStream, session: SMTPServerSession, callback: () => void, db: PrismaClient) { + const chunks: Buffer[] = []; + + stream.on('data', (chunk: Buffer) => { + chunks.push(chunk); + }); + + stream.on('end', async () => { + if (!this.validateSender(session)) { + callback(); + return; + } + + const mailData = Buffer.concat(chunks).toString(); + + try { + const queuedEmail: QueuedEmail = { + id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, + mailData, + session, + attempts: 0, + createdAt: Date.now() + }; + + this.emailQueue.push(queuedEmail); + await this.backupEmail(queuedEmail); + this.processQueue(); + } catch (e) { + logger.error(`Error processing recipients: ${e}`, e); + } + + callback(); + }); + } + + private async backupEmail(email: QueuedEmail): Promise { + try { + const backupPath = path.join(this.BACKUP_DIR, `${email.id}.json`); + await fs.writeFile(backupPath, JSON.stringify(email)); + } catch (error) { + logger.error('Failed to backup email:', error); + } + } + + private async processQueue(): Promise { + if (this.isProcessing) + return; + this.isProcessing = true; + while (this.emailQueue.length > 0) { + const email = this.emailQueue[0]; + if (email.attempts >= this.MAX_RETRIES) { + this.emailQueue.shift(); + continue; + } + + try { + const parsedEmail: ReturnType = parseEmail(email.mailData); + await this.processRecipients(email.session, parsedEmail, this.db); + + // Remove from queue and delete backup if successful + this.emailQueue.shift(); + await this.deleteBackup(email.id); + } catch (error) { + logger.error(`Failed to process email ${email.id}:`, error); + email.attempts++; + + this.emailQueue.push(this.emailQueue.shift()!); + } + } + this.isProcessing = false; + } + + private async deleteBackup(id: string): Promise { + try { + const backupPath = path.join(this.BACKUP_DIR, `${id}.json`); + await fs.unlink(backupPath); + } catch (error) { + logger.error(`[TOXIC DATA!!!] Failed to delete email backup ${id}.json:`, error); + } + } + + private validateSender(session: SMTPServerSession): boolean { + if (!session.envelope.mailFrom) { + logger.warn('Ignoring email without sender'); + return false; + } + return true; + } + + private async processRecipients(session: SMTPServerSession, parsedEmail: ReturnType, db: PrismaClient) { + for (const recipientEmail of session.envelope.rcptTo) { + const address = recipientEmail.address; + const [alias, domain] = address.split('@'); + + if (domain !== process.env.BASE_DOMAIN) { + logger.warn(`Not sending email to ${address} because it is not in the allowed domain`); + continue; + } + + const user = await this.getUser(alias, db); + if (!user || !this.isSubscriptionValid(user)) continue; + + await this.sendNostrLetter(session, parsedEmail, user.npub); + } + } + + private async getUser(alias: string, db: PrismaClient) { + const user = await db.alias.findUnique({ + where: {alias}, + include: {user: true} + }); + + if (!user) { + logger.warn('No user found for', alias, 'skipping'); + return null; + } + return user; + } + + private isSubscriptionValid(user: NonNullable>>): boolean { + // If there's no duration set, it's an unlimited subscription + if (user.user.subscriptionDuration === null) + return true; + + const subscriptionDurationMs = user.user.subscriptionDuration * 1000; + const lastPaymentTimestamp = user.user.lastPayment.getTime(); + const currentTimestamp = Date.now(); + + const subscriptionEndTime = lastPaymentTimestamp + subscriptionDurationMs; + const timeRemaining = subscriptionEndTime - currentTimestamp; + + if (timeRemaining <= 0) { + logger.warn(`Subscription has expired for ${user.alias}`); + return false; + } + return true; + } + + private async sendNostrLetter(session: SMTPServerSession, parsedEmail: ReturnType, recipient: string) { + const randomKeySinger = new NDKPrivateKeySigner( + deriveNsecForEmail(process.env.MASTER_NSEC!, (session.envelope.mailFrom as SMTPServerAddress).address) + ); + + const ndk = getNDK(); + ndk.signer = randomKeySinger; + await ndk.connect(); + + const ndkUser = ndk.getUser({npub: recipient}); + const randomKeyUser = await randomKeySinger.user(); + const event = new NDKEvent(); + event.kind = NDKKind.Article; + event.content = parsedEmail.body; + event.created_at = Math.floor(Date.now() / 1000); + event.pubkey = randomKeyUser.pubkey; + event.tags.push( + ['p', ndkUser.pubkey], + ['subject', parsedEmail.subject], + ['email:localIP', session.localAddress], + ['email:remoteIP', session.remoteAddress], + ['email:isEmail', 'true'], + ['email:session', session.id], + ['email:from', (session.envelope.mailFrom as SMTPServerAddress).address] + ); + + for (const to of session.envelope.rcptTo) { + event.tags.push(['email:to', to.address]); + } + + for (const header of Object.keys(parsedEmail.headers)) { + event.tags.push([`email:header:${header}`, parsedEmail.headers[header]]); + } + + await event.sign(randomKeySinger); + const encryptedEvent = await encryptEventForRecipient(ndk, event, ndkUser); + await encryptedEvent.publish(); + } +}