diff --git a/keyserver/src/database/database.js b/keyserver/src/database/database.js index 8928c103d..03d24a0a4 100644 --- a/keyserver/src/database/database.js +++ b/keyserver/src/database/database.js @@ -1,209 +1,210 @@ // @flow import type { ConnectionOptions, QueryResults, PoolOptions } from 'mysql'; import mysql from 'mysql2'; import mysqlPromise from 'mysql2/promise.js'; import SQL from 'sql-template-strings'; import { connectionLimit, queryWarnTime } from './consts.js'; import { getDBConfig } from './db-config.js'; import DatabaseMonitor from './monitor.js'; import type { Pool, SQLOrString, SQLStatementType } from './types.js'; import { getScriptContext } from '../scripts/script-context.js'; const SQLStatement: SQLStatementType = SQL.SQLStatement; const MYSQL_DUPLICATE_ENTRY_FOR_KEY_ERROR_CODE = 1062; +const MYSQL_TABLE_DOESNT_EXIST_ERROR_CODE = 1146; +const MYSQL_DEADLOCK_ERROR_CODE = 1213; let migrationConnection; async function getMigrationConnection() { if (migrationConnection) { return migrationConnection; } const { dbType, ...dbConfig } = await getDBConfig(); const options: ConnectionOptions = dbConfig; migrationConnection = await mysqlPromise.createConnection(options); return migrationConnection; } let pool, databaseMonitor; async function loadPool(): Promise { if (pool) { return pool; } const scriptContext = getScriptContext(); const { dbType, ...dbConfig } = await getDBConfig(); const options: PoolOptions = { ...dbConfig, connectionLimit, multipleStatements: !!( scriptContext && scriptContext.allowMultiStatementSQLQueries ), }; // This function can be run asynchronously multiple times, // the previous check is not enough because the function will await // on `getDBConfig()` and as result we might get there // while the pool is already defined, which will result with // creating a new pool and losing the previous one which will stay open if (pool) { return pool; } pool = mysqlPromise.createPool(options); databaseMonitor = new DatabaseMonitor(pool); return pool; } function endPool() { pool?.end(); } function appendSQLArray( sql: SQLStatementType, sqlArray: $ReadOnlyArray, delimeter: SQLOrString, ): SQLStatementType { if (sqlArray.length === 0) { return sql; } const [first, ...rest] = sqlArray; sql.append(first); if (rest.length === 0) { return sql; } return rest.reduce( (prev: SQLStatementType, curr: SQLStatementType) => prev.append(delimeter).append(curr), sql, ); } function mergeConditions( conditions: $ReadOnlyArray, delimiter: SQLStatementType, ): SQLStatementType { const sql = SQL` (`; appendSQLArray(sql, conditions, delimiter); sql.append(SQL`) `); return sql; } function mergeAndConditions( andConditions: $ReadOnlyArray, ): SQLStatementType { return mergeConditions(andConditions, SQL` AND `); } function mergeOrConditions( andConditions: $ReadOnlyArray, ): SQLStatementType { return mergeConditions(andConditions, SQL` OR `); } // We use this fake result for dry runs const fakeResult: QueryResults = (() => { const result: any = []; result.insertId = -1; return result; })(); -const MYSQL_DEADLOCK_ERROR_CODE = 1213; - type ConnectionContext = { +migrationsActive?: boolean, }; let connectionContext = { migrationsActive: false, }; function setConnectionContext(newContext: ConnectionContext) { connectionContext = { ...connectionContext, ...newContext, }; if (!connectionContext.migrationsActive && migrationConnection) { migrationConnection.end(); migrationConnection = undefined; } } type QueryOptions = { +triesLeft?: number, +multipleStatements?: boolean, }; async function dbQuery( statement: SQLStatementType, options?: QueryOptions, ): Promise { const triesLeft = options?.triesLeft ?? 2; const multipleStatements = options?.multipleStatements ?? false; let connection; if (connectionContext.migrationsActive) { connection = await getMigrationConnection(); } if (multipleStatements) { connection = await getMultipleStatementsConnection(); } if (!connection) { connection = await loadPool(); } const timeoutID = setTimeout( () => databaseMonitor.reportLaggingQuery(statement.sql), queryWarnTime, ); const scriptContext = getScriptContext(); try { const sql = statement.sql.trim(); if ( scriptContext && scriptContext.dryRun && (sql.startsWith('INSERT') || sql.startsWith('DELETE') || sql.startsWith('UPDATE')) ) { console.log(rawSQL(statement)); return ([fakeResult]: any); } return await connection.query(statement); } catch (e) { if (e.errno === MYSQL_DEADLOCK_ERROR_CODE && triesLeft > 0) { console.log('deadlock occurred, trying again', e); return await dbQuery(statement, { ...options, triesLeft: triesLeft - 1 }); } e.query = statement.sql; throw e; } finally { clearTimeout(timeoutID); if (multipleStatements) { connection.end(); } } } function rawSQL(statement: SQLStatementType): string { return mysql.format(statement.sql, statement.values); } async function getMultipleStatementsConnection() { const { dbType, ...dbConfig } = await getDBConfig(); const options: ConnectionOptions = { ...dbConfig, multipleStatements: true, }; return await mysqlPromise.createConnection(options); } export { endPool, SQL, SQLStatement, appendSQLArray, mergeAndConditions, mergeOrConditions, setConnectionContext, dbQuery, rawSQL, MYSQL_DUPLICATE_ENTRY_FOR_KEY_ERROR_CODE, + MYSQL_TABLE_DOESNT_EXIST_ERROR_CODE, }; diff --git a/keyserver/src/database/migrations.js b/keyserver/src/database/migrations.js index c67676c4c..a7048b0e5 100644 --- a/keyserver/src/database/migrations.js +++ b/keyserver/src/database/migrations.js @@ -1,88 +1,91 @@ // @flow import type { QueryResults } from 'mysql'; import { isDev } from 'lib/utils/dev-utils.js'; import { getMessageForException } from 'lib/utils/errors.js'; import sleep from 'lib/utils/sleep.js'; -import { dbQuery, SQL, setConnectionContext } from './database.js'; +import { + dbQuery, + SQL, + setConnectionContext, + MYSQL_TABLE_DOESNT_EXIST_ERROR_CODE, +} from './database.js'; import { fetchDBVersion, updateDBVersion } from './db-version.js'; import { migrations } from './migration-config.js'; import { setupDB } from './setup-db.js'; async function migrate(): Promise { if (isDev) { await sleep(5000); } let dbVersion = null; try { dbVersion = await setUpDBAndReturnVersion(); console.log(`(node:${process.pid}) DB version: ${dbVersion}`); } catch (e) { const dbVersionExceptionMessage = String(getMessageForException(e)); console.error(`(node:${process.pid}) ${dbVersionExceptionMessage}`); return false; } setConnectionContext({ migrationsActive: true }); for (const [idx, migration] of migrations.entries()) { if (idx <= dbVersion) { continue; } try { await startTransaction(); await migration(); await updateDBVersion(idx); await commitTransaction(); console.log(`(node:${process.pid}) migration ${idx} succeeded.`); } catch (e) { const transactionExceptionMessage = String(getMessageForException(e)); console.error(`(node:${process.pid}) migration ${idx} failed.`); console.error(transactionExceptionMessage); await rollbackTransaction(); return false; } } setConnectionContext({ migrationsActive: false }); return true; } -const MYSQL_TABLE_DOESNT_EXIST_ERROR_CODE = 1146; - async function setUpDBAndReturnVersion(): Promise { try { return await fetchDBVersion(); } catch (e) { if (e.errno !== MYSQL_TABLE_DOESNT_EXIST_ERROR_CODE) { throw e; } await setupDB(); return await fetchDBVersion(); } } async function startTransaction(): Promise { const beginTxnQuery = SQL` START TRANSACTION; `; return dbQuery(beginTxnQuery); } async function commitTransaction(): Promise { const endTxnQuery = SQL` COMMIT; `; return dbQuery(endTxnQuery); } async function rollbackTransaction(): Promise { const rollbackTxnQuery = SQL` ROLLBACK; `; return dbQuery(rollbackTxnQuery); } export { migrate };