diff --git a/keyserver/src/session/cookies.js b/keyserver/src/session/cookies.js index a1e124140..2233788de 100644 --- a/keyserver/src/session/cookies.js +++ b/keyserver/src/session/cookies.js @@ -1,877 +1,898 @@ // @flow import crypto from 'crypto'; import type { $Response, $Request } from 'express'; import invariant from 'invariant'; import bcrypt from 'twin-bcrypt'; import url from 'url'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { Shape } from 'lib/types/core.js'; import type { SignedIdentityKeysBlob } from 'lib/types/crypto-types.js'; import { isWebPlatform } from 'lib/types/device-types.js'; import type { Platform, PlatformDetails } from 'lib/types/device-types.js'; import type { CalendarQuery } from 'lib/types/entry-types.js'; import { type ServerSessionChange, cookieLifetime, cookieSources, type CookieSource, cookieTypes, sessionIdentifierTypes, type SessionIdentifierType, } from 'lib/types/session-types.js'; import type { SIWESocialProof } from 'lib/types/siwe-types.js'; import type { InitialClientSocketMessage } from 'lib/types/socket-types.js'; import type { UserInfo } from 'lib/types/user-types.js'; import { values } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import { Viewer } from './viewer.js'; import type { AnonymousViewerData, UserViewerData } from './viewer.js'; import createIDs from '../creators/id-creator.js'; import { createSession } from '../creators/session-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import { clearDeviceToken } from '../updaters/device-token-updaters.js'; import { updateThreadMembers } from '../updaters/thread-updaters.js'; import { assertSecureRequest } from '../utils/security-utils.js'; import { type AppURLFacts, getAppURLFactsFromRequestURL, } from '../utils/urls.js'; function cookieIsExpired(lastUsed: number) { return lastUsed + cookieLifetime <= Date.now(); } type SessionParameterInfo = { isSocket: boolean, sessionID: ?string, sessionIdentifierType: SessionIdentifierType, ipAddress: string, userAgent: ?string, }; type FetchViewerResult = | { type: 'valid', viewer: Viewer } | InvalidFetchViewerResult; type InvalidFetchViewerResult = | { type: 'nonexistant', cookieName: ?string, cookieSource: ?CookieSource, sessionParameterInfo: SessionParameterInfo, } | { type: 'invalidated', cookieName: string, cookieID: string, cookieSource: CookieSource, sessionParameterInfo: SessionParameterInfo, platformDetails: ?PlatformDetails, deviceToken: ?string, }; async function fetchUserViewer( cookie: string, cookieSource: CookieSource, sessionParameterInfo: SessionParameterInfo, ): Promise { const [cookieID, cookiePassword] = cookie.split(':'); if (!cookieID || !cookiePassword) { return { type: 'nonexistant', cookieName: cookieTypes.USER, cookieSource, sessionParameterInfo, }; } const query = SQL` SELECT hash, user, last_used, platform, device_token, versions FROM cookies WHERE id = ${cookieID} AND user IS NOT NULL `; const [[result], allSessionInfo] = await Promise.all([ dbQuery(query), fetchSessionInfo(sessionParameterInfo, cookieID), ]); if (result.length === 0) { return { type: 'nonexistant', cookieName: cookieTypes.USER, cookieSource, sessionParameterInfo, }; } let sessionID = null, sessionInfo = null; if (allSessionInfo) { ({ sessionID, ...sessionInfo } = allSessionInfo); } const cookieRow = result[0]; let platformDetails = null; if (cookieRow.versions) { const versions = JSON.parse(cookieRow.versions); platformDetails = { platform: cookieRow.platform, codeVersion: versions.codeVersion, stateVersion: versions.stateVersion, }; } else { platformDetails = { platform: cookieRow.platform }; } const deviceToken = cookieRow.device_token; if ( !bcrypt.compareSync(cookiePassword, cookieRow.hash) || cookieIsExpired(cookieRow.last_used) ) { return { type: 'invalidated', cookieName: cookieTypes.USER, cookieID, cookieSource, sessionParameterInfo, platformDetails, deviceToken, }; } const userID = cookieRow.user.toString(); const viewer = new Viewer({ isSocket: sessionParameterInfo.isSocket, loggedIn: true, id: userID, platformDetails, deviceToken, userID, cookieSource, cookieID, cookiePassword, sessionIdentifierType: sessionParameterInfo.sessionIdentifierType, sessionID, sessionInfo, isScriptViewer: false, ipAddress: sessionParameterInfo.ipAddress, userAgent: sessionParameterInfo.userAgent, }); return { type: 'valid', viewer }; } async function fetchAnonymousViewer( cookie: string, cookieSource: CookieSource, sessionParameterInfo: SessionParameterInfo, ): Promise { const [cookieID, cookiePassword] = cookie.split(':'); if (!cookieID || !cookiePassword) { return { type: 'nonexistant', cookieName: cookieTypes.ANONYMOUS, cookieSource, sessionParameterInfo, }; } const query = SQL` SELECT last_used, hash, platform, device_token, versions FROM cookies WHERE id = ${cookieID} AND user IS NULL `; const [[result], allSessionInfo] = await Promise.all([ dbQuery(query), fetchSessionInfo(sessionParameterInfo, cookieID), ]); if (result.length === 0) { return { type: 'nonexistant', cookieName: cookieTypes.ANONYMOUS, cookieSource, sessionParameterInfo, }; } let sessionID = null, sessionInfo = null; if (allSessionInfo) { ({ sessionID, ...sessionInfo } = allSessionInfo); } const cookieRow = result[0]; let platformDetails = null; if (cookieRow.platform && cookieRow.versions) { const versions = JSON.parse(cookieRow.versions); platformDetails = { platform: cookieRow.platform, codeVersion: versions.codeVersion, stateVersion: versions.stateVersion, }; } else if (cookieRow.platform) { platformDetails = { platform: cookieRow.platform }; } const deviceToken = cookieRow.device_token; if ( !bcrypt.compareSync(cookiePassword, cookieRow.hash) || cookieIsExpired(cookieRow.last_used) ) { return { type: 'invalidated', cookieName: cookieTypes.ANONYMOUS, cookieID, cookieSource, sessionParameterInfo, platformDetails, deviceToken, }; } const viewer = new Viewer({ isSocket: sessionParameterInfo.isSocket, loggedIn: false, id: cookieID, platformDetails, deviceToken, cookieSource, cookieID, cookiePassword, sessionIdentifierType: sessionParameterInfo.sessionIdentifierType, sessionID, sessionInfo, isScriptViewer: false, ipAddress: sessionParameterInfo.ipAddress, userAgent: sessionParameterInfo.userAgent, }); return { type: 'valid', viewer }; } type SessionInfo = { +sessionID: ?string, +lastValidated: number, +lastUpdate: number, +calendarQuery: CalendarQuery, }; async function fetchSessionInfo( sessionParameterInfo: SessionParameterInfo, cookieID: string, ): Promise { const { sessionID } = sessionParameterInfo; const session = sessionID !== undefined ? sessionID : cookieID; if (!session) { return null; } const query = SQL` SELECT query, last_validated, last_update FROM sessions WHERE id = ${session} AND cookie = ${cookieID} `; const [result] = await dbQuery(query); if (result.length === 0) { return null; } return { sessionID, lastValidated: result[0].last_validated, lastUpdate: result[0].last_update, calendarQuery: JSON.parse(result[0].query), }; } // This function is meant to consume a cookie that has already been processed. // That means it doesn't have any logic to handle an invalid cookie, and it // doesn't update the cookie's last_used timestamp. async function fetchViewerFromCookieData( req: $Request, sessionParameterInfo: SessionParameterInfo, ): Promise { let viewerResult; const { user, anonymous } = req.cookies; if (user) { viewerResult = await fetchUserViewer( user, cookieSources.HEADER, sessionParameterInfo, ); } else if (anonymous) { viewerResult = await fetchAnonymousViewer( anonymous, cookieSources.HEADER, sessionParameterInfo, ); } else { return { type: 'nonexistant', cookieName: null, cookieSource: null, sessionParameterInfo, }; } // We protect against CSRF attacks by making sure that on web, // non-GET requests cannot use a bare cookie for session identification if (viewerResult.type === 'valid') { const { viewer } = viewerResult; invariant( req.method === 'GET' || viewer.sessionIdentifierType !== sessionIdentifierTypes.COOKIE_ID || !isWebPlatform(viewer.platform), 'non-GET request from web using sessionIdentifierTypes.COOKIE_ID', ); } return viewerResult; } async function fetchViewerFromRequestBody( body: mixed, sessionParameterInfo: SessionParameterInfo, ): Promise { if (!body || typeof body !== 'object') { return { type: 'nonexistant', cookieName: null, cookieSource: null, sessionParameterInfo, }; } const cookiePair = body.cookie; if (cookiePair === null || cookiePair === '') { return { type: 'nonexistant', cookieName: null, cookieSource: cookieSources.BODY, sessionParameterInfo, }; } if (!cookiePair || typeof cookiePair !== 'string') { return { type: 'nonexistant', cookieName: null, cookieSource: null, sessionParameterInfo, }; } const [type, cookie] = cookiePair.split('='); if (type === cookieTypes.USER && cookie) { return await fetchUserViewer( cookie, cookieSources.BODY, sessionParameterInfo, ); } else if (type === cookieTypes.ANONYMOUS && cookie) { return await fetchAnonymousViewer( cookie, cookieSources.BODY, sessionParameterInfo, ); } return { type: 'nonexistant', cookieName: null, cookieSource: null, sessionParameterInfo, }; } function getRequestIPAddress(req: $Request) { const { proxy } = getAppURLFactsFromRequestURL(req.originalUrl); let ipAddress; if (proxy === 'none') { ipAddress = req.socket.remoteAddress; } else if (proxy === 'apache') { ipAddress = req.get('X-Forwarded-For'); } invariant(ipAddress, 'could not determine requesting IP address'); return ipAddress; } function getSessionParameterInfoFromRequestBody( req: $Request, ): SessionParameterInfo { const body = (req.body: any); let sessionID = body.sessionID !== undefined || req.method !== 'GET' ? body.sessionID : null; if (sessionID === '') { sessionID = null; } const sessionIdentifierType = req.method === 'GET' || sessionID !== undefined ? sessionIdentifierTypes.BODY_SESSION_ID : sessionIdentifierTypes.COOKIE_ID; return { isSocket: false, sessionID, sessionIdentifierType, ipAddress: getRequestIPAddress(req), userAgent: req.get('User-Agent'), }; } async function fetchViewerForJSONRequest(req: $Request): Promise { assertSecureRequest(req); const sessionParameterInfo = getSessionParameterInfoFromRequestBody(req); let result = await fetchViewerFromRequestBody(req.body, sessionParameterInfo); if ( result.type === 'nonexistant' && (result.cookieSource === null || result.cookieSource === undefined) ) { result = await fetchViewerFromCookieData(req, sessionParameterInfo); } return await handleFetchViewerResult(result); } const webPlatformDetails = { platform: 'web' }; async function fetchViewerForHomeRequest(req: $Request): Promise { assertSecureRequest(req); const sessionParameterInfo = getSessionParameterInfoFromRequestBody(req); const result = await fetchViewerFromCookieData(req, sessionParameterInfo); return await handleFetchViewerResult(result, webPlatformDetails); } async function fetchViewerForSocket( req: $Request, clientMessage: InitialClientSocketMessage, ): Promise { assertSecureRequest(req); const { sessionIdentification } = clientMessage.payload; const { sessionID } = sessionIdentification; const sessionParameterInfo = { isSocket: true, sessionID, sessionIdentifierType: sessionID !== undefined ? sessionIdentifierTypes.BODY_SESSION_ID : sessionIdentifierTypes.COOKIE_ID, ipAddress: getRequestIPAddress(req), userAgent: req.get('User-Agent'), }; let result = await fetchViewerFromRequestBody( clientMessage.payload.sessionIdentification, sessionParameterInfo, ); if ( result.type === 'nonexistant' && (result.cookieSource === null || result.cookieSource === undefined) ) { result = await fetchViewerFromCookieData(req, sessionParameterInfo); } if (result.type === 'valid') { return result.viewer; } const promises = {}; if (result.cookieSource === cookieSources.BODY) { // We initialize a socket's Viewer after the WebSocket handshake, since to // properly initialize the Viewer we need a bunch of data, but that data // can't be sent until after the handshake. Consequently, by the time we // know that a cookie may be invalid, we are no longer communicating via // HTTP, and have no way to set a new cookie for HEADER (web) clients. const platformDetails = result.type === 'invalidated' ? result.platformDetails : null; const deviceToken = result.type === 'invalidated' ? result.deviceToken : null; promises.anonymousViewerData = createNewAnonymousCookie({ platformDetails, deviceToken, }); } if (result.type === 'invalidated') { promises.deleteCookie = deleteCookie(result.cookieID); } const { anonymousViewerData } = await promiseAll(promises); if (!anonymousViewerData) { return null; } return createViewerForInvalidFetchViewerResult(result, anonymousViewerData); } async function handleFetchViewerResult( result: FetchViewerResult, inputPlatformDetails?: PlatformDetails, ) { if (result.type === 'valid') { return result.viewer; } let platformDetails = inputPlatformDetails; if (!platformDetails && result.type === 'invalidated') { platformDetails = result.platformDetails; } const deviceToken = result.type === 'invalidated' ? result.deviceToken : null; const [anonymousViewerData] = await Promise.all([ createNewAnonymousCookie({ platformDetails, deviceToken }), result.type === 'invalidated' ? deleteCookie(result.cookieID) : null, ]); return createViewerForInvalidFetchViewerResult(result, anonymousViewerData); } function createViewerForInvalidFetchViewerResult( result: InvalidFetchViewerResult, anonymousViewerData: AnonymousViewerData, ): Viewer { // If a null cookie was specified in the request body, result.cookieSource // will still be BODY here. The only way it would be null or undefined here // is if there was no cookie specified in either the body or the header, in // which case we default to returning the new cookie in the response header. const cookieSource = result.cookieSource !== null && result.cookieSource !== undefined ? result.cookieSource : cookieSources.HEADER; const viewer = new Viewer({ ...anonymousViewerData, cookieSource, sessionIdentifierType: result.sessionParameterInfo.sessionIdentifierType, isSocket: result.sessionParameterInfo.isSocket, ipAddress: result.sessionParameterInfo.ipAddress, userAgent: result.sessionParameterInfo.userAgent, }); viewer.sessionChanged = true; // If cookieName is falsey, that tells us that there was no cookie specified // in the request, which means we can't be invalidating anything. if (result.cookieName) { viewer.cookieInvalidated = true; viewer.initialCookieName = result.cookieName; } return viewer; } function addSessionChangeInfoToResult( viewer: Viewer, res: $Response, result: Object, appURLFacts: AppURLFacts, ) { let threadInfos = {}, userInfos = {}; if (result.cookieChange) { ({ threadInfos, userInfos } = result.cookieChange); } let sessionChange; if (viewer.cookieInvalidated) { sessionChange = ({ cookieInvalidated: true, threadInfos, userInfos: (values(userInfos).map(a => a): UserInfo[]), currentUserInfo: { id: viewer.cookieID, anonymous: true, }, }: ServerSessionChange); } else { sessionChange = ({ cookieInvalidated: false, threadInfos, userInfos: (values(userInfos).map(a => a): UserInfo[]), }: ServerSessionChange); } if (viewer.cookieSource === cookieSources.BODY) { sessionChange.cookie = viewer.cookiePairString; } else { addActualHTTPCookie(viewer, res, appURLFacts); } if (viewer.sessionIdentifierType === sessionIdentifierTypes.BODY_SESSION_ID) { sessionChange.sessionID = viewer.sessionID ? viewer.sessionID : null; } result.cookieChange = sessionChange; } type AnonymousCookieCreationParams = Shape<{ +platformDetails: ?PlatformDetails, +deviceToken: ?string, }>; const defaultPlatformDetails = {}; // The result of this function should not be passed directly to the Viewer // constructor. Instead, it should be passed to viewer.setNewCookie. There are // several fields on AnonymousViewerData that are not set by this function: // sessionIdentifierType, cookieSource, ipAddress, and userAgent. These // parameters all depend on the initial request. If the result of this function // is passed to the Viewer constructor directly, the resultant Viewer object // will throw whenever anybody attempts to access the relevant properties. async function createNewAnonymousCookie( params: AnonymousCookieCreationParams, ): Promise { const { platformDetails, deviceToken } = params; const { platform, ...versions } = platformDetails || defaultPlatformDetails; const versionsString = Object.keys(versions).length > 0 ? JSON.stringify(versions) : null; const time = Date.now(); const cookiePassword = crypto.randomBytes(32).toString('hex'); const cookieHash = bcrypt.hashSync(cookiePassword); const [[id]] = await Promise.all([ createIDs('cookies', 1), deviceToken ? clearDeviceToken(deviceToken) : undefined, ]); const cookieRow = [ id, cookieHash, null, platform, time, time, deviceToken, versionsString, ]; const query = SQL` INSERT INTO cookies(id, hash, user, platform, creation_time, last_used, device_token, versions) VALUES ${[cookieRow]} `; await dbQuery(query); return { loggedIn: false, id, platformDetails, deviceToken, cookieID: id, cookiePassword, sessionID: undefined, sessionInfo: null, cookieInsertedThisRequest: true, isScriptViewer: false, }; } type UserCookieCreationParams = { +platformDetails: PlatformDetails, +deviceToken?: ?string, +socialProof?: ?SIWESocialProof, +signedIdentityKeysBlob?: ?SignedIdentityKeysBlob, }; // The result of this function should never be passed directly to the Viewer // constructor. Instead, it should be passed to viewer.setNewCookie. There are // several fields on UserViewerData that are not set by this function: // sessionID, sessionIdentifierType, cookieSource, and ipAddress. These // parameters all depend on the initial request. If the result of this function // is passed to the Viewer constructor directly, the resultant Viewer object // will throw whenever anybody attempts to access the relevant properties. async function createNewUserCookie( userID: string, params: UserCookieCreationParams, ): Promise { const { platformDetails, deviceToken, socialProof, signedIdentityKeysBlob } = params; const { platform, ...versions } = platformDetails || defaultPlatformDetails; const versionsString = Object.keys(versions).length > 0 ? JSON.stringify(versions) : null; const time = Date.now(); const cookiePassword = crypto.randomBytes(32).toString('hex'); const cookieHash = bcrypt.hashSync(cookiePassword); const [[cookieID]] = await Promise.all([ createIDs('cookies', 1), deviceToken ? clearDeviceToken(deviceToken) : undefined, ]); const cookieRow = [ cookieID, cookieHash, userID, platform, time, time, deviceToken, versionsString, JSON.stringify(socialProof), signedIdentityKeysBlob ? JSON.stringify(signedIdentityKeysBlob) : null, ]; const query = SQL` INSERT INTO cookies(id, hash, user, platform, creation_time, last_used, device_token, versions, social_proof, signed_identity_keys) VALUES ${[cookieRow]} `; await dbQuery(query); return { loggedIn: true, id: userID, platformDetails, deviceToken, userID, cookieID, sessionID: undefined, sessionInfo: null, cookiePassword, cookieInsertedThisRequest: true, isScriptViewer: false, }; } // This gets called after createNewUserCookie and from websiteResponder. If the // Viewer's sessionIdentifierType is COOKIE_ID then the cookieID is used as the // session identifier; otherwise, a new ID is created for the session. async function setNewSession( viewer: Viewer, calendarQuery: CalendarQuery, initialLastUpdate: number, ): Promise { if (viewer.sessionIdentifierType !== sessionIdentifierTypes.COOKIE_ID) { const [sessionID] = await createIDs('sessions', 1); viewer.setSessionID(sessionID); } await createSession(viewer, calendarQuery, initialLastUpdate); } async function extendCookieLifespan(cookieID: string) { const time = Date.now(); const query = SQL` UPDATE cookies SET last_used = ${time} WHERE id = ${cookieID} `; await dbQuery(query); } function addCookieToJSONResponse( viewer: Viewer, res: $Response, result: Object, expectCookieInvalidation: boolean, appURLFacts: AppURLFacts, ) { if (expectCookieInvalidation) { viewer.cookieInvalidated = false; } if (!viewer.getData().cookieInsertedThisRequest) { handleAsyncPromise(extendCookieLifespan(viewer.cookieID)); } if (viewer.sessionChanged) { addSessionChangeInfoToResult(viewer, res, result, appURLFacts); } else if (viewer.cookieSource !== cookieSources.BODY) { addActualHTTPCookie(viewer, res, appURLFacts); } } function addCookieToHomeResponse( viewer: Viewer, res: $Response, appURLFacts: AppURLFacts, ) { if (!viewer.getData().cookieInsertedThisRequest) { handleAsyncPromise(extendCookieLifespan(viewer.cookieID)); } addActualHTTPCookie(viewer, res, appURLFacts); } function getCookieOptions(appURLFacts: AppURLFacts) { const { baseDomain, basePath, https } = appURLFacts; const domainAsURL = new url.URL(baseDomain); return { domain: domainAsURL.hostname, path: basePath, httpOnly: true, secure: https, maxAge: cookieLifetime, sameSite: 'Strict', }; } function addActualHTTPCookie( viewer: Viewer, res: $Response, appURLFacts: AppURLFacts, ) { res.cookie( viewer.cookieName, viewer.cookieString, getCookieOptions(appURLFacts), ); if (viewer.cookieName !== viewer.initialCookieName) { res.clearCookie(viewer.initialCookieName, getCookieOptions(appURLFacts)); } } async function setCookieSignedIdentityKeysBlob( cookieID: string, signedIdentityKeysBlob: SignedIdentityKeysBlob, ) { const signedIdentityKeysStr = JSON.stringify(signedIdentityKeysBlob); const query = SQL` UPDATE cookies SET signed_identity_keys = ${signedIdentityKeysStr} WHERE id = ${cookieID} `; await dbQuery(query); } // Returns `true` if row with `id = cookieID` exists AND // `signed_identity_keys` is `NULL`. Otherwise, returns `false`. async function isCookieMissingSignedIdentityKeysBlob( cookieID: string, ): Promise { const query = SQL` SELECT signed_identity_keys FROM cookies WHERE id = ${cookieID} `; const [queryResult] = await dbQuery(query); return ( queryResult.length === 1 && queryResult[0].signed_identity_keys === null ); } +async function isCookieMissingOlmNotificationsSession( + viewer: Viewer, +): Promise { + if ( + !viewer.platformDetails || + viewer.platformDetails.platform !== 'ios' || + !viewer.platformDetails.codeVersion || + viewer.platformDetails.codeVersion < 222 + ) { + return false; + } + const query = SQL` + SELECT COUNT(*) AS count + FROM olm_sessions + WHERE cookie_id = ${viewer.cookieID} AND is_content = FALSE + `; + const [queryResult] = await dbQuery(query); + return queryResult[0].count === 0; +} + async function setCookiePlatform( viewer: Viewer, platform: Platform, ): Promise { const newPlatformDetails = { ...viewer.platformDetails, platform }; viewer.setPlatformDetails(newPlatformDetails); const query = SQL` UPDATE cookies SET platform = ${platform} WHERE id = ${viewer.cookieID} `; await dbQuery(query); } async function setCookiePlatformDetails( viewer: Viewer, platformDetails: PlatformDetails, ): Promise { if ( hasMinCodeVersion(platformDetails, { native: 70 }) && !hasMinCodeVersion(viewer.platformDetails, { native: 70 }) ) { await updateThreadMembers(viewer); } viewer.setPlatformDetails(platformDetails); const { platform, ...versions } = platformDetails; const versionsString = Object.keys(versions).length > 0 ? JSON.stringify(versions) : null; const query = SQL` UPDATE cookies SET platform = ${platform}, versions = ${versionsString} WHERE id = ${viewer.cookieID} `; await dbQuery(query); } export { fetchViewerForJSONRequest, fetchViewerForHomeRequest, fetchViewerForSocket, createNewAnonymousCookie, createNewUserCookie, setNewSession, extendCookieLifespan, addCookieToJSONResponse, addCookieToHomeResponse, setCookieSignedIdentityKeysBlob, isCookieMissingSignedIdentityKeysBlob, setCookiePlatform, setCookiePlatformDetails, + isCookieMissingOlmNotificationsSession, }; diff --git a/keyserver/src/socket/socket.js b/keyserver/src/socket/socket.js index 93479ea25..ed1d8ed18 100644 --- a/keyserver/src/socket/socket.js +++ b/keyserver/src/socket/socket.js @@ -1,834 +1,847 @@ // @flow import type { $Request } from 'express'; import invariant from 'invariant'; import _debounce from 'lodash/debounce.js'; import t from 'tcomb'; import type { TUnion } from 'tcomb'; import WebSocket from 'ws'; import { baseLegalPolicies } from 'lib/facts/policies.js'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; import { serverRequestSocketTimeout, serverResponseTimeout, } from 'lib/shared/timeouts.js'; import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils.js'; import type { Shape } from 'lib/types/core.js'; import { endpointIsSocketSafe } from 'lib/types/endpoints.js'; import { defaultNumberPerThread } from 'lib/types/message-types.js'; import { redisMessageTypes, type RedisMessage } from 'lib/types/redis-types.js'; import { serverRequestTypes } from 'lib/types/request-types.js'; import { cookieSources, sessionCheckFrequency, stateCheckInactivityActivationInterval, } from 'lib/types/session-types.js'; import { type ClientSocketMessage, type InitialClientSocketMessage, type ResponsesClientSocketMessage, type ServerStateSyncFullSocketPayload, type ServerServerSocketMessage, type ErrorServerSocketMessage, type AuthErrorServerSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, clientSocketMessageTypes, stateSyncPayloadTypes, serverSocketMessageTypes, serverServerSocketMessageValidator, } from 'lib/types/socket-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import SequentialPromiseResolver from 'lib/utils/sequential-promise-resolver.js'; import sleep from 'lib/utils/sleep.js'; import { tShape, tCookie } from 'lib/utils/validation-utils.js'; import { RedisSubscriber } from './redis.js'; import { clientResponseInputValidator, processClientResponses, initializeSession, checkState, } from './session-utils.js'; import { fetchUpdateInfosWithRawUpdateInfos } from '../creators/update-creator.js'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters.js'; import { jsonEndpoints } from '../endpoints.js'; import { fetchEntryInfos } from '../fetchers/entry-fetchers.js'; import { fetchMessageInfosSince, getMessageFetchResultFromRedisMessages, } from '../fetchers/message-fetchers.js'; import { fetchThreadInfos } from '../fetchers/thread-fetchers.js'; import { fetchUpdateInfos } from '../fetchers/update-fetchers.js'; import { fetchCurrentUserInfo, fetchKnownUserInfos, } from '../fetchers/user-fetchers.js'; import { newEntryQueryInputValidator, verifyCalendarQueryThreadIDs, } from '../responders/entry-responders.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import { fetchViewerForSocket, extendCookieLifespan, createNewAnonymousCookie, isCookieMissingSignedIdentityKeysBlob, + isCookieMissingOlmNotificationsSession, } from '../session/cookies.js'; import { Viewer } from '../session/viewer.js'; import { commitSessionUpdate } from '../updaters/session-updaters.js'; import { assertSecureRequest } from '../utils/security-utils.js'; import { checkInputValidator, checkClientSupported, policiesValidator, validateOutput, } from '../utils/validation-utils.js'; const clientSocketMessageInputValidator: TUnion = t.union([ tShape({ type: t.irreducible( 'clientSocketMessageTypes.INITIAL', x => x === clientSocketMessageTypes.INITIAL, ), id: t.Number, payload: tShape({ sessionIdentification: tShape({ cookie: t.maybe(tCookie), sessionID: t.maybe(t.String), }), sessionState: tShape({ calendarQuery: newEntryQueryInputValidator, messagesCurrentAsOf: t.Number, updatesCurrentAsOf: t.Number, watchedIDs: t.list(t.String), }), clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.RESPONSES', x => x === clientSocketMessageTypes.RESPONSES, ), id: t.Number, payload: tShape({ clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.PING', x => x === clientSocketMessageTypes.PING, ), id: t.Number, }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.ACK_UPDATES', x => x === clientSocketMessageTypes.ACK_UPDATES, ), id: t.Number, payload: tShape({ currentAsOf: t.Number, }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.API_REQUEST', x => x === clientSocketMessageTypes.API_REQUEST, ), id: t.Number, payload: tShape({ endpoint: t.String, input: t.maybe(t.Object), }), }), ]); function onConnection(ws: WebSocket, req: $Request) { assertSecureRequest(req); new Socket(ws, req); } type StateCheckConditions = { activityRecentlyOccurred: boolean, stateCheckOngoing: boolean, }; class Socket { ws: WebSocket; httpRequest: $Request; viewer: ?Viewer; redis: ?RedisSubscriber; redisPromiseResolver: SequentialPromiseResolver; stateCheckConditions: StateCheckConditions = { activityRecentlyOccurred: true, stateCheckOngoing: false, }; stateCheckTimeoutID: ?TimeoutID; constructor(ws: WebSocket, httpRequest: $Request) { this.ws = ws; this.httpRequest = httpRequest; ws.on('message', this.onMessage); ws.on('close', this.onClose); this.resetTimeout(); this.redisPromiseResolver = new SequentialPromiseResolver(this.sendMessage); } onMessage = async ( messageString: string | Buffer | ArrayBuffer | Array, ) => { invariant(typeof messageString === 'string', 'message should be string'); let clientSocketMessage: ?ClientSocketMessage; try { this.resetTimeout(); const messageObject = JSON.parse(messageString); clientSocketMessage = checkInputValidator( clientSocketMessageInputValidator, messageObject, ); if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { if (this.viewer) { // This indicates that the user sent multiple INITIAL messages. throw new ServerError('socket_already_initialized'); } this.viewer = await fetchViewerForSocket( this.httpRequest, clientSocketMessage, ); if (!this.viewer) { // This indicates that the cookie was invalid, but the client is using // cookieSources.HEADER and thus can't accept a new cookie over // WebSockets. See comment under catch block for socket_deauthorized. throw new ServerError('socket_deauthorized'); } } const { viewer } = this; if (!viewer) { // This indicates a non-INITIAL message was sent by the client before // the INITIAL message. throw new ServerError('socket_uninitialized'); } if (viewer.sessionChanged) { // This indicates that the cookie was invalid, and we've assigned a new // anonymous one. throw new ServerError('socket_deauthorized'); } if (!viewer.loggedIn) { // This indicates that the specified cookie was an anonymous one. throw new ServerError('not_logged_in'); } await checkClientSupported( viewer, clientSocketMessageInputValidator, clientSocketMessage, ); await policiesValidator(viewer, baseLegalPolicies); const serverResponses = await this.handleClientSocketMessage( clientSocketMessage, ); if (!this.redis) { this.redis = new RedisSubscriber( { userID: viewer.userID, sessionID: viewer.session }, this.onRedisMessage, ); } if (viewer.sessionChanged) { // This indicates that something has caused the session to change, which // shouldn't happen from inside a WebSocket since we can't handle cookie // invalidation. throw new ServerError('session_mutated_from_socket'); } if (clientSocketMessage.type !== clientSocketMessageTypes.PING) { handleAsyncPromise(extendCookieLifespan(viewer.cookieID)); } for (const response of serverResponses) { this.sendMessage(response); } if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { this.onSuccessfulConnection(); } } catch (error) { console.warn(error); if (!(error instanceof ServerError)) { const errorMessage: ErrorServerSocketMessage = { type: serverSocketMessageTypes.ERROR, message: error.message, }; const responseTo = clientSocketMessage ? clientSocketMessage.id : null; if (responseTo !== null) { errorMessage.responseTo = responseTo; } this.markActivityOccurred(); this.sendMessage(errorMessage); return; } invariant(clientSocketMessage, 'should be set'); const responseTo = clientSocketMessage.id; if (error.message === 'socket_deauthorized') { const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, }; if (this.viewer) { // viewer should only be falsey for cookieSources.HEADER (web) // clients. Usually if the cookie is invalid we construct a new // anonymous Viewer with a new cookie, and then pass the cookie down // in the error. But we can't pass HTTP cookies in WebSocket messages. authErrorMessage.sessionChange = { cookie: this.viewer.cookiePairString, currentUserInfo: { id: this.viewer.cookieID, anonymous: true, }, }; } this.sendMessage(authErrorMessage); this.ws.close(4100, error.message); return; } else if (error.message === 'client_version_unsupported') { const { viewer } = this; invariant(viewer, 'should be set'); const promises = {}; promises.deleteCookie = deleteCookie(viewer.cookieID); if (viewer.cookieSource !== cookieSources.BODY) { promises.anonymousViewerData = createNewAnonymousCookie({ platformDetails: error.platformDetails, deviceToken: viewer.deviceToken, }); } const { anonymousViewerData } = await promiseAll(promises); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, }; if (anonymousViewerData) { // It is normally not safe to pass the result of // createNewAnonymousCookie to the Viewer constructor. That is because // createNewAnonymousCookie leaves several fields of // AnonymousViewerData unset, and consequently Viewer will throw when // access is attempted. It is only safe here because we can guarantee // that only cookiePairString and cookieID are accessed on anonViewer // below. const anonViewer = new Viewer(anonymousViewerData); authErrorMessage.sessionChange = { cookie: anonViewer.cookiePairString, currentUserInfo: { id: anonViewer.cookieID, anonymous: true, }, }; } this.sendMessage(authErrorMessage); this.ws.close(4101, error.message); return; } if (error.payload) { this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, payload: error.payload, }); } else { this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, }); } if (error.message === 'not_logged_in') { this.ws.close(4102, error.message); } else if (error.message === 'session_mutated_from_socket') { this.ws.close(4103, error.message); } else { this.markActivityOccurred(); } } }; onClose = async () => { this.clearStateCheckTimeout(); this.resetTimeout.cancel(); this.debouncedAfterActivity.cancel(); if (this.viewer && this.viewer.hasSessionInfo) { await deleteActivityForViewerSession(this.viewer); } if (this.redis) { this.redis.quit(); this.redis = null; } }; sendMessage = (message: ServerServerSocketMessage) => { invariant( this.ws.readyState > 0, "shouldn't send message until connection established", ); if (this.ws.readyState === 1) { const { viewer } = this; const validatedMessage = validateOutput( viewer?.platformDetails, serverServerSocketMessageValidator, message, ); this.ws.send(JSON.stringify(validatedMessage)); } }; async handleClientSocketMessage( message: ClientSocketMessage, ): Promise { const resultPromise = (async () => { if (message.type === clientSocketMessageTypes.INITIAL) { this.markActivityOccurred(); return await this.handleInitialClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.markActivityOccurred(); return await this.handleResponsesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.PING) { return this.handlePingClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.markActivityOccurred(); return await this.handleAckUpdatesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.markActivityOccurred(); return await this.handleAPIRequestClientSocketMessage(message); } return []; })(); const timeoutPromise = (async () => { await sleep(serverResponseTimeout); throw new ServerError('socket_response_timeout'); })(); return await Promise.race([resultPromise, timeoutPromise]); } async handleInitialClientSocketMessage( message: InitialClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const responses = []; const { sessionState, clientResponses } = message.payload; const { calendarQuery, updatesCurrentAsOf: oldUpdatesCurrentAsOf, messagesCurrentAsOf: oldMessagesCurrentAsOf, watchedIDs, } = sessionState; await verifyCalendarQueryThreadIDs(calendarQuery); const sessionInitializationResult = await initializeSession( viewer, calendarQuery, oldUpdatesCurrentAsOf, ); const threadCursors = {}; for (const watchedThreadID of watchedIDs) { threadCursors[watchedThreadID] = null; } const messageSelectionCriteria = { threadCursors, joinedThreads: true, newerThan: oldMessagesCurrentAsOf, }; const [fetchMessagesResult, { serverRequests, activityUpdateResult }] = await Promise.all([ fetchMessageInfosSince( viewer, messageSelectionCriteria, defaultNumberPerThread, ), processClientResponses(viewer, clientResponses), ]); const messagesResult = { rawMessageInfos: fetchMessagesResult.rawMessageInfos, truncationStatuses: fetchMessagesResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( fetchMessagesResult.rawMessageInfos, oldMessagesCurrentAsOf, ), }; const isCookieMissingSignedIdentityKeysBlobPromise = isCookieMissingSignedIdentityKeysBlob(viewer.cookieID); + const isCookieMissingOlmNotificationsSessionPromise = + isCookieMissingOlmNotificationsSession(viewer); + if (!sessionInitializationResult.sessionContinued) { const [threadsResult, entriesResult, currentUserInfo, knownUserInfos] = await Promise.all([ fetchThreadInfos(viewer), fetchEntryInfos(viewer, [calendarQuery]), fetchCurrentUserInfo(viewer), fetchKnownUserInfos(viewer), ]); const payload: ServerStateSyncFullSocketPayload = { type: stateSyncPayloadTypes.FULL, messagesResult, threadInfos: threadsResult.threadInfos, currentUserInfo, rawEntryInfos: entriesResult.rawEntryInfos, userInfos: values(knownUserInfos), updatesCurrentAsOf: oldUpdatesCurrentAsOf, }; if (viewer.sessionChanged) { // If initializeSession encounters, // sessionIdentifierTypes.BODY_SESSION_ID but the session // is unspecified or expired, // it will set a new sessionID and specify viewer.sessionChanged const { sessionID } = viewer; invariant( sessionID !== null && sessionID !== undefined, 'should be set', ); payload.sessionID = sessionID; viewer.sessionChanged = false; } responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload, }); } else { const { sessionUpdate, deltaEntryInfoResult } = sessionInitializationResult; const promises = {}; promises.deleteExpiredUpdates = deleteUpdatesBeforeTimeTargetingSession( viewer, oldUpdatesCurrentAsOf, ); promises.fetchUpdateResult = fetchUpdateInfos( viewer, oldUpdatesCurrentAsOf, calendarQuery, ); promises.sessionUpdate = commitSessionUpdate(viewer, sessionUpdate); const { fetchUpdateResult } = await promiseAll(promises); const { updateInfos, userInfos } = fetchUpdateResult; const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( [...updateInfos], oldUpdatesCurrentAsOf, ); const updatesResult = { newUpdates: updateInfos, currentAsOf: newUpdatesCurrentAsOf, }; responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload: { type: stateSyncPayloadTypes.INCREMENTAL, messagesResult, updatesResult, deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, userInfos: values(userInfos), }, }); } - const signedIdentityKeysBlobMissing = - await isCookieMissingSignedIdentityKeysBlobPromise; + const [signedIdentityKeysBlobMissing, olmNotificationsSessionMissing] = + await Promise.all([ + isCookieMissingSignedIdentityKeysBlobPromise, + isCookieMissingOlmNotificationsSessionPromise, + ]); if (signedIdentityKeysBlobMissing) { serverRequests.push({ type: serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB, }); } + if (olmNotificationsSessionMissing) { + serverRequests.push({ + type: serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE, + }); + } + if (serverRequests.length > 0 || clientResponses.length > 0) { // We send this message first since the STATE_SYNC triggers the client's // connection status to shift to "connected", and we want to make sure the // client responses are cleared from Redux before that happens responses.unshift({ type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }); } if (activityUpdateResult) { // Same reason for unshifting as above responses.unshift({ type: serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, responseTo: message.id, payload: activityUpdateResult, }); } return responses; } async handleResponsesClientSocketMessage( message: ResponsesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { clientResponses } = message.payload; const { stateCheckStatus } = await processClientResponses( viewer, clientResponses, ); const serverRequests = []; if (stateCheckStatus && stateCheckStatus.status !== 'state_check') { const { sessionUpdate, checkStateRequest } = await checkState( viewer, stateCheckStatus, viewer.calendarQuery, ); if (sessionUpdate) { await commitSessionUpdate(viewer, sessionUpdate); this.setStateCheckConditions({ stateCheckOngoing: false }); } if (checkStateRequest) { serverRequests.push(checkStateRequest); } } // We send a response message regardless of whether we have any requests, // since we need to ack the client's responses return [ { type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }, ]; } handlePingClientSocketMessage( message: PingClientSocketMessage, ): ServerServerSocketMessage[] { return [ { type: serverSocketMessageTypes.PONG, responseTo: message.id, }, ]; } async handleAckUpdatesClientSocketMessage( message: AckUpdatesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { currentAsOf } = message.payload; await Promise.all([ deleteUpdatesBeforeTimeTargetingSession(viewer, currentAsOf), commitSessionUpdate(viewer, { lastUpdate: currentAsOf }), ]); return []; } async handleAPIRequestClientSocketMessage( message: APIRequestClientSocketMessage, ): Promise { if (!endpointIsSocketSafe(message.payload.endpoint)) { throw new ServerError('endpoint_unsafe_for_socket'); } const { viewer } = this; invariant(viewer, 'should be set'); const responder = jsonEndpoints[message.payload.endpoint]; await policiesValidator(viewer, responder.requiredPolicies); const response = await responder.responder(viewer, message.payload.input); return [ { type: serverSocketMessageTypes.API_RESPONSE, responseTo: message.id, payload: response, }, ]; } onRedisMessage = async (message: RedisMessage) => { try { await this.processRedisMessage(message); } catch (e) { console.warn(e); } }; async processRedisMessage(message: RedisMessage) { if (message.type === redisMessageTypes.START_SUBSCRIPTION) { this.ws.terminate(); } else if (message.type === redisMessageTypes.NEW_UPDATES) { const { viewer } = this; invariant(viewer, 'should be set'); if (message.ignoreSession && message.ignoreSession === viewer.session) { return; } const rawUpdateInfos = message.updates; this.redisPromiseResolver.add( (async () => { const { updateInfos, userInfos } = await fetchUpdateInfosWithRawUpdateInfos(rawUpdateInfos, { viewer, }); if (updateInfos.length === 0) { console.warn( 'could not get any UpdateInfos from redisMessageTypes.NEW_UPDATES', ); return null; } this.markActivityOccurred(); return { type: serverSocketMessageTypes.UPDATES, payload: { updatesResult: { currentAsOf: mostRecentUpdateTimestamp([...updateInfos], 0), newUpdates: updateInfos, }, userInfos: values(userInfos), }, }; })(), ); } else if (message.type === redisMessageTypes.NEW_MESSAGES) { const { viewer } = this; invariant(viewer, 'should be set'); const rawMessageInfos = message.messages; const messageFetchResult = getMessageFetchResultFromRedisMessages( viewer, rawMessageInfos, ); if (messageFetchResult.rawMessageInfos.length === 0) { console.warn( 'could not get any rawMessageInfos from ' + 'redisMessageTypes.NEW_MESSAGES', ); return; } this.redisPromiseResolver.add( (async () => { this.markActivityOccurred(); return { type: serverSocketMessageTypes.MESSAGES, payload: { messagesResult: { rawMessageInfos: messageFetchResult.rawMessageInfos, truncationStatuses: messageFetchResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( messageFetchResult.rawMessageInfos, 0, ), }, }, }; })(), ); } } onSuccessfulConnection() { if (this.ws.readyState !== 1) { return; } this.handleStateCheckConditionsUpdate(); } // The Socket will timeout by calling this.ws.terminate() // serverRequestSocketTimeout milliseconds after the last // time resetTimeout is called resetTimeout = _debounce( () => this.ws.terminate(), serverRequestSocketTimeout, ); debouncedAfterActivity = _debounce( () => this.setStateCheckConditions({ activityRecentlyOccurred: false }), stateCheckInactivityActivationInterval, ); markActivityOccurred = () => { if (this.ws.readyState !== 1) { return; } this.setStateCheckConditions({ activityRecentlyOccurred: true }); this.debouncedAfterActivity(); }; clearStateCheckTimeout() { const { stateCheckTimeoutID } = this; if (stateCheckTimeoutID) { clearTimeout(stateCheckTimeoutID); this.stateCheckTimeoutID = null; } } setStateCheckConditions(newConditions: Shape) { this.stateCheckConditions = { ...this.stateCheckConditions, ...newConditions, }; this.handleStateCheckConditionsUpdate(); } get stateCheckCanStart() { return Object.values(this.stateCheckConditions).every(cond => !cond); } handleStateCheckConditionsUpdate() { if (!this.stateCheckCanStart) { this.clearStateCheckTimeout(); return; } if (this.stateCheckTimeoutID) { return; } const { viewer } = this; if (!viewer) { return; } const timeUntilStateCheck = viewer.sessionLastValidated + sessionCheckFrequency - Date.now(); if (timeUntilStateCheck <= 0) { this.initiateStateCheck(); } else { this.stateCheckTimeoutID = setTimeout( this.initiateStateCheck, timeUntilStateCheck, ); } } initiateStateCheck = async () => { this.setStateCheckConditions({ stateCheckOngoing: true }); const { viewer } = this; invariant(viewer, 'should be set'); const { checkStateRequest } = await checkState( viewer, { status: 'state_check' }, viewer.calendarQuery, ); invariant(checkStateRequest, 'should be set'); this.sendMessage({ type: serverSocketMessageTypes.REQUESTS, payload: { serverRequests: [checkStateRequest] }, }); }; } export { onConnection };