diff --git a/keyserver/src/cron/backups.js b/keyserver/src/cron/backups.js
index 084c63220..8378239cb 100644
--- a/keyserver/src/cron/backups.js
+++ b/keyserver/src/cron/backups.js
@@ -1,204 +1,214 @@
 // @flow
 
 import childProcess from 'child_process';
 import dateFormat from 'dateformat';
 import fs from 'fs';
 import invariant from 'invariant';
 import { ReReadable } from 'rereadable-stream';
 import { PassThrough } from 'stream';
 import { promisify } from 'util';
 import zlib from 'zlib';
 
-import dbConfig from '../../secrets/db_config';
+import { getDBConfig, type DBConfig } from '../database/db-config';
 import { importJSON } from '../utils/import-json';
 
 const readdir = promisify(fs.readdir);
 const lstat = promisify(fs.lstat);
 const unlink = promisify(fs.unlink);
 
 async function backupDB() {
-  const backupConfig = await importJSON('facts/backups');
+  const [backupConfig, dbConfig] = await Promise.all([
+    importJSON('facts/backups'),
+    getDBConfig(),
+  ]);
+
   if (!backupConfig || !backupConfig.enabled) {
     return;
   }
 
   const dateString = dateFormat('yyyy-mm-dd-HH:MM');
   const filename = `comm.${dateString}.sql.gz`;
   const filePath = `${backupConfig.directory}/${filename}`;
 
   const rawStream = new PassThrough();
   (async () => {
     try {
-      await mysqldump(filename, rawStream, ['--no-data'], { end: false });
+      await mysqldump(dbConfig, filename, rawStream, ['--no-data'], {
+        end: false,
+      });
     } catch {}
     try {
       const ignoreReports = `--ignore-table=${dbConfig.database}.reports`;
-      await mysqldump(filename, rawStream, ['--no-create-info', ignoreReports]);
+      await mysqldump(dbConfig, filename, rawStream, [
+        '--no-create-info',
+        ignoreReports,
+      ]);
     } catch {
       rawStream.end();
     }
   })();
 
   const gzippedBuffer = new ReReadable();
   rawStream
     .on('error', (e: Error) => {
       console.warn(`mysqldump stdout stream emitted error for ${filename}`, e);
     })
     .pipe(zlib.createGzip())
     .on('error', (e: Error) => {
       console.warn(`gzip transform stream emitted error for ${filename}`, e);
     })
     .pipe(gzippedBuffer);
 
   try {
     await saveBackup(filename, filePath, gzippedBuffer);
   } catch (e) {
     console.warn(`saveBackup threw for ${filename}`, e);
     await unlink(filePath);
   }
 }
 
 function mysqldump(
+  dbConfig: DBConfig,
   filename: string,
   rawStream: PassThrough,
   extraParams: $ReadOnlyArray<string>,
   pipeParams?: { end?: boolean, ... },
 ): Promise<void> {
   const mysqlDump = childProcess.spawn(
     'mysqldump',
     [
       '-u',
       dbConfig.user,
       `-p${dbConfig.password}`,
       '--single-transaction',
       '--no-tablespaces',
       ...extraParams,
       dbConfig.database,
     ],
     {
       stdio: ['ignore', 'pipe', 'ignore'],
     },
   );
   const extraParamsString = extraParams.join(' ');
   return new Promise((resolve, reject) => {
     mysqlDump.on('error', (e: Error) => {
       console.warn(
         `error trying to spawn mysqldump ${extraParamsString} for ${filename}`,
         e,
       );
       reject(e);
     });
     mysqlDump.on('exit', (code: number | null, signal: string | null) => {
       if (signal !== null && signal !== undefined) {
         console.warn(
           `mysqldump ${extraParamsString} received signal ${signal} for ` +
             filename,
         );
         reject(new Error(`mysqldump ${JSON.stringify({ code, signal })}`));
       } else if (code !== null && code !== 0) {
         console.warn(
           `mysqldump ${extraParamsString} exited with code ${code} for ` +
             filename,
         );
         reject(new Error(`mysqldump ${JSON.stringify({ code, signal })}`));
       }
       resolve();
     });
     mysqlDump.stdout.pipe(rawStream, pipeParams);
   });
 }
 
 async function saveBackup(
   filename: string,
   filePath: string,
   gzippedBuffer: ReReadable,
   retries: number = 2,
 ): Promise<void> {
   try {
     await trySaveBackup(filename, filePath, gzippedBuffer);
   } catch (saveError) {
     if (saveError.code !== 'ENOSPC') {
       throw saveError;
     }
     if (!retries) {
       throw saveError;
     }
     try {
       await deleteOldestBackup();
     } catch (deleteError) {
       if (deleteError.message === 'no_backups_left') {
         throw saveError;
       } else {
         throw deleteError;
       }
     }
     await saveBackup(filename, filePath, gzippedBuffer, retries - 1);
   }
 }
 
 const backupWatchFrequency = 60 * 1000;
 function trySaveBackup(
   filename: string,
   filePath: string,
   gzippedBuffer: ReReadable,
 ): Promise<void> {
   const timeoutObject: { timeout: ?TimeoutID } = { timeout: null };
   const setBackupTimeout = (alreadyWaited: number) => {
     timeoutObject.timeout = setTimeout(() => {
       const nowWaited = alreadyWaited + backupWatchFrequency;
       console.log(
         `writing backup for ${filename} has taken ${nowWaited}ms so far`,
       );
       setBackupTimeout(nowWaited);
     }, backupWatchFrequency);
   };
   setBackupTimeout(0);
 
   const writeStream = fs.createWriteStream(filePath);
   return new Promise((resolve, reject) => {
     gzippedBuffer
       .rewind()
       .pipe(writeStream)
       .on('finish', () => {
         clearTimeout(timeoutObject.timeout);
         resolve();
       })
       .on('error', (e: Error) => {
         clearTimeout(timeoutObject.timeout);
         console.warn(`write stream emitted error for ${filename}`, e);
         reject(e);
       });
   });
 }
 
 async function deleteOldestBackup() {
   const backupConfig = await importJSON('facts/backups');
   invariant(backupConfig, 'backupConfig should be non-null');
   const files = await readdir(backupConfig.directory);
   let oldestFile;
   for (const file of files) {
     if (!file.endsWith('.sql.gz') || !file.startsWith('comm.')) {
       continue;
     }
     const stat = await lstat(`${backupConfig.directory}/${file}`);
     if (stat.isDirectory()) {
       continue;
     }
     if (!oldestFile || stat.mtime < oldestFile.mtime) {
       oldestFile = { file, mtime: stat.mtime };
     }
   }
   if (!oldestFile) {
     throw new Error('no_backups_left');
   }
   try {
     await unlink(`${backupConfig.directory}/${oldestFile.file}`);
   } catch (e) {
     // Check if it's already been deleted
     if (e.code !== 'ENOENT') {
       throw e;
     }
   }
 }
 
 export { backupDB };
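The `{ end: false }` pipe option in `backupDB` is the subtle part: both mysqldump passes write into the same `PassThrough`, so the first pipe must leave the stream open for the second. A minimal standalone sketch of that pattern, with `Readable.from` producers standing in for the two mysqldump processes:

```js
// Two producers write sequentially into one PassThrough. The first pipe
// passes { end: false } so the stream stays open; the second pipe uses the
// default and ends the stream when it finishes, like the data dump above.
import { PassThrough, Readable } from 'stream';

const rawStream = new PassThrough();

const schemaDump = Readable.from(['-- schema\n']);
schemaDump.pipe(rawStream, { end: false }); // keep rawStream open
schemaDump.on('end', () => {
  const dataDump = Readable.from(['-- data\n']);
  dataDump.pipe(rawStream); // default behavior: ends rawStream on completion
});

rawStream.on('data', chunk => process.stdout.write(String(chunk)));
```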
diff --git a/keyserver/src/database/database.js b/keyserver/src/database/database.js
index 3bf321ee9..e227634ba 100644
--- a/keyserver/src/database/database.js
+++ b/keyserver/src/database/database.js
@@ -1,154 +1,160 @@
 // @flow
 
 import type { QueryResults } from 'mysql';
 import mysql from 'mysql2';
 import mysqlPromise from 'mysql2/promise';
 import SQL from 'sql-template-strings';
 
-import dbConfig from '../../secrets/db_config';
 import { getScriptContext } from '../scripts/script-context';
 
 import { connectionLimit, queryWarnTime } from './consts';
+import { getDBConfig } from './db-config';
 import DatabaseMonitor from './monitor';
 import type { Pool, SQLOrString, SQLStatementType } from './types';
 
 const SQLStatement: SQLStatementType = SQL.SQLStatement;
 
 let pool, databaseMonitor;
-function getPool(): Pool {
+async function loadPool(): Promise<Pool> {
   if (pool) {
     return pool;
   }
   const scriptContext = getScriptContext();
+  const dbConfig = await getDBConfig();
   pool = mysqlPromise.createPool({
     ...dbConfig,
     connectionLimit,
     multipleStatements: !!(
       scriptContext && scriptContext.allowMultiStatementSQLQueries
     ),
   });
   databaseMonitor = new DatabaseMonitor(pool);
   return pool;
 }
 
+function endPool() {
+  pool?.end();
+}
+
 function appendSQLArray(
   sql: SQLStatementType,
   sqlArray: $ReadOnlyArray<SQLStatementType>,
   delimeter: SQLOrString,
 ): SQLStatementType {
   if (sqlArray.length === 0) {
     return sql;
   }
   const [first, ...rest] = sqlArray;
   sql.append(first);
   if (rest.length === 0) {
     return sql;
   }
   return rest.reduce(
     (prev: SQLStatementType, curr: SQLStatementType) =>
       prev.append(delimeter).append(curr),
     sql,
   );
 }
 
 function mergeConditions(
   conditions: $ReadOnlyArray<SQLStatementType>,
   delimiter: SQLStatementType,
 ): SQLStatementType {
   const sql = SQL` (`;
   appendSQLArray(sql, conditions, delimiter);
   sql.append(SQL`) `);
   return sql;
 }
 
 function mergeAndConditions(
   andConditions: $ReadOnlyArray<SQLStatementType>,
 ): SQLStatementType {
   return mergeConditions(andConditions, SQL` AND `);
 }
 
 function mergeOrConditions(
   andConditions: $ReadOnlyArray<SQLStatementType>,
 ): SQLStatementType {
   return mergeConditions(andConditions, SQL` OR `);
 }
 
 // We use this fake result for dry runs
 function FakeSQLResult() {
   this.insertId = -1;
 }
 FakeSQLResult.prototype = Array.prototype;
 const fakeResult: QueryResults = (new FakeSQLResult(): any);
 
 type QueryOptions = {
   +triesLeft?: number,
   +multipleStatements?: boolean,
 };
 async function dbQuery(
   statement: SQLStatementType,
   options?: QueryOptions,
 ): Promise<QueryResults> {
   const triesLeft = options?.triesLeft ?? 2;
   const multipleStatements = options?.multipleStatements ?? false;
   let connection;
   if (multipleStatements) {
     connection = await getMultipleStatementsConnection();
   }
   if (!connection) {
-    connection = getPool();
+    connection = await loadPool();
   }
   const timeoutID = setTimeout(
     () => databaseMonitor.reportLaggingQuery(statement.sql),
     queryWarnTime,
   );
   const scriptContext = getScriptContext();
   try {
     const sql = statement.sql.trim();
     if (
       scriptContext &&
       scriptContext.dryRun &&
       (sql.startsWith('INSERT') ||
         sql.startsWith('DELETE') ||
         sql.startsWith('UPDATE'))
     ) {
       console.log(rawSQL(statement));
       return ([fakeResult]: any);
     }
     return await connection.query(statement);
   } catch (e) {
     if (e.errno === 1213 && triesLeft > 0) {
       console.log('deadlock occurred, trying again', e);
       return await dbQuery(statement, { ...options, triesLeft: triesLeft - 1 });
     }
     e.query = statement.sql;
     throw e;
   } finally {
     clearTimeout(timeoutID);
     if (multipleStatements) {
       connection.end();
     }
   }
 }
 
 function rawSQL(statement: SQLStatementType): string {
   return mysql.format(statement.sql, statement.values);
 }
 
 async function getMultipleStatementsConnection() {
+  const dbConfig = await getDBConfig();
   return await mysqlPromise.createConnection({
     ...dbConfig,
     multipleStatements: true,
   });
 }
 
 export {
-  getPool,
+  endPool,
   SQL,
   SQLStatement,
   appendSQLArray,
   mergeAndConditions,
   mergeOrConditions,
   dbQuery,
   rawSQL,
 };
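Call sites are unaffected by `getPool` becoming the async `loadPool`, since `dbQuery` awaits the pool internally. A sketch of a typical call site, which reads the same before and after this change (the `users` table and `username` column here are illustrative, not part of this diff):

```js
// dbQuery hides the now-async pool initialization from callers, so query
// code does not need to know that the DB config is loaded lazily.
import { dbQuery, SQL } from '../database/database';

async function fetchUsername(userID: string): Promise<?string> {
  const [result] = await dbQuery(
    SQL`SELECT username FROM users WHERE id = ${userID}`,
  );
  return result.length > 0 ? result[0].username : null;
}
```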
diff --git a/keyserver/src/database/db-config.js b/keyserver/src/database/db-config.js
new file mode 100644
index 000000000..45292d271
--- /dev/null
+++ b/keyserver/src/database/db-config.js
@@ -0,0 +1,38 @@
+// @flow
+
+import invariant from 'invariant';
+
+import { importJSON } from '../utils/import-json';
+
+export type DBConfig = {
+  +host: string,
+  +user: string,
+  +password: string,
+  +database: string,
+};
+
+let dbConfig;
+async function getDBConfig(): Promise<DBConfig> {
+  if (dbConfig !== undefined) {
+    return dbConfig;
+  }
+  if (
+    process.env.COMM_MYSQL_DATABASE &&
+    process.env.COMM_MYSQL_USER &&
+    process.env.COMM_MYSQL_PASSWORD
+  ) {
+    dbConfig = {
+      host: process.env.COMM_MYSQL_HOST || 'localhost',
+      user: process.env.COMM_MYSQL_USER,
+      password: process.env.COMM_MYSQL_PASSWORD,
+      database: process.env.COMM_MYSQL_DATABASE,
+    };
+  } else {
+    const importedDBConfig = await importJSON('secrets/db_config');
+    invariant(importedDBConfig, 'DB config missing');
+    dbConfig = importedDBConfig;
+  }
+  return dbConfig;
+}
+
+export { getDBConfig };
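`getDBConfig` resolves the config once per process: if `COMM_MYSQL_DATABASE`, `COMM_MYSQL_USER`, and `COMM_MYSQL_PASSWORD` are all set, it builds the config from the environment (with `COMM_MYSQL_HOST` defaulting to `localhost`); otherwise it falls back to the `secrets/db_config` JSON. Because the result is memoized in the module-level `dbConfig`, repeated awaits are cheap. A small, hypothetical consumer sketch:

```js
// Safe to call from anywhere, any number of times: the env/secrets lookup
// runs at most once per process thanks to the memoization above.
import { getDBConfig } from '../database/db-config';

async function describeDBTarget(): Promise<string> {
  const { user, host, database } = await getDBConfig();
  return `${user}@${host}/${database}`;
}
```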
diff --git a/keyserver/src/scripts/utils.js b/keyserver/src/scripts/utils.js
index eee782897..bfb30c08e 100644
--- a/keyserver/src/scripts/utils.js
+++ b/keyserver/src/scripts/utils.js
@@ -1,26 +1,26 @@
 // @flow
 
-import { getPool } from '../database/database';
+import { endPool } from '../database/database';
 import { endFirebase, endAPNs } from '../push/providers';
 import { publisher } from '../socket/redis';
 
 function endScript() {
-  getPool().end();
+  endPool();
   publisher.end();
   endFirebase();
   endAPNs();
 }
 
 async function main(functions: $ReadOnlyArray<() => Promise<mixed>>) {
   try {
     for (const f of functions) {
       await f();
     }
   } catch (e) {
     console.warn(e);
   } finally {
     endScript();
   }
 }
 
 export { endScript, main };
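With `endPool` exported instead of the pool itself, `endScript` no longer forces pool creation just to tear it down: `pool?.end()` is a no-op when no query ever ran. A sketch of a one-off script built on `main` (the backfill itself is hypothetical):

```js
// Steps run sequentially; endScript() then closes the MySQL pool, the Redis
// publisher, and the Firebase/APNs handles so the process can exit cleanly.
import { dbQuery, SQL } from '../database/database';
import { main } from './utils';

async function backfillExample() {
  await dbQuery(SQL`UPDATE users SET example = 1 WHERE example IS NULL`);
}

main([backfillExample]);
```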