🚀 feat(smtp): Refactor mail server

- Split email pipeline into async receive/process queues for max throughput
- Implement persistent queue storage - no more lost emails on crashes!
- Fix memory leaks when handling chunky payloads
- Update all dependencies
- Clean up code
This commit is contained in:
Danny Morabito 2024-12-03 13:17:18 +01:00
parent 0eba3efe24
commit 095791f44f
Signed by: dannym
GPG key ID: 7CC8056A5A04557E
3 changed files with 241 additions and 91 deletions

BIN
bun.lockb

Binary file not shown.

View file

@ -4,7 +4,7 @@
"description": "", "description": "",
"main": "src/index.ts", "main": "src/index.ts",
"scripts": { "scripts": {
"dev": "DEBUG='ndk:*' bun --watch src/index.ts", "start": "DEBUG='ndk:*' bun --watch src/index.ts",
"db:generate": "prisma generate", "db:generate": "prisma generate",
"db:migrate": "prisma migrate dev" "db:migrate": "prisma migrate dev"
}, },
@ -14,21 +14,21 @@
"@elysiajs/server-timing": "^1.1.0", "@elysiajs/server-timing": "^1.1.0",
"@elysiajs/swagger": "^1.1.6", "@elysiajs/swagger": "^1.1.6",
"@libsql/client": "^0.14.0", "@libsql/client": "^0.14.0",
"@nostr-dev-kit/ndk": "^2.10.6", "@nostr-dev-kit/ndk": "^2.10.7",
"@prisma/adapter-libsql": "^5.22.0", "@prisma/adapter-libsql": "^5.22.0",
"@prisma/client": "5.22.0", "@prisma/client": "5.22.0",
"elysia": "^1.1.25", "elysia": "^1.1.25",
"node-forge": "^1.3.1", "node-forge": "^1.3.1",
"smtp-server": "^3.13.0", "smtp-server": "^3.13.6",
"websocket-polyfill": "^1.0.0", "websocket-polyfill": "^1.0.0",
"winston": "^3.17.0" "winston": "^3.17.0"
}, },
"devDependencies": { "devDependencies": {
"@types/node-forge": "^1.3.9", "@types/node-forge": "^1.3.11",
"@types/smtp-server": "^3.5.10", "@types/smtp-server": "^3.5.10",
"bun-types": "latest", "bun-types": "latest",
"prisma": "5.22.0", "prisma": "5.22.0",
"typescript": "^5.3.2" "typescript": "^5.7.2"
}, },
"private": true "private": true
} }

View file

