diff --git a/keyserver/flow-typed/npm/mysql_v2.x.x.js b/keyserver/flow-typed/npm/mysql_v2.x.x.js index dc37a6815..dfa516075 100644 --- a/keyserver/flow-typed/npm/mysql_v2.x.x.js +++ b/keyserver/flow-typed/npm/mysql_v2.x.x.js @@ -1,198 +1,197 @@ // flow-typed signature: c19cdbb02c8406091b029b937c58fb76 // flow-typed version: eaa5f54644/mysql_v2.x.x/flow_>=v0.104.x // TODO: Events on event emitters // TODO: Ssl structure type in ConnectionOptions // TODO: PoolNamespace internal structure // TODO: Packets internal structure declare module "mysql" { declare type TypeCastField = { db: string, table: string, name: string, type: string, length: number, string: () => string, buffer: () => Buffer, ... } declare type ConnectionOptions = { host?: string, port?: number, localAddress?: string, socketPath?: string, user: string, password: string, database?: string, charset?: string, timezone?: string, connectTimeout?: number, stringifyObjects?: boolean, insecureAuth?: boolean, typeCast?: boolean | ((field: TypeCastField, next: () => mixed) => any), queryFormat?: (query: string, values: ?mixed, timezone: string) => string, supportBigNumbers?: boolean, bigNumberStrings?: boolean, dateStrings?: boolean | Array, // Array form contains ids of packets for logging debug?: boolean | Array, trace?: boolean, multipleStatements?: boolean, flags?: string, ssl?: string | {...}, - ... }; declare type QueryOptions = { sql: string, typeCast?: boolean | ((field: TypeCastField, next: () => mixed) => any), // string form is a separator used to produce column names nestTables?: boolean | string, values?: Array, timeout?: number, ... } | string; declare type QueryResults = Array & { insertId?: string | number, affectedRows?: number, changedRows?: number, ... }; declare type QueryField = { name: string, type: string, length: number, table: string, db: string, ... }; declare class Query extends events$EventEmitter { // readableStreamOptions declared in Flow /lib/node.js stream(options?: readableStreamOptions): stream$Readable; } declare class Connection extends events$EventEmitter { threadId: number; connect(callback?: (error: ?Error) => *): void; release(): void; destroy(): void; end(callback?: (error: ?Error) => *): void; query( sql: QueryOptions, values?: Array | {...}, callback?: QueryCallback ): Query; query(sql: QueryOptions, callback?: QueryCallback): Query; changeUser( options: { user?: string, password?: string, charset?: string, database?: string, ... }, callback: (error: ?Error) => * ): void; beginTransaction(options: QueryOptions, callback: QueryCallback): void; beginTransaction(callback: QueryCallback): void; commit(options: QueryOptions, callback: QueryCallback): void; commit(callback: QueryCallback): void; rollback(options: QueryOptions, callback: QueryCallback): void; rollback(callback: QueryCallback): void; ping(options: QueryOptions, callback: QueryCallback): void; ping(callback: QueryCallback): void; escapeId(val: mixed, forbidQualified?: boolean): string; escape(val: mixed, stringifyObjects?: boolean, timeZone?: string): string; format(sql: string, valus: Array): string; } declare class Pool extends events$EventEmitter { getConnection( callback: (error: ?Error, connection?: Connection) => * ): void; end(callback?: (error: ?Error) => *): void; query( sql: QueryOptions, values?: Array, callback?: QueryCallback ): Query; query(sql: QueryOptions, callback?: QueryCallback): Query; escapeId(val: mixed, forbidQualified?: boolean): string; escape(val: mixed, stringifyObjects?: boolean, timeZone?: string): string; } - declare type PoolOptions = ConnectionOptions & { + declare type PoolOptions = { + ...ConnectionOptions, acquireTimeout?: number, connectionLimit?: number, waitForConnections?: boolean, queueLimit?: number, - ... }; declare type PoolClusterSelector = "RR" | "ORDER" | "RANDOM"; declare type PoolClusterOptions = { defaultSelector?: PoolClusterSelector, canRetry?: boolean, removeNodeErrorCount?: number, restoreNodeTimeout?: number, ... }; declare type QueryCallback = ( error: ?Error, results: QueryResults, fields?: Array ) => *; declare class PoolCluster extends events$EventEmitter { add(config: PoolOptions | string): void; add(name: string, config: PoolOptions | string): void; remove(name: string): void; getConnection( pattern: string | RegExp, selector: PoolClusterSelector, callback: (error: ?Error, connection?: Connection) => * ): void; getConnection( pattern: string | RegExp, callback: (error: ?Error, connection?: Connection) => * ): void; getConnection( callback: (error: ?Error, connection?: Connection) => * ): void; // Truth to be told, of returns not a Pool, but PoolNamespace instance but it is the same for the most part of(pattern: string | RegExp, selector?: PoolClusterSelector): Pool; end(callback?: (error: ?Error) => *): void; } declare function escapeId(val: mixed, forbidQualified?: boolean): string; declare function escape( val: mixed, stringifyObjects?: boolean, timeZone?: string ): string; declare function format(sql: string, valus: Array): string; declare function createConnection( options: ConnectionOptions | string ): Connection; declare function createPool(options: PoolOptions | string): Pool; declare function createPoolCluster(options?: PoolClusterOptions): PoolCluster; declare function raw(sql: string): { toSqlString: () => string, ... }; } diff --git a/keyserver/src/database/database.js b/keyserver/src/database/database.js index 051c8d8a3..c17b1824a 100644 --- a/keyserver/src/database/database.js +++ b/keyserver/src/database/database.js @@ -1,194 +1,196 @@ // @flow -import type { QueryResults } from 'mysql'; +import type { ConnectionOptions, QueryResults, PoolOptions } from 'mysql'; import mysql from 'mysql2'; import mysqlPromise from 'mysql2/promise'; import SQL from 'sql-template-strings'; import { getScriptContext } from '../scripts/script-context'; import { connectionLimit, queryWarnTime } from './consts'; import { getDBConfig } from './db-config'; import DatabaseMonitor from './monitor'; import type { Pool, SQLOrString, SQLStatementType } from './types'; const SQLStatement: SQLStatementType = SQL.SQLStatement; let migrationConnection; async function getMigrationConnection() { if (migrationConnection) { return migrationConnection; } const dbConfig = await getDBConfig(); migrationConnection = await mysqlPromise.createConnection(dbConfig); return migrationConnection; } let pool, databaseMonitor; async function loadPool(): Promise { if (pool) { return pool; } const scriptContext = getScriptContext(); const dbConfig = await getDBConfig(); - pool = mysqlPromise.createPool({ + const options: PoolOptions = { ...dbConfig, connectionLimit, multipleStatements: !!( scriptContext && scriptContext.allowMultiStatementSQLQueries ), - }); + }; + pool = mysqlPromise.createPool(options); databaseMonitor = new DatabaseMonitor(pool); return pool; } function endPool() { pool?.end(); } function appendSQLArray( sql: SQLStatementType, sqlArray: $ReadOnlyArray, delimeter: SQLOrString, ): SQLStatementType { if (sqlArray.length === 0) { return sql; } const [first, ...rest] = sqlArray; sql.append(first); if (rest.length === 0) { return sql; } return rest.reduce( (prev: SQLStatementType, curr: SQLStatementType) => prev.append(delimeter).append(curr), sql, ); } function mergeConditions( conditions: $ReadOnlyArray, delimiter: SQLStatementType, ): SQLStatementType { const sql = SQL` (`; appendSQLArray(sql, conditions, delimiter); sql.append(SQL`) `); return sql; } function mergeAndConditions( andConditions: $ReadOnlyArray, ): SQLStatementType { return mergeConditions(andConditions, SQL` AND `); } function mergeOrConditions( andConditions: $ReadOnlyArray, ): SQLStatementType { return mergeConditions(andConditions, SQL` OR `); } // We use this fake result for dry runs function FakeSQLResult() { this.insertId = -1; } FakeSQLResult.prototype = Array.prototype; const fakeResult: QueryResults = (new FakeSQLResult(): any); const MYSQL_DEADLOCK_ERROR_CODE = 1213; type ConnectionContext = { +migrationsActive?: boolean, }; let connectionContext = { migrationsActive: false, }; function setConnectionContext(newContext: ConnectionContext) { connectionContext = { ...connectionContext, ...newContext, }; if (!connectionContext.migrationsActive && migrationConnection) { migrationConnection.end(); migrationConnection = undefined; } } type QueryOptions = { +triesLeft?: number, +multipleStatements?: boolean, }; async function dbQuery( statement: SQLStatementType, options?: QueryOptions, ): Promise { const triesLeft = options?.triesLeft ?? 2; const multipleStatements = options?.multipleStatements ?? false; let connection; if (connectionContext.migrationsActive) { connection = await getMigrationConnection(); } if (multipleStatements) { connection = await getMultipleStatementsConnection(); } if (!connection) { connection = await loadPool(); } const timeoutID = setTimeout( () => databaseMonitor.reportLaggingQuery(statement.sql), queryWarnTime, ); const scriptContext = getScriptContext(); try { const sql = statement.sql.trim(); if ( scriptContext && scriptContext.dryRun && (sql.startsWith('INSERT') || sql.startsWith('DELETE') || sql.startsWith('UPDATE')) ) { console.log(rawSQL(statement)); return ([fakeResult]: any); } return await connection.query(statement); } catch (e) { if (e.errno === MYSQL_DEADLOCK_ERROR_CODE && triesLeft > 0) { console.log('deadlock occurred, trying again', e); return await dbQuery(statement, { ...options, triesLeft: triesLeft - 1 }); } e.query = statement.sql; throw e; } finally { clearTimeout(timeoutID); if (multipleStatements) { connection.end(); } } } function rawSQL(statement: SQLStatementType): string { return mysql.format(statement.sql, statement.values); } async function getMultipleStatementsConnection() { const dbConfig = await getDBConfig(); - return await mysqlPromise.createConnection({ + const options: ConnectionOptions = { ...dbConfig, multipleStatements: true, - }); + }; + return await mysqlPromise.createConnection(options); } export { endPool, SQL, SQLStatement, appendSQLArray, mergeAndConditions, mergeOrConditions, setConnectionContext, dbQuery, rawSQL, };