const {getDB, initDB} = require('./database'); const {logger, auditLogger} = require('./config/logger'); const BalanceAuditor = require('./balance_audit'); const cron = require('node-cron'); const fs = require('fs'); const path = require('path'); /** * 余额监控服务 * 定期检查用户余额一致性,发现异常时发送警报 */ class BalanceMonitor { constructor() { this.db = null; this.auditor = new BalanceAuditor(); this.alertThreshold = 10; // 差异超过10元时发送警报 this.maxProblematicUsers = 50; // 最多报告50个问题用户 } /** * 初始化监控服务 */ async init() { await initDB(); this.db = getDB(); await this.auditor.init(); // 初始化审计器的数据库连接 console.log('余额监控服务已初始化'); logger.info('Balance monitor service initialized'); } /** * 执行快速余额检查 * 只检查最近有转账活动的用户 */ async quickBalanceCheck() { try { console.log('开始执行快速余额检查...'); // 获取最近7天有转账活动的用户 const [activeUsers] = await this.db.execute(` SELECT DISTINCT u.id, u.username, u.real_name, u.balance, u.role FROM users u INNER JOIN transfers t ON (u.id = t.from_user_id OR u.id = t.to_user_id) WHERE t.created_at >= DATE_SUB(NOW(), INTERVAL 7 DAY) AND u.role != 'system' ORDER BY u.id `); console.log(`检查 ${activeUsers.length} 个活跃用户的余额`); const problematicUsers = []; for (const user of activeUsers) { const auditResult = await this.auditor.auditUser(user); if (auditResult.isProblematic && Math.abs(auditResult.discrepancy) > this.alertThreshold) { problematicUsers.push(auditResult); // 记录警报日志 auditLogger.warn('Balance discrepancy detected', { userId: user.id, username: user.username, actualBalance: auditResult.actualBalance, theoreticalBalance: auditResult.theoreticalBalance, discrepancy: auditResult.discrepancy, checkTime: new Date().toISOString() }); } } if (problematicUsers.length > 0) { await this.sendAlert(problematicUsers, 'quick'); } console.log(`快速检查完成,发现 ${problematicUsers.length} 个问题用户`); } catch (error) { console.error('快速余额检查失败:', error); logger.error('Quick balance check failed', { error: error.message }); } } /** * 执行完整余额审计 */ async fullBalanceAudit() { try { console.log('开始执行完整余额审计...'); await this.auditor.init(); await this.auditor.auditAllUsers(); const report = this.auditor.generateReport(); // 如果发现严重问题,发送警报 if (report.summary.problematicUsersCount > 0) { const criticalUsers = this.auditor.auditResults.problematicUsers .filter(user => Math.abs(user.discrepancy) > this.alertThreshold) .slice(0, this.maxProblematicUsers); if (criticalUsers.length > 0) { await this.sendAlert(criticalUsers, 'full'); } } console.log('完整审计完成'); } catch (error) { console.error('完整余额审计失败:', error); logger.error('Full balance audit failed', { error: error.message }); } } /** * 发送余额异常警报 * @param {Array} problematicUsers - 问题用户列表 * @param {string} checkType - 检查类型 ('quick' 或 'full') */ async sendAlert(problematicUsers, checkType) { const timestamp = new Date().toISOString(); const alertData = { alertTime: timestamp, checkType: checkType, problematicUsersCount: problematicUsers.length, totalDiscrepancy: problematicUsers.reduce((sum, user) => sum + Math.abs(user.discrepancy), 0), users: problematicUsers.map(user => ({ userId: user.userId, username: user.username, actualBalance: user.actualBalance, theoreticalBalance: user.theoreticalBalance, discrepancy: user.discrepancy, shouldBeRefunded: user.shouldBeRefunded })) }; // 保存警报到文件 const alertPath = path.join(__dirname, 'logs', `balance_alert_${timestamp.replace(/[:.]/g, '-')}.json`); // 确保logs目录存在 const logsDir = path.join(__dirname, 'logs'); if (!fs.existsSync(logsDir)) { fs.mkdirSync(logsDir, { recursive: true }); } fs.writeFileSync(alertPath, JSON.stringify(alertData, null, 2), 'utf8'); // 记录到审计日志 auditLogger.error('Balance discrepancy alert', { checkType: checkType, problematicUsersCount: problematicUsers.length, totalDiscrepancy: alertData.totalDiscrepancy, alertFile: alertPath }); console.log(`余额异常警报已生成: ${alertPath}`); // 这里可以添加其他警报方式,如发送邮件、短信等 // await this.sendEmailAlert(alertData); // await this.sendSMSAlert(alertData); } /** * 检查特定用户的余额 * @param {number} userId - 用户ID * @returns {Object} 检查结果 */ async checkUserBalance(userId) { try { const [users] = await this.db.execute( 'SELECT id, username, real_name, balance, role FROM users WHERE id = ?', [userId] ); if (users.length === 0) { throw new Error(`用户 ${userId} 不存在`); } const auditResult = await this.auditor.auditUser(users[0]); // 记录检查日志 auditLogger.info('Manual balance check', { userId: userId, username: users[0].username, actualBalance: auditResult.actualBalance, theoreticalBalance: auditResult.theoreticalBalance, discrepancy: auditResult.discrepancy, isProblematic: auditResult.isProblematic, checkTime: new Date().toISOString() }); return auditResult; } catch (error) { logger.error('User balance check failed', { userId, error: error.message }); throw error; } } /** * 检查管理员操作的余额一致性 * 监控最近的管理员状态变更操作,检查是否正确调整了余额 */ async checkAdminOperations() { try { console.log('开始检查管理员操作的余额一致性...'); // 查询最近24小时内管理员修改的转账记录 const [adminModifiedTransfers] = await this.db.execute(` SELECT t.id, t.from_user_id, t.to_user_id, t.amount, t.status, t.admin_modified_at, t.admin_modified_by, t.admin_note, u_admin.username as admin_username FROM transfers t LEFT JOIN users u_admin ON t.admin_modified_by = u_admin.id WHERE t.admin_modified_at >= DATE_SUB(NOW(), INTERVAL 24 HOUR) AND t.admin_modified_by IS NOT NULL ORDER BY t.admin_modified_at DESC `); if (adminModifiedTransfers.length === 0) { console.log('最近24小时内没有管理员操作记录'); return; } console.log(`检查 ${adminModifiedTransfers.length} 个管理员操作记录`); const suspiciousOperations = []; for (const transfer of adminModifiedTransfers) { // 检查涉及的用户余额是否正常 const userIds = [transfer.from_user_id, transfer.to_user_id].filter(id => id); for (const userId of userIds) { const [users] = await this.db.execute( 'SELECT id, username, balance FROM users WHERE id = ?', [userId] ); if (users.length > 0) { const auditResult = await this.auditor.auditUser(users[0]); // 如果发现余额异常,且与管理员操作时间相近,标记为可疑 if (auditResult.isProblematic && Math.abs(auditResult.discrepancy) >= this.alertThreshold) { suspiciousOperations.push({ transferId: transfer.id, userId: userId, username: users[0].username, adminUsername: transfer.admin_username, adminModifiedAt: transfer.admin_modified_at, adminNote: transfer.admin_note, transferStatus: transfer.status, transferAmount: transfer.amount, balanceDiscrepancy: auditResult.discrepancy, actualBalance: auditResult.actualBalance, theoreticalBalance: auditResult.theoreticalBalance }); } } } } if (suspiciousOperations.length > 0) { console.log(`⚠️ 发现 ${suspiciousOperations.length} 个可疑的管理员操作`); // 生成管理员操作警报 await this.generateAdminOperationAlert(suspiciousOperations); // 记录警报日志 logger.warn('Suspicious admin operations detected', { count: suspiciousOperations.length, operations: suspiciousOperations }); } else { console.log('✅ 管理员操作检查正常,未发现余额异常'); } } catch (error) { console.error('检查管理员操作失败:', error); logger.error('Admin operations check failed', { error: error.message }); } } /** * 生成管理员操作警报文件 * @param {Array} suspiciousOperations - 可疑操作列表 */ async generateAdminOperationAlert(suspiciousOperations) { const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); const alertPath = path.join(__dirname, 'logs', `admin_operation_alert_${timestamp}.json`); const alertData = { alertType: 'admin_operation_balance_discrepancy', timestamp: new Date().toISOString(), summary: { suspiciousOperationsCount: suspiciousOperations.length, totalDiscrepancy: suspiciousOperations.reduce((sum, op) => sum + Math.abs(op.balanceDiscrepancy), 0) }, suspiciousOperations: suspiciousOperations, recommendations: [ '检查管理员是否在状态变更时正确设置了adjust_balance参数', '验证转账状态变更的合理性', '如有必要,手动修复用户余额并记录修复日志' ] }; // 确保logs目录存在 const logsDir = path.join(__dirname, 'logs'); if (!fs.existsSync(logsDir)) { fs.mkdirSync(logsDir, { recursive: true }); } // 写入警报文件 fs.writeFileSync(alertPath, JSON.stringify(alertData, null, 2)); console.log(`管理员操作警报已生成: ${alertPath}`); // 记录警报日志 auditLogger.warn('Admin operation alert generated', { suspiciousOperationsCount: suspiciousOperations.length, totalDiscrepancy: alertData.summary.totalDiscrepancy, alertFile: alertPath }); } /** * 启动定时监控任务 */ startScheduledTasks() { console.log('启动定时余额监控任务...'); // 每小时执行一次快速检查 cron.schedule('0 * * * *', async () => { console.log('执行定时快速余额检查'); await this.quickBalanceCheck(); }); // 每30分钟检查一次管理员操作 cron.schedule('*/30 * * * *', async () => { console.log('执行管理员操作余额一致性检查'); await this.checkAdminOperations(); }); // 每天凌晨2点执行完整审计 cron.schedule('0 2 * * *', async () => { console.log('执行定时完整余额审计'); await this.fullBalanceAudit(); }); // 每周日凌晨3点执行深度审计 cron.schedule('0 3 * * 0', async () => { console.log('执行定时深度余额审计'); await this.fullBalanceAudit(); }); logger.info('Balance monitoring scheduled tasks started'); console.log('定时监控任务已启动'); console.log('- 快速检查: 每小时执行一次'); console.log('- 管理员操作检查: 每30分钟执行一次'); console.log('- 完整审计: 每天凌晨2点执行'); console.log('- 深度审计: 每周日凌晨3点执行'); } /** * 停止监控服务 */ stop() { console.log('停止余额监控服务'); logger.info('Balance monitor service stopped'); process.exit(0); } /** * 运行监控服务 */ async run() { try { await this.init(); // 启动时执行一次快速检查 await this.quickBalanceCheck(); // 启动时执行一次管理员操作检查 await this.checkAdminOperations(); // 启动定时任务 this.startScheduledTasks(); // 监听退出信号 process.on('SIGINT', () => { console.log('\n收到退出信号,正在停止监控服务...'); this.stop(); }); process.on('SIGTERM', () => { console.log('\n收到终止信号,正在停止监控服务...'); this.stop(); }); console.log('余额监控服务正在运行中...'); console.log('按 Ctrl+C 停止服务'); } catch (error) { console.error('启动监控服务失败:', error); logger.error('Balance monitor startup failed', { error: error.message }); process.exit(1); } } } // 如果直接运行此脚本 if (require.main === module) { const monitor = new BalanceMonitor(); // 检查命令行参数 const args = process.argv.slice(2); if (args.includes('--quick')) { // 执行一次快速检查后退出 monitor.init().then(() => { return monitor.quickBalanceCheck(); }).then(() => { console.log('快速检查完成'); process.exit(0); }).catch(console.error); } else if (args.includes('--full')) { // 执行一次完整审计后退出 monitor.init().then(() => { return monitor.fullBalanceAudit(); }).then(() => { console.log('完整审计完成'); process.exit(0); }).catch(console.error); } else if (args.includes('--user')) { // 检查特定用户 const userIdIndex = args.indexOf('--user') + 1; const userId = args[userIdIndex]; if (!userId) { console.error('请指定用户ID: --user '); process.exit(1); } monitor.init().then(() => { return monitor.checkUserBalance(parseInt(userId)); }).then((result) => { console.log('用户余额检查结果:'); console.log(`用户: ${result.username} (ID: ${result.userId})`); console.log(`实际余额: ${result.actualBalance}`); console.log(`理论余额: ${result.theoreticalBalance}`); console.log(`差异: ${result.discrepancy}`); console.log(`是否有问题: ${result.isProblematic ? '是' : '否'}`); process.exit(0); }).catch(console.error); } else { // 启动持续监控服务 monitor.run().catch(console.error); } } module.exports = BalanceMonitor;