@ -1,101 +1,251 @@
import {SMTPServer} from "smtp-server"; import {SMTPServer, SMTPServerAddress, SMTPServerDataStream, SMTPServerSession} from "smtp-server";
import {deriveNsecForEmail, getNDK} from "./utils"; import {deriveNsecForEmail, getNDK, logger} from "./utils";
import {NDKEvent, NDKKind, NDKPrivateKeySigner} from "@nostr-dev-kit/ndk"; import {NDKEvent, NDKKind, NDKPrivateKeySigner} from "@nostr-dev-kit/ndk";
import {PrismaClient} from "@prisma/client"; import {PrismaClient} from "@prisma/client";
import {logger} from "./utils/logs";
import {encryptEventForRecipient, parseEmail} from "@arx/utils"; import {encryptEventForRecipient, parseEmail} from "@arx/utils";
import * as path from "node:path";
import fs from 'node:fs/promises';
interface QueuedEmail {
id: string;
mailData: string;
session: SMTPServerSession;
attempts: number;
createdAt: number;
}
export class NostrSmtpServer { export class NostrSmtpServer {
private server: SMTPServer; private server: SMTPServer;
private emailQueue: QueuedEmail[] = [];
private isProcessing: boolean = false;
private readonly MAX_RETRIES = 3;
private readonly BACKUP_DIR = path.join(process.cwd(), 'email-backups');
constructor(db: PrismaClient, port: number) { constructor(private db: PrismaClient, port: number) {
this.server = new SMTPServer({ this.server = new SMTPServer({
authOptional: true, authOptional: true,
logger: false, logger: false,
onData: (stream, session, callback) => this.handleEmailData(stream, session, callback, db)
onData: (stream, session, callback) => {
let mailData = '';
stream.on('data', (chunk: Buffer) => {
mailData += chunk.toString();
});
stream.on('end', async () => {
if (!session.envelope.mailFrom) {
logger.warn('Ignoring email without sender');
callback();
return;
}
try {
const parsedEmail = parseEmail(mailData);
for (let recipientEmail of session.envelope.rcptTo) {
const address = recipientEmail.address;
const parts = address.split('@');
if (parts[1] !== process.env.BASE_DOMAIN) {
logger.warn('Not sending email to', address, 'because it is not in the allowed domain');
continue;
}
const alias = parts[0];
const user = await db.alias.findUnique({
where: {
alias
},
include: {
user: true
}
});
if (!user) {
logger.warn('No user found for', alias, 'skipping');
continue;
}
const timeRemainingInSubscription = user.user.subscriptionDuration === null ? Infinity : (user.user.subscriptionDuration * 1000) - Date.now() + user.user.lastPayment.getTime();
if (timeRemainingInSubscription <= 0) {
logger.warn(`Subscription has expired for ${alias}`);
continue;
}
const recipient = user.npub;
const randomKeySinger = new NDKPrivateKeySigner(deriveNsecForEmail(
process.env.MASTER_NSEC!,
session.envelope.mailFrom?.address
));
const ndk = getNDK();
ndk.signer = randomKeySinger;
await ndk.connect();
const ndkUser = ndk.getUser({
npub: recipient
});
const randomKeyUser = await randomKeySinger.user();
const event = new NDKEvent();
event.kind = NDKKind.Article;
event.content = parsedEmail.body;
event.created_at = Math.floor(Date.now() / 1000);
event.pubkey = randomKeyUser.pubkey;
event.tags.push(['p', ndkUser.pubkey])
event.tags.push(['subject', parsedEmail.subject]);
event.tags.push(['email:localIP', session.localAddress]);
event.tags.push(['email:remoteIP', session.remoteAddress]);
event.tags.push(['email:isEmail', 'true']);
for (let to of session.envelope.rcptTo)
event.tags.push(['email:to', to.address]);
for (let header of Object.keys(parsedEmail.headers))
event.tags.push([`email:header:${header}`, parsedEmail.headers[header]]);
event.tags.push(['email:session', session.id]);
event.tags.push(['email:from', session.envelope.mailFrom?.address ?? '']);
await event.sign(randomKeySinger);
const encryptedEvent = await encryptEventForRecipient(ndk, event, ndkUser);
await encryptedEvent.publish();
}
} catch (e) {
logger.error(JSON.stringify(e));
} finally {
callback();
}
});
}
}); });
this.server.listen(port, '0.0.0.0'); this.server.listen(port, '0.0.0.0');
logger.info(`SMTP Server running on port ${port}`); logger.info(`SMTP Server running on port ${port}`);
fs.mkdir(this.BACKUP_DIR, {recursive: true}).catch(err => {
logger.error('Failed to create backup directory:', err);
});
this.setupGracefulShutdown();
this.recoverFromBackups();
} }
}
private async recoverFromBackups() {
const files = await fs.readdir(this.BACKUP_DIR);
for (const file of files) {
try {
const data = JSON.parse(await fs.readFile(path.join(this.BACKUP_DIR, file), 'utf-8'));
this.emailQueue.push(data);
} catch (error) {
logger.error(`Failed to recover backup ${file}:`, error);
}
}
await this.processQueue();
}
private setupGracefulShutdown(): void {
const shutdown = async () => {
logger.info('Graceful shutdown initiated');
this.server.close();
while (this.isProcessing)
await new Promise(resolve => setTimeout(resolve, 100));
for (const email of this.emailQueue)
await this.backupEmail(email);
logger.info('Graceful shutdown completed');
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
private async handleEmailData(stream: SMTPServerDataStream, session: SMTPServerSession, callback: () => void, db: PrismaClient) {
const chunks: Buffer[] = [];
stream.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
stream.on('end', async () => {
if (!this.validateSender(session)) {
callback();
return;
}
const mailData = Buffer.concat(chunks).toString();
try {
const queuedEmail: QueuedEmail = {
id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
mailData,
session,
attempts: 0,
createdAt: Date.now()
};
this.emailQueue.push(queuedEmail);
await this.backupEmail(queuedEmail);
this.processQueue();
} catch (e) {
logger.error(`Error processing recipients: ${e}`, e);
}
callback();
});
}
private async backupEmail(email: QueuedEmail): Promise<void> {
try {
const backupPath = path.join(this.BACKUP_DIR, `${email.id}.json`);
await fs.writeFile(backupPath, JSON.stringify(email));
} catch (error) {
logger.error('Failed to backup email:', error);
}
}
private async processQueue(): Promise<void> {
if (this.isProcessing)
return;
this.isProcessing = true;
while (this.emailQueue.length > 0) {
const email = this.emailQueue[0];
if (email.attempts >= this.MAX_RETRIES) {
this.emailQueue.shift();
continue;
}
try {
const parsedEmail: ReturnType<typeof parseEmail> = parseEmail(email.mailData);
await this.processRecipients(email.session, parsedEmail, this.db);
// Remove from queue and delete backup if successful
this.emailQueue.shift();
await this.deleteBackup(email.id);
} catch (error) {
logger.error(`Failed to process email ${email.id}:`, error);
email.attempts++;
this.emailQueue.push(this.emailQueue.shift()!);
}
}
this.isProcessing = false;
}
private async deleteBackup(id: string): Promise<void> {
try {
const backupPath = path.join(this.BACKUP_DIR, `${id}.json`);
await fs.unlink(backupPath);
} catch (error) {
logger.error(`[TOXIC DATA!!!] Failed to delete email backup ${id}.json:`, error);
}
}
private validateSender(session: SMTPServerSession): boolean {
if (!session.envelope.mailFrom) {
logger.warn('Ignoring email without sender');
return false;
}
return true;
}
private async processRecipients(session: SMTPServerSession, parsedEmail: ReturnType<typeof parseEmail>, db: PrismaClient) {
for (const recipientEmail of session.envelope.rcptTo) {
const address = recipientEmail.address;
const [alias, domain] = address.split('@');
if (domain !== process.env.BASE_DOMAIN) {
logger.warn(`Not sending email to ${address} because it is not in the allowed domain`);
continue;
}
const user = await this.getUser(alias, db);
if (!user || !this.isSubscriptionValid(user)) continue;
await this.sendNostrLetter(session, parsedEmail, user.npub);
}
}
private async getUser(alias: string, db: PrismaClient) {
const user = await db.alias.findUnique({
where: {alias},
include: {user: true}
});
if (!user) {
logger.warn('No user found for', alias, 'skipping');
return null;
}
return user;
}
private isSubscriptionValid(user: NonNullable<Awaited<ReturnType<NostrSmtpServer['getUser']>>>): boolean {
// If there's no duration set, it's an unlimited subscription
if (user.user.subscriptionDuration === null)
return true;
const subscriptionDurationMs = user.user.subscriptionDuration * 1000;
const lastPaymentTimestamp = user.user.lastPayment.getTime();
const currentTimestamp = Date.now();
const subscriptionEndTime = lastPaymentTimestamp + subscriptionDurationMs;
const timeRemaining = subscriptionEndTime - currentTimestamp;
if (timeRemaining <= 0) {
logger.warn(`Subscription has expired for ${user.alias}`);
return false;
}
return true;
}
private async sendNostrLetter(session: SMTPServerSession, parsedEmail: ReturnType<typeof parseEmail>, recipient: string) {
const randomKeySinger = new NDKPrivateKeySigner(
deriveNsecForEmail(process.env.MASTER_NSEC!, (session.envelope.mailFrom as SMTPServerAddress).address)
);
const ndk = getNDK();
ndk.signer = randomKeySinger;
await ndk.connect();
const ndkUser = ndk.getUser({npub: recipient});
const randomKeyUser = await randomKeySinger.user();
const event = new NDKEvent();
event.kind = NDKKind.Article;
event.content = parsedEmail.body;
event.created_at = Math.floor(Date.now() / 1000);
event.pubkey = randomKeyUser.pubkey;
event.tags.push(
['p', ndkUser.pubkey],
['subject', parsedEmail.subject],
['email:localIP', session.localAddress],
['email:remoteIP', session.remoteAddress],
['email:isEmail', 'true'],
['email:session', session.id],
['email:from', (session.envelope.mailFrom as SMTPServerAddress).address]
);
for (const to of session.envelope.rcptTo) {
event.tags.push(['email:to', to.address]);
}
for (const header of Object.keys(parsedEmail.headers)) {
event.tags.push([`email:header:${header}`, parsedEmail.headers[header]]);
}
await event.sign(randomKeySinger);
const encryptedEvent = await encryptEventForRecipient(ndk, event, ndkUser);
await encryptedEvent.publish();
}
}