diff --git a/keyserver/src/database/database.js b/keyserver/src/database/database.js index 76f0a950c..051c8d8a3 100644 --- a/keyserver/src/database/database.js +++ b/keyserver/src/database/database.js @@ -1,162 +1,194 @@ // @flow import type { QueryResults } from 'mysql'; import mysql from 'mysql2'; import mysqlPromise from 'mysql2/promise'; import SQL from 'sql-template-strings'; import { getScriptContext } from '../scripts/script-context'; import { connectionLimit, queryWarnTime } from './consts'; import { getDBConfig } from './db-config'; import DatabaseMonitor from './monitor'; import type { Pool, SQLOrString, SQLStatementType } from './types'; const SQLStatement: SQLStatementType = SQL.SQLStatement; +let migrationConnection; +async function getMigrationConnection() { + if (migrationConnection) { + return migrationConnection; + } + const dbConfig = await getDBConfig(); + migrationConnection = await mysqlPromise.createConnection(dbConfig); + return migrationConnection; +} + let pool, databaseMonitor; async function loadPool(): Promise { if (pool) { return pool; } const scriptContext = getScriptContext(); const dbConfig = await getDBConfig(); pool = mysqlPromise.createPool({ ...dbConfig, connectionLimit, multipleStatements: !!( scriptContext && scriptContext.allowMultiStatementSQLQueries ), }); databaseMonitor = new DatabaseMonitor(pool); return pool; } function endPool() { pool?.end(); } function appendSQLArray( sql: SQLStatementType, sqlArray: $ReadOnlyArray, delimeter: SQLOrString, ): SQLStatementType { if (sqlArray.length === 0) { return sql; } const [first, ...rest] = sqlArray; sql.append(first); if (rest.length === 0) { return sql; } return rest.reduce( (prev: SQLStatementType, curr: SQLStatementType) => prev.append(delimeter).append(curr), sql, ); } function mergeConditions( conditions: $ReadOnlyArray, delimiter: SQLStatementType, ): SQLStatementType { const sql = SQL` (`; appendSQLArray(sql, conditions, delimiter); sql.append(SQL`) `); return sql; } function mergeAndConditions( andConditions: $ReadOnlyArray, ): SQLStatementType { return mergeConditions(andConditions, SQL` AND `); } function mergeOrConditions( andConditions: $ReadOnlyArray, ): SQLStatementType { return mergeConditions(andConditions, SQL` OR `); } // We use this fake result for dry runs function FakeSQLResult() { this.insertId = -1; } FakeSQLResult.prototype = Array.prototype; const fakeResult: QueryResults = (new FakeSQLResult(): any); const MYSQL_DEADLOCK_ERROR_CODE = 1213; +type ConnectionContext = { + +migrationsActive?: boolean, +}; +let connectionContext = { + migrationsActive: false, +}; + +function setConnectionContext(newContext: ConnectionContext) { + connectionContext = { + ...connectionContext, + ...newContext, + }; + if (!connectionContext.migrationsActive && migrationConnection) { + migrationConnection.end(); + migrationConnection = undefined; + } +} + type QueryOptions = { +triesLeft?: number, +multipleStatements?: boolean, }; async function dbQuery( statement: SQLStatementType, options?: QueryOptions, ): Promise { const triesLeft = options?.triesLeft ?? 2; const multipleStatements = options?.multipleStatements ?? false; let connection; + if (connectionContext.migrationsActive) { + connection = await getMigrationConnection(); + } if (multipleStatements) { connection = await getMultipleStatementsConnection(); } if (!connection) { connection = await loadPool(); } const timeoutID = setTimeout( () => databaseMonitor.reportLaggingQuery(statement.sql), queryWarnTime, ); const scriptContext = getScriptContext(); try { const sql = statement.sql.trim(); if ( scriptContext && scriptContext.dryRun && (sql.startsWith('INSERT') || sql.startsWith('DELETE') || sql.startsWith('UPDATE')) ) { console.log(rawSQL(statement)); return ([fakeResult]: any); } return await connection.query(statement); } catch (e) { if (e.errno === MYSQL_DEADLOCK_ERROR_CODE && triesLeft > 0) { console.log('deadlock occurred, trying again', e); return await dbQuery(statement, { ...options, triesLeft: triesLeft - 1 }); } e.query = statement.sql; throw e; } finally { clearTimeout(timeoutID); if (multipleStatements) { connection.end(); } } } function rawSQL(statement: SQLStatementType): string { return mysql.format(statement.sql, statement.values); } async function getMultipleStatementsConnection() { const dbConfig = await getDBConfig(); return await mysqlPromise.createConnection({ ...dbConfig, multipleStatements: true, }); } export { endPool, SQL, SQLStatement, appendSQLArray, mergeAndConditions, mergeOrConditions, + setConnectionContext, dbQuery, rawSQL, }; diff --git a/keyserver/src/database/migrations.js b/keyserver/src/database/migrations.js index 500a3ad77..ccdb3edb4 100644 --- a/keyserver/src/database/migrations.js +++ b/keyserver/src/database/migrations.js @@ -1,86 +1,88 @@ // @flow import type { QueryResults } from 'mysql'; import { isDev } from 'lib/utils/dev-utils'; import { getMessageForException } from 'lib/utils/errors'; import sleep from 'lib/utils/sleep'; -import { dbQuery, SQL } from './database'; +import { dbQuery, SQL, setConnectionContext } from './database'; import { fetchDBVersion, updateDBVersion } from './db-version'; import { migrations } from './migration-config'; import { setupDB } from './setup-db'; async function migrate(): Promise { if (isDev) { await sleep(5000); } let dbVersion = null; try { dbVersion = await setUpDBAndReturnVersion(); console.log(`(node:${process.pid}) DB version: ${dbVersion}`); } catch (e) { const dbVersionExceptionMessage = String(getMessageForException(e)); console.error(`(node:${process.pid}) ${dbVersionExceptionMessage}`); return false; } + setConnectionContext({ migrationsActive: true }); for (const [idx, migration] of migrations.entries()) { if (idx <= dbVersion) { continue; } try { await startTransaction(); await migration(); await updateDBVersion(idx); await commitTransaction(); console.log(`(node:${process.pid}) migration ${idx} succeeded.`); } catch (e) { const transactionExceptionMessage = String(getMessageForException(e)); console.error(`(node:${process.pid}) migration ${idx} failed.`); console.error(transactionExceptionMessage); await rollbackTransaction(); return false; } } + setConnectionContext({ migrationsActive: false }); return true; } const MYSQL_TABLE_DOESNT_EXIST_ERROR_CODE = 1146; async function setUpDBAndReturnVersion(): Promise { try { return await fetchDBVersion(); } catch (e) { if (e.errno !== MYSQL_TABLE_DOESNT_EXIST_ERROR_CODE) { throw e; } await setupDB(); return await fetchDBVersion(); } } async function startTransaction(): Promise { const beginTxnQuery = SQL` START TRANSACTION; `; return dbQuery(beginTxnQuery); } async function commitTransaction(): Promise { const endTxnQuery = SQL` COMMIT; `; return dbQuery(endTxnQuery); } async function rollbackTransaction(): Promise { const rollbackTxnQuery = SQL` ROLLBACK; `; return dbQuery(rollbackTxnQuery); } export { migrate };