diff --git a/keyserver/src/socket/socket.js b/keyserver/src/socket/socket.js index 5088f3afd..3f78839d7 100644 --- a/keyserver/src/socket/socket.js +++ b/keyserver/src/socket/socket.js @@ -1,839 +1,880 @@ // @flow import type { $Request } from 'express'; import invariant from 'invariant'; import _debounce from 'lodash/debounce.js'; import t from 'tcomb'; import type { TUnion } from 'tcomb'; import WebSocket from 'ws'; import { baseLegalPolicies } from 'lib/facts/policies.js'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; +import { isStaff } from 'lib/shared/staff-utils.js'; import { serverRequestSocketTimeout, serverResponseTimeout, } from 'lib/shared/timeouts.js'; import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils.js'; +import { + hasMinCodeVersion, + NEXT_CODE_VERSION, +} from 'lib/shared/version-utils.js'; import type { Shape } from 'lib/types/core.js'; import { endpointIsSocketSafe } from 'lib/types/endpoints.js'; import { defaultNumberPerThread } from 'lib/types/message-types.js'; import { redisMessageTypes, type RedisMessage } from 'lib/types/redis-types.js'; import { serverRequestTypes } from 'lib/types/request-types.js'; import { cookieSources, sessionCheckFrequency, stateCheckInactivityActivationInterval, } from 'lib/types/session-types.js'; import { type ClientSocketMessage, type InitialClientSocketMessage, type ResponsesClientSocketMessage, type ServerStateSyncFullSocketPayload, type ServerServerSocketMessage, type ErrorServerSocketMessage, type AuthErrorServerSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, clientSocketMessageTypes, stateSyncPayloadTypes, serverSocketMessageTypes, serverServerSocketMessageValidator, } from 'lib/types/socket-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import SequentialPromiseResolver from 'lib/utils/sequential-promise-resolver.js'; import sleep from 'lib/utils/sleep.js'; import { tShape, tCookie } from 'lib/utils/validation-utils.js'; import { RedisSubscriber } from './redis.js'; import { clientResponseInputValidator, processClientResponses, initializeSession, checkState, } from './session-utils.js'; import { fetchUpdateInfosWithRawUpdateInfos } from '../creators/update-creator.js'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters.js'; import { jsonEndpoints } from '../endpoints.js'; import { fetchMessageInfosSince, getMessageFetchResultFromRedisMessages, } from '../fetchers/message-fetchers.js'; import { fetchUpdateInfos } from '../fetchers/update-fetchers.js'; import { newEntryQueryInputValidator, verifyCalendarQueryThreadIDs, } from '../responders/entry-responders.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import { fetchViewerForSocket, extendCookieLifespan, createNewAnonymousCookie, isCookieMissingSignedIdentityKeysBlob, isCookieMissingOlmNotificationsSession, } from '../session/cookies.js'; import { Viewer } from '../session/viewer.js'; import { serverStateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { commitSessionUpdate } from '../updaters/session-updaters.js'; +import { compressMessage } from '../utils/compress.js'; import { assertSecureRequest } from '../utils/security-utils.js'; import { checkInputValidator, checkClientSupported, policiesValidator, validateOutput, } from '../utils/validation-utils.js'; const clientSocketMessageInputValidator: TUnion = t.union([ tShape({ type: t.irreducible( 'clientSocketMessageTypes.INITIAL', x => x === clientSocketMessageTypes.INITIAL, ), id: t.Number, payload: tShape({ sessionIdentification: tShape({ cookie: t.maybe(tCookie), sessionID: t.maybe(t.String), }), sessionState: tShape({ calendarQuery: newEntryQueryInputValidator, messagesCurrentAsOf: t.Number, updatesCurrentAsOf: t.Number, watchedIDs: t.list(t.String), }), clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.RESPONSES', x => x === clientSocketMessageTypes.RESPONSES, ), id: t.Number, payload: tShape({ clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.PING', x => x === clientSocketMessageTypes.PING, ), id: t.Number, }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.ACK_UPDATES', x => x === clientSocketMessageTypes.ACK_UPDATES, ), id: t.Number, payload: tShape({ currentAsOf: t.Number, }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.API_REQUEST', x => x === clientSocketMessageTypes.API_REQUEST, ), id: t.Number, payload: tShape({ endpoint: t.String, input: t.maybe(t.Object), }), }), ]); function onConnection(ws: WebSocket, req: $Request) { assertSecureRequest(req); new Socket(ws, req); } type StateCheckConditions = { activityRecentlyOccurred: boolean, stateCheckOngoing: boolean, }; +const minVersionsForCompression = { + native: NEXT_CODE_VERSION, + web: NEXT_CODE_VERSION, +}; + class Socket { ws: WebSocket; httpRequest: $Request; viewer: ?Viewer; redis: ?RedisSubscriber; redisPromiseResolver: SequentialPromiseResolver; stateCheckConditions: StateCheckConditions = { activityRecentlyOccurred: true, stateCheckOngoing: false, }; stateCheckTimeoutID: ?TimeoutID; constructor(ws: WebSocket, httpRequest: $Request) { this.ws = ws; this.httpRequest = httpRequest; ws.on('message', this.onMessage); ws.on('close', this.onClose); this.resetTimeout(); this.redisPromiseResolver = new SequentialPromiseResolver(this.sendMessage); } onMessage = async ( messageString: string | Buffer | ArrayBuffer | Array, ) => { invariant(typeof messageString === 'string', 'message should be string'); let clientSocketMessage: ?ClientSocketMessage; try { this.resetTimeout(); const messageObject = JSON.parse(messageString); clientSocketMessage = checkInputValidator( clientSocketMessageInputValidator, messageObject, ); if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { if (this.viewer) { // This indicates that the user sent multiple INITIAL messages. throw new ServerError('socket_already_initialized'); } this.viewer = await fetchViewerForSocket( this.httpRequest, clientSocketMessage, ); if (!this.viewer) { // This indicates that the cookie was invalid, but the client is using // cookieSources.HEADER and thus can't accept a new cookie over // WebSockets. See comment under catch block for socket_deauthorized. throw new ServerError('socket_deauthorized'); } } const { viewer } = this; if (!viewer) { // This indicates a non-INITIAL message was sent by the client before // the INITIAL message. throw new ServerError('socket_uninitialized'); } if (viewer.sessionChanged) { // This indicates that the cookie was invalid, and we've assigned a new // anonymous one. throw new ServerError('socket_deauthorized'); } if (!viewer.loggedIn) { // This indicates that the specified cookie was an anonymous one. throw new ServerError('not_logged_in'); } await checkClientSupported( viewer, clientSocketMessageInputValidator, clientSocketMessage, ); await policiesValidator(viewer, baseLegalPolicies); const serverResponses = await this.handleClientSocketMessage( clientSocketMessage, ); if (!this.redis) { this.redis = new RedisSubscriber( { userID: viewer.userID, sessionID: viewer.session }, this.onRedisMessage, ); } if (viewer.sessionChanged) { // This indicates that something has caused the session to change, which // shouldn't happen from inside a WebSocket since we can't handle cookie // invalidation. throw new ServerError('session_mutated_from_socket'); } if (clientSocketMessage.type !== clientSocketMessageTypes.PING) { handleAsyncPromise(extendCookieLifespan(viewer.cookieID)); } for (const response of serverResponses) { this.sendMessage(response); } if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { this.onSuccessfulConnection(); } } catch (error) { console.warn(error); if (!(error instanceof ServerError)) { const errorMessage: ErrorServerSocketMessage = { type: serverSocketMessageTypes.ERROR, message: error.message, }; const responseTo = clientSocketMessage ? clientSocketMessage.id : null; if (responseTo !== null) { errorMessage.responseTo = responseTo; } this.markActivityOccurred(); this.sendMessage(errorMessage); return; } invariant(clientSocketMessage, 'should be set'); const responseTo = clientSocketMessage.id; if (error.message === 'socket_deauthorized') { const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, }; if (this.viewer) { // viewer should only be falsey for cookieSources.HEADER (web) // clients. Usually if the cookie is invalid we construct a new // anonymous Viewer with a new cookie, and then pass the cookie down // in the error. But we can't pass HTTP cookies in WebSocket messages. authErrorMessage.sessionChange = { cookie: this.viewer.cookiePairString, currentUserInfo: { id: this.viewer.cookieID, anonymous: true, }, }; } this.sendMessage(authErrorMessage); this.ws.close(4100, error.message); return; } else if (error.message === 'client_version_unsupported') { const { viewer } = this; invariant(viewer, 'should be set'); const promises = {}; promises.deleteCookie = deleteCookie(viewer.cookieID); if (viewer.cookieSource !== cookieSources.BODY) { promises.anonymousViewerData = createNewAnonymousCookie({ platformDetails: error.platformDetails, deviceToken: viewer.deviceToken, }); } const { anonymousViewerData } = await promiseAll(promises); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, }; if (anonymousViewerData) { // It is normally not safe to pass the result of // createNewAnonymousCookie to the Viewer constructor. That is because // createNewAnonymousCookie leaves several fields of // AnonymousViewerData unset, and consequently Viewer will throw when // access is attempted. It is only safe here because we can guarantee // that only cookiePairString and cookieID are accessed on anonViewer // below. const anonViewer = new Viewer(anonymousViewerData); authErrorMessage.sessionChange = { cookie: anonViewer.cookiePairString, currentUserInfo: { id: anonViewer.cookieID, anonymous: true, }, }; } this.sendMessage(authErrorMessage); this.ws.close(4101, error.message); return; } if (error.payload) { this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, payload: error.payload, }); } else { this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, }); } if (error.message === 'not_logged_in') { this.ws.close(4102, error.message); } else if (error.message === 'session_mutated_from_socket') { this.ws.close(4103, error.message); } else { this.markActivityOccurred(); } } }; onClose = async () => { this.clearStateCheckTimeout(); this.resetTimeout.cancel(); this.debouncedAfterActivity.cancel(); if (this.viewer && this.viewer.hasSessionInfo) { await deleteActivityForViewerSession(this.viewer); } if (this.redis) { this.redis.quit(); this.redis = null; } }; sendMessage = (message: ServerServerSocketMessage) => { invariant( this.ws.readyState > 0, "shouldn't send message until connection established", ); - if (this.ws.readyState === 1) { - const { viewer } = this; - const validatedMessage = validateOutput( - viewer?.platformDetails, - serverServerSocketMessageValidator, - message, - ); - this.ws.send(JSON.stringify(validatedMessage)); + if (this.ws.readyState !== 1) { + return; } + + const { viewer } = this; + const validatedMessage = validateOutput( + viewer?.platformDetails, + serverServerSocketMessageValidator, + message, + ); + const stringMessage = JSON.stringify(validatedMessage); + + if ( + !viewer?.platformDetails || + !hasMinCodeVersion(viewer.platformDetails, minVersionsForCompression) || + !isStaff(viewer.id) + ) { + this.ws.send(stringMessage); + return; + } + + const compressionResult = compressMessage(stringMessage); + if (!compressionResult.compressed) { + this.ws.send(stringMessage); + return; + } + + const compressedMessage = { + type: serverSocketMessageTypes.COMPRESSED_MESSAGE, + payload: compressionResult.result, + }; + + const validatedCompressedMessage = validateOutput( + viewer?.platformDetails, + serverServerSocketMessageValidator, + compressedMessage, + ); + const stringCompressedMessage = JSON.stringify(validatedCompressedMessage); + this.ws.send(stringCompressedMessage); }; async handleClientSocketMessage( message: ClientSocketMessage, ): Promise { const resultPromise = (async () => { if (message.type === clientSocketMessageTypes.INITIAL) { this.markActivityOccurred(); return await this.handleInitialClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.markActivityOccurred(); return await this.handleResponsesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.PING) { return this.handlePingClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.markActivityOccurred(); return await this.handleAckUpdatesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.markActivityOccurred(); return await this.handleAPIRequestClientSocketMessage(message); } return []; })(); const timeoutPromise = (async () => { await sleep(serverResponseTimeout); throw new ServerError('socket_response_timeout'); })(); return await Promise.race([resultPromise, timeoutPromise]); } async handleInitialClientSocketMessage( message: InitialClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const responses = []; const { sessionState, clientResponses } = message.payload; const { calendarQuery, updatesCurrentAsOf: oldUpdatesCurrentAsOf, messagesCurrentAsOf: oldMessagesCurrentAsOf, watchedIDs, } = sessionState; await verifyCalendarQueryThreadIDs(calendarQuery); const sessionInitializationResult = await initializeSession( viewer, calendarQuery, oldUpdatesCurrentAsOf, ); const threadCursors = {}; for (const watchedThreadID of watchedIDs) { threadCursors[watchedThreadID] = null; } const messageSelectionCriteria = { threadCursors, joinedThreads: true, newerThan: oldMessagesCurrentAsOf, }; const [fetchMessagesResult, { serverRequests, activityUpdateResult }] = await Promise.all([ fetchMessageInfosSince( viewer, messageSelectionCriteria, defaultNumberPerThread, ), processClientResponses(viewer, clientResponses), ]); const messagesResult = { rawMessageInfos: fetchMessagesResult.rawMessageInfos, truncationStatuses: fetchMessagesResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( fetchMessagesResult.rawMessageInfos, oldMessagesCurrentAsOf, ), }; const isCookieMissingSignedIdentityKeysBlobPromise = isCookieMissingSignedIdentityKeysBlob(viewer.cookieID); const isCookieMissingOlmNotificationsSessionPromise = isCookieMissingOlmNotificationsSession(viewer); if (!sessionInitializationResult.sessionContinued) { const promises = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [ spec.hashKey, spec.fetchFullSocketSyncPayload(viewer, [calendarQuery]), ]), ); const results = await promiseAll(promises); const payload: ServerStateSyncFullSocketPayload = { type: stateSyncPayloadTypes.FULL, messagesResult, threadInfos: results.threadInfos, currentUserInfo: results.currentUserInfo, rawEntryInfos: results.entryInfos, userInfos: results.userInfos, updatesCurrentAsOf: oldUpdatesCurrentAsOf, }; if (viewer.sessionChanged) { // If initializeSession encounters, // sessionIdentifierTypes.BODY_SESSION_ID but the session // is unspecified or expired, // it will set a new sessionID and specify viewer.sessionChanged const { sessionID } = viewer; invariant( sessionID !== null && sessionID !== undefined, 'should be set', ); payload.sessionID = sessionID; viewer.sessionChanged = false; } responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload, }); } else { const { sessionUpdate, deltaEntryInfoResult } = sessionInitializationResult; const promises = {}; promises.deleteExpiredUpdates = deleteUpdatesBeforeTimeTargetingSession( viewer, oldUpdatesCurrentAsOf, ); promises.fetchUpdateResult = fetchUpdateInfos( viewer, oldUpdatesCurrentAsOf, calendarQuery, ); promises.sessionUpdate = commitSessionUpdate(viewer, sessionUpdate); const { fetchUpdateResult } = await promiseAll(promises); const { updateInfos, userInfos } = fetchUpdateResult; const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( [...updateInfos], oldUpdatesCurrentAsOf, ); const updatesResult = { newUpdates: updateInfos, currentAsOf: newUpdatesCurrentAsOf, }; responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload: { type: stateSyncPayloadTypes.INCREMENTAL, messagesResult, updatesResult, deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, userInfos: values(userInfos), }, }); } const [signedIdentityKeysBlobMissing, olmNotificationsSessionMissing] = await Promise.all([ isCookieMissingSignedIdentityKeysBlobPromise, isCookieMissingOlmNotificationsSessionPromise, ]); if (signedIdentityKeysBlobMissing) { serverRequests.push({ type: serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB, }); } if (olmNotificationsSessionMissing) { serverRequests.push({ type: serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE, }); } if (serverRequests.length > 0 || clientResponses.length > 0) { // We send this message first since the STATE_SYNC triggers the client's // connection status to shift to "connected", and we want to make sure the // client responses are cleared from Redux before that happens responses.unshift({ type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }); } if (activityUpdateResult) { // Same reason for unshifting as above responses.unshift({ type: serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, responseTo: message.id, payload: activityUpdateResult, }); } return responses; } async handleResponsesClientSocketMessage( message: ResponsesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { clientResponses } = message.payload; const { stateCheckStatus } = await processClientResponses( viewer, clientResponses, ); const serverRequests = []; if (stateCheckStatus && stateCheckStatus.status !== 'state_check') { const { sessionUpdate, checkStateRequest } = await checkState( viewer, stateCheckStatus, ); if (sessionUpdate) { await commitSessionUpdate(viewer, sessionUpdate); this.setStateCheckConditions({ stateCheckOngoing: false }); } if (checkStateRequest) { serverRequests.push(checkStateRequest); } } // We send a response message regardless of whether we have any requests, // since we need to ack the client's responses return [ { type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }, ]; } handlePingClientSocketMessage( message: PingClientSocketMessage, ): ServerServerSocketMessage[] { return [ { type: serverSocketMessageTypes.PONG, responseTo: message.id, }, ]; } async handleAckUpdatesClientSocketMessage( message: AckUpdatesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { currentAsOf } = message.payload; await Promise.all([ deleteUpdatesBeforeTimeTargetingSession(viewer, currentAsOf), commitSessionUpdate(viewer, { lastUpdate: currentAsOf }), ]); return []; } async handleAPIRequestClientSocketMessage( message: APIRequestClientSocketMessage, ): Promise { if (!endpointIsSocketSafe(message.payload.endpoint)) { throw new ServerError('endpoint_unsafe_for_socket'); } const { viewer } = this; invariant(viewer, 'should be set'); const responder = jsonEndpoints[message.payload.endpoint]; await policiesValidator(viewer, responder.requiredPolicies); const response = await responder.responder(viewer, message.payload.input); return [ { type: serverSocketMessageTypes.API_RESPONSE, responseTo: message.id, payload: response, }, ]; } onRedisMessage = async (message: RedisMessage) => { try { await this.processRedisMessage(message); } catch (e) { console.warn(e); } }; async processRedisMessage(message: RedisMessage) { if (message.type === redisMessageTypes.START_SUBSCRIPTION) { this.ws.terminate(); } else if (message.type === redisMessageTypes.NEW_UPDATES) { const { viewer } = this; invariant(viewer, 'should be set'); if (message.ignoreSession && message.ignoreSession === viewer.session) { return; } const rawUpdateInfos = message.updates; this.redisPromiseResolver.add( (async () => { const { updateInfos, userInfos } = await fetchUpdateInfosWithRawUpdateInfos(rawUpdateInfos, { viewer, }); if (updateInfos.length === 0) { console.warn( 'could not get any UpdateInfos from redisMessageTypes.NEW_UPDATES', ); return null; } this.markActivityOccurred(); return { type: serverSocketMessageTypes.UPDATES, payload: { updatesResult: { currentAsOf: mostRecentUpdateTimestamp([...updateInfos], 0), newUpdates: updateInfos, }, userInfos: values(userInfos), }, }; })(), ); } else if (message.type === redisMessageTypes.NEW_MESSAGES) { const { viewer } = this; invariant(viewer, 'should be set'); const rawMessageInfos = message.messages; const messageFetchResult = getMessageFetchResultFromRedisMessages( viewer, rawMessageInfos, ); if (messageFetchResult.rawMessageInfos.length === 0) { console.warn( 'could not get any rawMessageInfos from ' + 'redisMessageTypes.NEW_MESSAGES', ); return; } this.redisPromiseResolver.add( (async () => { this.markActivityOccurred(); return { type: serverSocketMessageTypes.MESSAGES, payload: { messagesResult: { rawMessageInfos: messageFetchResult.rawMessageInfos, truncationStatuses: messageFetchResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( messageFetchResult.rawMessageInfos, 0, ), }, }, }; })(), ); } } onSuccessfulConnection() { if (this.ws.readyState !== 1) { return; } this.handleStateCheckConditionsUpdate(); } // The Socket will timeout by calling this.ws.terminate() // serverRequestSocketTimeout milliseconds after the last // time resetTimeout is called resetTimeout = _debounce( () => this.ws.terminate(), serverRequestSocketTimeout, ); debouncedAfterActivity = _debounce( () => this.setStateCheckConditions({ activityRecentlyOccurred: false }), stateCheckInactivityActivationInterval, ); markActivityOccurred = () => { if (this.ws.readyState !== 1) { return; } this.setStateCheckConditions({ activityRecentlyOccurred: true }); this.debouncedAfterActivity(); }; clearStateCheckTimeout() { const { stateCheckTimeoutID } = this; if (stateCheckTimeoutID) { clearTimeout(stateCheckTimeoutID); this.stateCheckTimeoutID = null; } } setStateCheckConditions(newConditions: Shape) { this.stateCheckConditions = { ...this.stateCheckConditions, ...newConditions, }; this.handleStateCheckConditionsUpdate(); } get stateCheckCanStart() { return Object.values(this.stateCheckConditions).every(cond => !cond); } handleStateCheckConditionsUpdate() { if (!this.stateCheckCanStart) { this.clearStateCheckTimeout(); return; } if (this.stateCheckTimeoutID) { return; } const { viewer } = this; if (!viewer) { return; } const timeUntilStateCheck = viewer.sessionLastValidated + sessionCheckFrequency - Date.now(); if (timeUntilStateCheck <= 0) { this.initiateStateCheck(); } else { this.stateCheckTimeoutID = setTimeout( this.initiateStateCheck, timeUntilStateCheck, ); } } initiateStateCheck = async () => { this.setStateCheckConditions({ stateCheckOngoing: true }); const { viewer } = this; invariant(viewer, 'should be set'); const { checkStateRequest } = await checkState(viewer, { status: 'state_check', }); invariant(checkStateRequest, 'should be set'); this.sendMessage({ type: serverSocketMessageTypes.REQUESTS, payload: { serverRequests: [checkStateRequest] }, }); }; } export { onConnection }; diff --git a/keyserver/src/utils/compress.js b/keyserver/src/utils/compress.js new file mode 100644 index 000000000..4e72a779a --- /dev/null +++ b/keyserver/src/utils/compress.js @@ -0,0 +1,31 @@ +// @flow + +import zlib from 'zlib'; + +import type { CompressedData } from 'lib/types/compression-types.js'; + +const brotliOptions = { + params: { + [zlib.constants.BROTLI_PARAM_MODE]: zlib.constants.BROTLI_MODE_TEXT, + }, +}; +const minimumSizeForCompression = 4096; // bytes + +type CompressionResult = + | { +compressed: true, +result: CompressedData } + | { +compressed: false, +result: string }; +function compressMessage(message: string): CompressionResult { + const bytesInMessage = new Blob([message]).size; + if (bytesInMessage < minimumSizeForCompression) { + return { compressed: false, result: message }; + } + const brotliResult = zlib.brotliCompressSync(message, brotliOptions); + const base64Encoded = brotliResult.toString('base64'); + const result = { + algo: 'brotli+base64', + data: base64Encoded, + }; + return { compressed: true, result }; +} + +export { compressMessage }; diff --git a/lib/socket/socket.react.js b/lib/socket/socket.react.js index bd0c9b557..deef11bb4 100644 --- a/lib/socket/socket.react.js +++ b/lib/socket/socket.react.js @@ -1,787 +1,803 @@ // @flow import invariant from 'invariant'; import _isEqual from 'lodash/fp/isEqual.js'; import _throttle from 'lodash/throttle.js'; import * as React from 'react'; import ActivityHandler from './activity-handler.react.js'; import APIRequestHandler from './api-request-handler.react.js'; import CalendarQueryHandler from './calendar-query-handler.react.js'; import { InflightRequests } from './inflight-requests.js'; import MessageHandler from './message-handler.react.js'; import ReportHandler from './report-handler.react.js'; import RequestResponseHandler from './request-response-handler.react.js'; import UpdateHandler from './update-handler.react.js'; import { updateActivityActionTypes } from '../actions/activity-actions.js'; import { updateLastCommunicatedPlatformDetailsActionType } from '../actions/device-actions.js'; import { logOutActionTypes } from '../actions/user-actions.js'; import { unsupervisedBackgroundActionType } from '../reducers/lifecycle-state-reducer.js'; import { pingFrequency, serverRequestSocketTimeout, clientRequestVisualTimeout, clientRequestSocketTimeout, } from '../shared/timeouts.js'; import { logInActionSources, type LogOutResult, } from '../types/account-types.js'; +import type { CompressedData } from '../types/compression-types.js'; import { isWebPlatform, type PlatformDetails } from '../types/device-types.js'; import type { CalendarQuery } from '../types/entry-types.js'; import { forcePolicyAcknowledgmentActionType } from '../types/policy-types.js'; import type { Dispatch } from '../types/redux-types.js'; import { serverRequestTypes, type ClientClientResponse, type ClientServerRequest, } from '../types/request-types.js'; import { type SessionState, type SessionIdentification, type PreRequestUserState, } from '../types/session-types.js'; import { clientSocketMessageTypes, type ClientClientSocketMessage, serverSocketMessageTypes, type ClientServerSocketMessage, stateSyncPayloadTypes, fullStateSyncActionType, incrementalStateSyncActionType, updateConnectionStatusActionType, type ConnectionInfo, type ClientInitialClientSocketMessage, type ClientResponsesClientSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, type ClientSocketMessageWithoutID, type SocketListener, type ConnectionStatus, setLateResponseActionType, type CommTransportLayer, } from '../types/socket-types.js'; import { actionLogger } from '../utils/action-logger.js'; import type { DispatchActionPromise } from '../utils/action-utils.js'; import { setNewSessionActionType, fetchNewCookieFromNativeCredentials, } from '../utils/action-utils.js'; import { getConfig } from '../utils/config.js'; import { ServerError, SocketTimeout, SocketOffline } from '../utils/errors.js'; import { promiseAll } from '../utils/promises.js'; import sleep from '../utils/sleep.js'; const remainingTimeAfterVisualTimeout = clientRequestSocketTimeout - clientRequestVisualTimeout; export type BaseSocketProps = { +detectUnsupervisedBackgroundRef?: ( detectUnsupervisedBackground: (alreadyClosed: boolean) => boolean, ) => void, }; type Props = { ...BaseSocketProps, // Redux state +active: boolean, +openSocket: () => CommTransportLayer, +getClientResponses: ( activeServerRequests: $ReadOnlyArray, ) => Promise<$ReadOnlyArray>, +activeThread: ?string, +sessionStateFunc: () => SessionState, +sessionIdentification: SessionIdentification, +cookie: ?string, +urlPrefix: string, +connection: ConnectionInfo, +currentCalendarQuery: () => CalendarQuery, +canSendReports: boolean, +frozen: boolean, +preRequestUserState: PreRequestUserState, +noDataAfterPolicyAcknowledgment?: boolean, +lastCommunicatedPlatformDetails: ?PlatformDetails, + +decompressSocketMessage: CompressedData => string, // Redux dispatch functions +dispatch: Dispatch, +dispatchActionPromise: DispatchActionPromise, // async functions that hit server APIs +logOut: (preRequestUserState: PreRequestUserState) => Promise, +socketCrashLoopRecovery?: () => Promise, // keyserver olm sessions specific props +getInitialNotificationsEncryptedMessage?: () => Promise, }; type State = { +inflightRequests: ?InflightRequests, }; class Socket extends React.PureComponent { state: State = { inflightRequests: null, }; socket: ?CommTransportLayer; nextClientMessageID: number = 0; listeners: Set = new Set(); pingTimeoutID: ?TimeoutID; messageLastReceived: ?number; reopenConnectionAfterClosing: boolean = false; invalidationRecoveryInProgress: boolean = false; initializedWithUserState: ?PreRequestUserState; failuresAfterPolicyAcknowledgment: number = 0; openSocket(newStatus: ConnectionStatus) { if ( this.props.frozen || (!isWebPlatform(getConfig().platformDetails.platform) && (!this.props.cookie || !this.props.cookie.startsWith('user='))) ) { return; } if (this.socket) { const { status } = this.props.connection; if (status === 'forcedDisconnecting') { this.reopenConnectionAfterClosing = true; return; } else if (status === 'disconnecting' && this.socket.readyState === 1) { this.markSocketInitialized(); return; } else if ( status === 'connected' || status === 'connecting' || status === 'reconnecting' ) { return; } if (this.socket.readyState < 2) { this.socket.close(); console.log(`this.socket seems open, but Redux thinks it's ${status}`); } } this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: newStatus }, }); const socket = this.props.openSocket(); const openObject = {}; socket.onopen = () => { if (this.socket === socket) { this.initializeSocket(); openObject.initializeMessageSent = true; } }; socket.onmessage = this.receiveMessage; socket.onclose = () => { if (this.socket === socket) { this.onClose(); } }; this.socket = socket; (async () => { await sleep(clientRequestVisualTimeout); if (this.socket !== socket || openObject.initializeMessageSent) { return; } this.setLateResponse(-1, true); await sleep(remainingTimeAfterVisualTimeout); if (this.socket !== socket || openObject.initializeMessageSent) { return; } this.finishClosingSocket(); })(); this.setState({ inflightRequests: new InflightRequests({ timeout: () => { if (this.socket === socket) { this.finishClosingSocket(); } }, setLateResponse: (messageID: number, isLate: boolean) => { if (this.socket === socket) { this.setLateResponse(messageID, isLate); } }, }), }); } markSocketInitialized() { this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'connected' }, }); this.resetPing(); } closeSocket( // This param is a hack. When closing a socket there is a race between this // function and the one to propagate the activity update. We make sure that // the activity update wins the race by passing in this param. activityUpdatePending: boolean, ) { const { status } = this.props.connection; if (status === 'disconnected') { return; } else if (status === 'disconnecting' || status === 'forcedDisconnecting') { this.reopenConnectionAfterClosing = false; return; } this.stopPing(); this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'disconnecting' }, }); if (!activityUpdatePending) { this.finishClosingSocket(); } } forceCloseSocket() { this.stopPing(); const { status } = this.props.connection; if (status !== 'forcedDisconnecting' && status !== 'disconnected') { this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'forcedDisconnecting' }, }); } this.finishClosingSocket(); } finishClosingSocket(receivedResponseTo?: ?number) { const { inflightRequests } = this.state; if ( inflightRequests && !inflightRequests.allRequestsResolvedExcept(receivedResponseTo) ) { return; } if (this.socket && this.socket.readyState < 2) { // If it's not closing already, close it this.socket.close(); } this.socket = null; this.stopPing(); this.setState({ inflightRequests: null }); if (this.props.connection.status !== 'disconnected') { this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'disconnected' }, }); } if (this.reopenConnectionAfterClosing) { this.reopenConnectionAfterClosing = false; if (this.props.active) { this.openSocket('connecting'); } } } reconnect: $Call void, number> = _throttle( () => this.openSocket('reconnecting'), 2000, ); componentDidMount() { if (this.props.detectUnsupervisedBackgroundRef) { this.props.detectUnsupervisedBackgroundRef( this.detectUnsupervisedBackground, ); } if (this.props.active) { this.openSocket('connecting'); } } componentWillUnmount() { this.closeSocket(false); this.reconnect.cancel(); } componentDidUpdate(prevProps: Props) { if (this.props.active && !prevProps.active) { this.openSocket('connecting'); } else if (!this.props.active && prevProps.active) { this.closeSocket(!!prevProps.activeThread); } else if ( this.props.active && prevProps.openSocket !== this.props.openSocket ) { // This case happens when the baseURL/urlPrefix is changed this.reopenConnectionAfterClosing = true; this.forceCloseSocket(); } else if ( this.props.active && this.props.connection.status === 'disconnected' && prevProps.connection.status !== 'disconnected' && !this.invalidationRecoveryInProgress ) { this.reconnect(); } } render(): React.Node { // It's important that APIRequestHandler get rendered first here. This is so // that it is registered with Redux first, so that its componentDidUpdate // processes before the other Handlers. This allows APIRequestHandler to // register itself with action-utils before other Handlers call // dispatchActionPromise in response to the componentDidUpdate triggered by // the same Redux change (state.connection.status). return ( ); } sendMessageWithoutID: (message: ClientSocketMessageWithoutID) => number = message => { const id = this.nextClientMessageID++; // These conditions all do the same thing and the runtime checks are only // necessary for Flow if (message.type === clientSocketMessageTypes.INITIAL) { this.sendMessage( ({ ...message, id }: ClientInitialClientSocketMessage), ); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.sendMessage( ({ ...message, id }: ClientResponsesClientSocketMessage), ); } else if (message.type === clientSocketMessageTypes.PING) { this.sendMessage(({ ...message, id }: PingClientSocketMessage)); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.sendMessage(({ ...message, id }: AckUpdatesClientSocketMessage)); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.sendMessage(({ ...message, id }: APIRequestClientSocketMessage)); } return id; }; sendMessage(message: ClientClientSocketMessage) { const socket = this.socket; invariant(socket, 'should be set'); socket.send(JSON.stringify(message)); } - static messageFromEvent(event: MessageEvent): ?ClientServerSocketMessage { + messageFromEvent(event: MessageEvent): ?ClientServerSocketMessage { if (typeof event.data !== 'string') { console.log('socket received a non-string message'); return null; } + + let rawMessage; + try { + rawMessage = JSON.parse(event.data); + } catch (e) { + console.log(e); + return null; + } + + if (rawMessage.type !== serverSocketMessageTypes.COMPRESSED_MESSAGE) { + return rawMessage; + } + + const result = this.props.decompressSocketMessage(rawMessage.payload); try { - return JSON.parse(event.data); + return JSON.parse(result); } catch (e) { console.log(e); return null; } } receiveMessage: (event: MessageEvent) => Promise = async event => { - const message = Socket.messageFromEvent(event); + const message = this.messageFromEvent(event); if (!message) { return; } this.failuresAfterPolicyAcknowledgment = 0; const { inflightRequests } = this.state; if (!inflightRequests) { // inflightRequests can be falsey here if we receive a message after we've // begun shutting down the socket. It's possible for a React Native // WebSocket to deliver a message even after close() is called on it. In // this case the message is probably a PONG, which we can safely ignore. // If it's not a PONG, it has to be something server-initiated (like // UPDATES or MESSAGES), since InflightRequests.allRequestsResolvedExcept // will wait for all responses to client-initiated requests to be // delivered before closing a socket. UPDATES and MESSAGES are both // checkpointed on the client, so should be okay to just ignore here and // redownload them later, probably in an incremental STATE_SYNC. return; } // If we receive any message, that indicates that our connection is healthy, // so we can reset the ping timeout. this.resetPing(); inflightRequests.resolveRequestsForMessage(message); const { status } = this.props.connection; if (status === 'disconnecting' || status === 'forcedDisconnecting') { this.finishClosingSocket( // We do this for Flow message.responseTo !== undefined ? message.responseTo : null, ); } for (const listener of this.listeners) { listener(message); } if (message.type === serverSocketMessageTypes.ERROR) { const { message: errorMessage, payload } = message; if (payload) { console.log(`socket sent error ${errorMessage} with payload`, payload); } else { console.log(`socket sent error ${errorMessage}`); } if (errorMessage === 'policies_not_accepted') { this.props.dispatch({ type: forcePolicyAcknowledgmentActionType, payload, }); } } else if (message.type === serverSocketMessageTypes.AUTH_ERROR) { const { sessionChange } = message; const cookie = sessionChange ? sessionChange.cookie : this.props.cookie; this.invalidationRecoveryInProgress = true; const recoverySessionChange = await fetchNewCookieFromNativeCredentials( this.props.dispatch, cookie, this.props.urlPrefix, logInActionSources.socketAuthErrorResolutionAttempt, this.props.getInitialNotificationsEncryptedMessage, ); if (!recoverySessionChange && sessionChange) { // This should only happen in the cookieSources.BODY (native) case when // the resolution attempt failed const { cookie: newerCookie, currentUserInfo } = sessionChange; this.props.dispatch({ type: setNewSessionActionType, payload: { sessionChange: { cookieInvalidated: true, currentUserInfo, cookie: newerCookie, }, preRequestUserState: this.initializedWithUserState, error: null, logInActionSource: logInActionSources.socketAuthErrorResolutionAttempt, }, }); } else if (!recoverySessionChange) { this.props.dispatchActionPromise( logOutActionTypes, this.props.logOut(this.props.preRequestUserState), ); } this.invalidationRecoveryInProgress = false; } }; addListener: (listener: SocketListener) => void = listener => { this.listeners.add(listener); }; removeListener: (listener: SocketListener) => void = listener => { this.listeners.delete(listener); }; onClose: () => void = () => { const { status } = this.props.connection; this.socket = null; this.stopPing(); if (this.state.inflightRequests) { this.state.inflightRequests.rejectAll(new Error('socket closed')); this.setState({ inflightRequests: null }); } const handled = this.detectUnsupervisedBackground(true); if (!handled && status !== 'disconnected') { this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'disconnected' }, }); } }; async sendInitialMessage() { const { inflightRequests } = this.state; invariant( inflightRequests, 'inflightRequests falsey inside sendInitialMessage', ); const messageID = this.nextClientMessageID++; const promises = {}; const shouldSendInitialPlatformDetails = !_isEqual( this.props.lastCommunicatedPlatformDetails, )(getConfig().platformDetails); const clientResponses = []; if (shouldSendInitialPlatformDetails) { clientResponses.push({ type: serverRequestTypes.PLATFORM_DETAILS, platformDetails: getConfig().platformDetails, }); } const { queuedActivityUpdates } = this.props.connection; if (queuedActivityUpdates.length > 0) { clientResponses.push({ type: serverRequestTypes.INITIAL_ACTIVITY_UPDATES, activityUpdates: queuedActivityUpdates, }); promises.activityUpdateMessage = inflightRequests.fetchResponse( messageID, serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, ); } const sessionState = this.props.sessionStateFunc(); const { sessionIdentification } = this.props; const initialMessage = { type: clientSocketMessageTypes.INITIAL, id: messageID, payload: { clientResponses, sessionState, sessionIdentification, }, }; this.initializedWithUserState = this.props.preRequestUserState; this.sendMessage(initialMessage); promises.stateSyncMessage = inflightRequests.fetchResponse( messageID, serverSocketMessageTypes.STATE_SYNC, ); const { stateSyncMessage, activityUpdateMessage } = await promiseAll( promises, ); if (shouldSendInitialPlatformDetails) { this.props.dispatch({ type: updateLastCommunicatedPlatformDetailsActionType, payload: getConfig().platformDetails, }); } if (activityUpdateMessage) { this.props.dispatch({ type: updateActivityActionTypes.success, payload: { activityUpdates: queuedActivityUpdates, result: activityUpdateMessage.payload, }, }); } if (stateSyncMessage.payload.type === stateSyncPayloadTypes.FULL) { const { sessionID, type, ...actionPayload } = stateSyncMessage.payload; this.props.dispatch({ type: fullStateSyncActionType, payload: { ...actionPayload, calendarQuery: sessionState.calendarQuery, }, }); if (sessionID !== null && sessionID !== undefined) { invariant( this.initializedWithUserState, 'initializedWithUserState should be set when state sync received', ); this.props.dispatch({ type: setNewSessionActionType, payload: { sessionChange: { cookieInvalidated: false, sessionID }, preRequestUserState: this.initializedWithUserState, error: null, logInActionSource: undefined, }, }); } } else { const { type, ...actionPayload } = stateSyncMessage.payload; this.props.dispatch({ type: incrementalStateSyncActionType, payload: { ...actionPayload, calendarQuery: sessionState.calendarQuery, }, }); } const currentAsOf = stateSyncMessage.payload.type === stateSyncPayloadTypes.FULL ? stateSyncMessage.payload.updatesCurrentAsOf : stateSyncMessage.payload.updatesResult.currentAsOf; this.sendMessageWithoutID({ type: clientSocketMessageTypes.ACK_UPDATES, payload: { currentAsOf }, }); this.markSocketInitialized(); } initializeSocket: (retriesLeft?: number) => Promise = async ( retriesLeft = 1, ) => { try { await this.sendInitialMessage(); } catch (e) { if (this.props.noDataAfterPolicyAcknowledgment) { this.failuresAfterPolicyAcknowledgment++; } else { this.failuresAfterPolicyAcknowledgment = 0; } if ( this.failuresAfterPolicyAcknowledgment >= 2 && this.props.socketCrashLoopRecovery ) { this.failuresAfterPolicyAcknowledgment = 0; try { await this.props.socketCrashLoopRecovery(); } catch (error) { console.log(error); this.props.dispatchActionPromise( logOutActionTypes, this.props.logOut(this.props.preRequestUserState), ); } return; } console.log(e); const { status } = this.props.connection; if ( e instanceof SocketTimeout || e instanceof SocketOffline || (status !== 'connecting' && status !== 'reconnecting') ) { // This indicates that the socket will be closed. Do nothing, since the // connection status update will trigger a reconnect. } else if ( retriesLeft === 0 || (e instanceof ServerError && e.message !== 'unknown_error') ) { if (e.message === 'not_logged_in') { this.props.dispatchActionPromise( logOutActionTypes, this.props.logOut(this.props.preRequestUserState), ); } else if (this.socket) { this.socket.close(); } } else { await this.initializeSocket(retriesLeft - 1); } } }; stopPing() { if (this.pingTimeoutID) { clearTimeout(this.pingTimeoutID); this.pingTimeoutID = null; } } resetPing() { this.stopPing(); const socket = this.socket; this.messageLastReceived = Date.now(); this.pingTimeoutID = setTimeout(() => { if (this.socket === socket) { this.sendPing(); } }, pingFrequency); } async sendPing() { if (this.props.connection.status !== 'connected') { // This generally shouldn't happen because anything that changes the // connection status should call stopPing(), but it's good to make sure return; } const messageID = this.sendMessageWithoutID({ type: clientSocketMessageTypes.PING, }); try { invariant( this.state.inflightRequests, 'inflightRequests falsey inside sendPing', ); await this.state.inflightRequests.fetchResponse( messageID, serverSocketMessageTypes.PONG, ); } catch (e) {} } setLateResponse: (messageID: number, isLate: boolean) => void = ( messageID, isLate, ) => { this.props.dispatch({ type: setLateResponseActionType, payload: { messageID, isLate }, }); }; cleanUpServerTerminatedSocket() { if (this.socket && this.socket.readyState < 2) { this.socket.close(); } else { this.onClose(); } } detectUnsupervisedBackground: (alreadyClosed: boolean) => boolean = alreadyClosed => { // On native, sometimes the app is backgrounded without the proper // callbacks getting triggered. This leaves us in an incorrect state for // two reasons: // (1) The connection is still considered to be active, causing API // requests to be processed via socket and failing. // (2) We rely on flipping foreground state in Redux to detect activity // changes, and thus won't think we need to update activity. if ( this.props.connection.status !== 'connected' || !this.messageLastReceived || this.messageLastReceived + serverRequestSocketTimeout >= Date.now() || (actionLogger.mostRecentActionTime && actionLogger.mostRecentActionTime + 3000 < Date.now()) ) { return false; } if (!alreadyClosed) { this.cleanUpServerTerminatedSocket(); } this.props.dispatch({ type: unsupervisedBackgroundActionType, payload: null, }); return true; }; } export default Socket; diff --git a/lib/types/compression-types.js b/lib/types/compression-types.js new file mode 100644 index 000000000..4435ea347 --- /dev/null +++ b/lib/types/compression-types.js @@ -0,0 +1,16 @@ +// @flow + +import t, { type TInterface } from 'tcomb'; + +import { tShape, tString } from '../utils/validation-utils.js'; + +export type CompressedData = { + +algo: 'brotli+base64', + +data: string, +}; + +export const compressedDataValidator: TInterface = + tShape({ + algo: tString('brotli+base64'), + data: t.String, + }); diff --git a/lib/types/socket-types.js b/lib/types/socket-types.js index 7e9a8d8b0..3796b7128 100644 --- a/lib/types/socket-types.js +++ b/lib/types/socket-types.js @@ -1,514 +1,533 @@ // @flow import invariant from 'invariant'; import t, { type TInterface, type TUnion } from 'tcomb'; import { type ActivityUpdate, activityUpdateValidator, type UpdateActivityResult, updateActivityResultValidator, } from './activity-types.js'; +import { + type CompressedData, + compressedDataValidator, +} from './compression-types.js'; import type { APIRequest } from './endpoints.js'; import { type RawEntryInfo, rawEntryInfoValidator, type CalendarQuery, } from './entry-types.js'; import { type MessagesResponse, messagesResponseValidator, type NewMessagesPayload, newMessagesPayloadValidator, } from './message-types.js'; import { type ServerServerRequest, serverServerRequestValidator, type ClientServerRequest, type ClientResponse, type ClientClientResponse, } from './request-types.js'; import type { SessionState, SessionIdentification } from './session-types.js'; import { type RawThreadInfo, rawThreadInfoValidator } from './thread-types.js'; import { type ClientUpdatesResult, type ClientUpdatesResultWithUserInfos, type ServerUpdatesResult, serverUpdatesResultValidator, type ServerUpdatesResultWithUserInfos, serverUpdatesResultWithUserInfosValidator, } from './update-types.js'; import { type UserInfo, userInfoValidator, type CurrentUserInfo, currentUserInfoValidator, type LoggedOutUserInfo, loggedOutUserInfoValidator, } from './user-types.js'; import { tShape, tNumber, tID } from '../utils/validation-utils.js'; // The types of messages that the client sends across the socket export const clientSocketMessageTypes = Object.freeze({ INITIAL: 0, RESPONSES: 1, //ACTIVITY_UPDATES: 2, (DEPRECATED) PING: 3, ACK_UPDATES: 4, API_REQUEST: 5, }); export type ClientSocketMessageType = $Values; export function assertClientSocketMessageType( ourClientSocketMessageType: number, ): ClientSocketMessageType { invariant( ourClientSocketMessageType === 0 || ourClientSocketMessageType === 1 || ourClientSocketMessageType === 3 || ourClientSocketMessageType === 4 || ourClientSocketMessageType === 5, 'number is not ClientSocketMessageType enum', ); return ourClientSocketMessageType; } export type InitialClientSocketMessage = { +type: 0, +id: number, +payload: { +sessionIdentification: SessionIdentification, +sessionState: SessionState, +clientResponses: $ReadOnlyArray, }, }; export type ResponsesClientSocketMessage = { +type: 1, +id: number, +payload: { +clientResponses: $ReadOnlyArray, }, }; export type PingClientSocketMessage = { +type: 3, +id: number, }; export type AckUpdatesClientSocketMessage = { +type: 4, +id: number, +payload: { +currentAsOf: number, }, }; export type APIRequestClientSocketMessage = { +type: 5, +id: number, +payload: APIRequest, }; export type ClientSocketMessage = | InitialClientSocketMessage | ResponsesClientSocketMessage | PingClientSocketMessage | AckUpdatesClientSocketMessage | APIRequestClientSocketMessage; export type ClientInitialClientSocketMessage = { +type: 0, +id: number, +payload: { +sessionIdentification: SessionIdentification, +sessionState: SessionState, +clientResponses: $ReadOnlyArray, }, }; export type ClientResponsesClientSocketMessage = { +type: 1, +id: number, +payload: { +clientResponses: $ReadOnlyArray, }, }; export type ClientClientSocketMessage = | ClientInitialClientSocketMessage | ClientResponsesClientSocketMessage | PingClientSocketMessage | AckUpdatesClientSocketMessage | APIRequestClientSocketMessage; export type ClientSocketMessageWithoutID = $Diff< ClientClientSocketMessage, { id: number }, >; // The types of messages that the server sends across the socket export const serverSocketMessageTypes = Object.freeze({ STATE_SYNC: 0, REQUESTS: 1, ERROR: 2, AUTH_ERROR: 3, ACTIVITY_UPDATE_RESPONSE: 4, PONG: 5, UPDATES: 6, MESSAGES: 7, API_RESPONSE: 8, + COMPRESSED_MESSAGE: 9, }); export type ServerSocketMessageType = $Values; export function assertServerSocketMessageType( ourServerSocketMessageType: number, ): ServerSocketMessageType { invariant( ourServerSocketMessageType === 0 || ourServerSocketMessageType === 1 || ourServerSocketMessageType === 2 || ourServerSocketMessageType === 3 || ourServerSocketMessageType === 4 || ourServerSocketMessageType === 5 || ourServerSocketMessageType === 6 || ourServerSocketMessageType === 7 || - ourServerSocketMessageType === 8, + ourServerSocketMessageType === 8 || + ourServerSocketMessageType === 9, 'number is not ServerSocketMessageType enum', ); return ourServerSocketMessageType; } export const stateSyncPayloadTypes = Object.freeze({ FULL: 0, INCREMENTAL: 1, }); export const fullStateSyncActionType = 'FULL_STATE_SYNC'; export type BaseFullStateSync = { +messagesResult: MessagesResponse, +threadInfos: { +[id: string]: RawThreadInfo }, +rawEntryInfos: $ReadOnlyArray, +userInfos: $ReadOnlyArray, +updatesCurrentAsOf: number, }; const baseFullStateSyncValidator = tShape({ messagesResult: messagesResponseValidator, threadInfos: t.dict(tID, rawThreadInfoValidator), rawEntryInfos: t.list(rawEntryInfoValidator), userInfos: t.list(userInfoValidator), updatesCurrentAsOf: t.Number, }); export type ClientFullStateSync = { ...BaseFullStateSync, +currentUserInfo: CurrentUserInfo, }; export type StateSyncFullActionPayload = { ...ClientFullStateSync, +calendarQuery: CalendarQuery, }; export type ClientStateSyncFullSocketPayload = { ...ClientFullStateSync, +type: 0, // Included iff client is using sessionIdentifierTypes.BODY_SESSION_ID +sessionID?: string, }; export type ServerFullStateSync = { ...BaseFullStateSync, +currentUserInfo: CurrentUserInfo, }; const serverFullStateSyncValidator = tShape({ ...baseFullStateSyncValidator.meta.props, currentUserInfo: currentUserInfoValidator, }); export type ServerStateSyncFullSocketPayload = { ...ServerFullStateSync, +type: 0, // Included iff client is using sessionIdentifierTypes.BODY_SESSION_ID +sessionID?: string, }; const serverStateSyncFullSocketPayloadValidator = tShape({ ...serverFullStateSyncValidator.meta.props, type: tNumber(stateSyncPayloadTypes.FULL), sessionID: t.maybe(t.String), }); export const incrementalStateSyncActionType = 'INCREMENTAL_STATE_SYNC'; export type BaseIncrementalStateSync = { +messagesResult: MessagesResponse, +deltaEntryInfos: $ReadOnlyArray, +deletedEntryIDs: $ReadOnlyArray, +userInfos: $ReadOnlyArray, }; const baseIncrementalStateSyncValidator = tShape({ messagesResult: messagesResponseValidator, deltaEntryInfos: t.list(rawEntryInfoValidator), deletedEntryIDs: t.list(tID), userInfos: t.list(userInfoValidator), }); export type ClientIncrementalStateSync = { ...BaseIncrementalStateSync, +updatesResult: ClientUpdatesResult, }; export type StateSyncIncrementalActionPayload = { ...ClientIncrementalStateSync, +calendarQuery: CalendarQuery, }; type ClientStateSyncIncrementalSocketPayload = { +type: 1, ...ClientIncrementalStateSync, }; export type ServerIncrementalStateSync = { ...BaseIncrementalStateSync, +updatesResult: ServerUpdatesResult, }; const serverIncrementalStateSyncValidator = tShape({ ...baseIncrementalStateSyncValidator.meta.props, updatesResult: serverUpdatesResultValidator, }); type ServerStateSyncIncrementalSocketPayload = { +type: 1, ...ServerIncrementalStateSync, }; const serverStateSyncIncrementalSocketPayloadValidator = tShape({ type: tNumber(stateSyncPayloadTypes.INCREMENTAL), ...serverIncrementalStateSyncValidator.meta.props, }); export type ClientStateSyncSocketPayload = | ClientStateSyncFullSocketPayload | ClientStateSyncIncrementalSocketPayload; export type ServerStateSyncSocketPayload = | ServerStateSyncFullSocketPayload | ServerStateSyncIncrementalSocketPayload; const serverStateSyncSocketPayloadValidator = t.union([ serverStateSyncFullSocketPayloadValidator, serverStateSyncIncrementalSocketPayloadValidator, ]); export type ServerStateSyncServerSocketMessage = { +type: 0, +responseTo: number, +payload: ServerStateSyncSocketPayload, }; export const serverStateSyncServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.STATE_SYNC), responseTo: t.Number, payload: serverStateSyncSocketPayloadValidator, }); export type ServerRequestsServerSocketMessage = { +type: 1, +responseTo?: number, +payload: { +serverRequests: $ReadOnlyArray, }, }; export const serverRequestsServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.REQUESTS), responseTo: t.maybe(t.Number), payload: tShape({ serverRequests: t.list(serverServerRequestValidator), }), }); export type ErrorServerSocketMessage = { type: 2, responseTo?: number, message: string, payload?: Object, }; export const errorServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.ERROR), responseTo: t.maybe(t.Number), message: t.String, payload: t.maybe(t.Object), }); export type AuthErrorServerSocketMessage = { type: 3, responseTo: number, message: string, // If unspecified, it is because the client is using cookieSources.HEADER, // which means the server can't update the cookie from a socket message. sessionChange?: { cookie: string, currentUserInfo: LoggedOutUserInfo, }, }; export const authErrorServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.AUTH_ERROR), responseTo: t.Number, message: t.String, sessionChange: t.maybe( tShape({ cookie: t.String, currentUserInfo: loggedOutUserInfoValidator }), ), }); export type ActivityUpdateResponseServerSocketMessage = { +type: 4, +responseTo: number, +payload: UpdateActivityResult, }; export const activityUpdateResponseServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE), responseTo: t.Number, payload: updateActivityResultValidator, }); export type PongServerSocketMessage = { +type: 5, +responseTo: number, }; export const pongServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.PONG), responseTo: t.Number, }); export type ServerUpdatesServerSocketMessage = { +type: 6, +payload: ServerUpdatesResultWithUserInfos, }; export const serverUpdatesServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.UPDATES), payload: serverUpdatesResultWithUserInfosValidator, }); export type MessagesServerSocketMessage = { +type: 7, +payload: NewMessagesPayload, }; export const messagesServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.MESSAGES), payload: newMessagesPayloadValidator, }); export type APIResponseServerSocketMessage = { +type: 8, +responseTo: number, +payload?: Object, }; export const apiResponseServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.API_RESPONSE), responseTo: t.Number, payload: t.maybe(t.Object), }); +export type CompressedMessageServerSocketMessage = { + +type: 9, + +payload: CompressedData, +}; +export const compressedMessageServerSocketMessageValidator: TInterface = + tShape({ + type: tNumber(serverSocketMessageTypes.COMPRESSED_MESSAGE), + payload: compressedDataValidator, + }); + export type ServerServerSocketMessage = | ServerStateSyncServerSocketMessage | ServerRequestsServerSocketMessage | ErrorServerSocketMessage | AuthErrorServerSocketMessage | ActivityUpdateResponseServerSocketMessage | PongServerSocketMessage | ServerUpdatesServerSocketMessage | MessagesServerSocketMessage - | APIResponseServerSocketMessage; + | APIResponseServerSocketMessage + | CompressedMessageServerSocketMessage; export const serverServerSocketMessageValidator: TUnion = t.union([ serverStateSyncServerSocketMessageValidator, serverRequestsServerSocketMessageValidator, errorServerSocketMessageValidator, authErrorServerSocketMessageValidator, activityUpdateResponseServerSocketMessageValidator, pongServerSocketMessageValidator, serverUpdatesServerSocketMessageValidator, messagesServerSocketMessageValidator, apiResponseServerSocketMessageValidator, + compressedMessageServerSocketMessageValidator, ]); export type ClientRequestsServerSocketMessage = { +type: 1, +responseTo?: number, +payload: { +serverRequests: $ReadOnlyArray, }, }; export type ClientStateSyncServerSocketMessage = { +type: 0, +responseTo: number, +payload: ClientStateSyncSocketPayload, }; export type ClientUpdatesServerSocketMessage = { +type: 6, +payload: ClientUpdatesResultWithUserInfos, }; export type ClientServerSocketMessage = | ClientStateSyncServerSocketMessage | ClientRequestsServerSocketMessage | ErrorServerSocketMessage | AuthErrorServerSocketMessage | ActivityUpdateResponseServerSocketMessage | PongServerSocketMessage | ClientUpdatesServerSocketMessage | MessagesServerSocketMessage - | APIResponseServerSocketMessage; + | APIResponseServerSocketMessage + | CompressedMessageServerSocketMessage; export type SocketListener = (message: ClientServerSocketMessage) => void; export type ConnectionStatus = | 'connecting' | 'connected' | 'reconnecting' | 'disconnecting' | 'forcedDisconnecting' | 'disconnected'; export type ConnectionInfo = { +status: ConnectionStatus, +queuedActivityUpdates: $ReadOnlyArray, +lateResponses: $ReadOnlyArray, +showDisconnectedBar: boolean, }; export const connectionInfoValidator: TInterface = tShape({ status: t.enums.of([ 'connecting', 'connected', 'reconnecting', 'disconnecting', 'forcedDisconnecting', 'disconnected', ]), queuedActivityUpdates: t.list(activityUpdateValidator), lateResponses: t.list(t.Number), showDisconnectedBar: t.Boolean, }); export const defaultConnectionInfo: ConnectionInfo = { status: 'connecting', queuedActivityUpdates: [], lateResponses: [], showDisconnectedBar: false, }; export const updateConnectionStatusActionType = 'UPDATE_CONNECTION_STATUS'; export type UpdateConnectionStatusPayload = { +status: ConnectionStatus, }; export const setLateResponseActionType = 'SET_LATE_RESPONSE'; export type SetLateResponsePayload = { +messageID: number, +isLate: boolean, }; export const updateDisconnectedBarActionType = 'UPDATE_DISCONNECTED_BAR'; export type UpdateDisconnectedBarPayload = { +visible: boolean }; export type OneTimeKeyGenerator = (inc: number) => string; export type GRPCStream = { readyState: number, onopen: (ev: any) => mixed, onmessage: (ev: MessageEvent) => mixed, onclose: (ev: CloseEvent) => mixed, close(code?: number, reason?: string): void, send(data: string | Blob | ArrayBuffer | $ArrayBufferView): void, }; export type CommTransportLayer = GRPCStream | WebSocket; diff --git a/native/socket.react.js b/native/socket.react.js index 927725bb8..514606789 100644 --- a/native/socket.react.js +++ b/native/socket.react.js @@ -1,172 +1,174 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import { useDispatch } from 'react-redux'; import { logOut, logOutActionTypes } from 'lib/actions/user-actions.js'; import { preRequestUserStateSelector } from 'lib/selectors/account-selectors.js'; import { cookieSelector, urlPrefixSelector, connectionSelector, lastCommunicatedPlatformDetailsSelector, } from 'lib/selectors/keyserver-selectors.js'; import { isLoggedIn } from 'lib/selectors/user-selectors.js'; import { accountHasPassword } from 'lib/shared/account-utils.js'; import Socket, { type BaseSocketProps } from 'lib/socket/socket.react.js'; import { logInActionSources } from 'lib/types/account-types.js'; import { useServerCall, useDispatchActionPromise, fetchNewCookieFromNativeCredentials, } from 'lib/utils/action-utils.js'; import { InputStateContext } from './input/input-state.js'; import { activeMessageListSelector, nativeCalendarQuery, } from './navigation/nav-selectors.js'; import { NavContext } from './navigation/navigation-context.js'; import { useSelector } from './redux/redux-utils.js'; import { noDataAfterPolicyAcknowledgmentSelector } from './selectors/account-selectors.js'; import { openSocketSelector, sessionIdentificationSelector, nativeGetClientResponsesSelector, nativeSessionStateFuncSelector, } from './selectors/socket-selectors.js'; import Alert from './utils/alert.js'; import { useInitialNotificationsEncryptedMessage } from './utils/crypto-utils.js'; +import { decompressMessage } from './utils/decompress.js'; const NativeSocket: React.ComponentType = React.memo(function NativeSocket(props: BaseSocketProps) { const inputState = React.useContext(InputStateContext); const navContext = React.useContext(NavContext); const cookie = useSelector(cookieSelector); const urlPrefix = useSelector(urlPrefixSelector); invariant(urlPrefix, 'missing urlPrefix for given keyserver id'); const connection = useSelector(connectionSelector); invariant(connection, 'keyserver missing from keyserverStore'); const frozen = useSelector(state => state.frozen); const active = useSelector( state => isLoggedIn(state) && state.lifecycleState !== 'background', ); const noDataAfterPolicyAcknowledgment = useSelector( noDataAfterPolicyAcknowledgmentSelector, ); const currentUserInfo = useSelector(state => state.currentUserInfo); const openSocket = useSelector(openSocketSelector); invariant(openSocket, 'openSocket failed to be created'); const sessionIdentification = useSelector(sessionIdentificationSelector); const preRequestUserState = useSelector(preRequestUserStateSelector); const getInitialNotificationsEncryptedMessage = useInitialNotificationsEncryptedMessage(); const getClientResponses = useSelector(state => nativeGetClientResponsesSelector({ redux: state, navContext, getInitialNotificationsEncryptedMessage, }), ); const sessionStateFunc = useSelector(state => nativeSessionStateFuncSelector({ redux: state, navContext, }), ); const currentCalendarQuery = useSelector(state => nativeCalendarQuery({ redux: state, navContext, }), ); const canSendReports = useSelector( state => !state.frozen && state.connectivity.hasWiFi && (!inputState || !inputState.uploadInProgress()), ); const activeThread = React.useMemo(() => { if (!active) { return null; } return activeMessageListSelector(navContext); }, [active, navContext]); const lastCommunicatedPlatformDetails = useSelector( lastCommunicatedPlatformDetailsSelector, ); const dispatch = useDispatch(); const dispatchActionPromise = useDispatchActionPromise(); const callLogOut = useServerCall(logOut); const socketCrashLoopRecovery = React.useCallback(async () => { if (!accountHasPassword(currentUserInfo)) { dispatchActionPromise( logOutActionTypes, callLogOut(preRequestUserState), ); Alert.alert( 'Log in needed', 'After acknowledging the policies, we need you to log in to your account again', [{ text: 'OK' }], ); return; } await fetchNewCookieFromNativeCredentials( dispatch, cookie, urlPrefix, logInActionSources.refetchUserDataAfterAcknowledgment, getInitialNotificationsEncryptedMessage, ); }, [ callLogOut, cookie, currentUserInfo, dispatch, dispatchActionPromise, preRequestUserState, urlPrefix, getInitialNotificationsEncryptedMessage, ]); return ( ); }); export default NativeSocket; diff --git a/native/utils/decompress.js b/native/utils/decompress.js new file mode 100644 index 000000000..8795badb9 --- /dev/null +++ b/native/utils/decompress.js @@ -0,0 +1,17 @@ +// @flow + +import decompress from 'brotli/decompress.js'; +import { Buffer } from 'buffer'; +import invariant from 'invariant'; + +import type { CompressedData } from 'lib/types/compression-types.js'; + +function decompressMessage(message: CompressedData): string { + invariant(message.algo === 'brotli+base64', 'only supports brotli+base64'); + const inBuffer = Buffer.from(message.data, 'base64'); + const decompressed = decompress(inBuffer); + const outBuffer = Buffer.from(decompressed); + return outBuffer.toString('utf-8'); +} + +export { decompressMessage }; diff --git a/web/.eslintrc.json b/web/.eslintrc.json index 021e60c2e..67c307940 100644 --- a/web/.eslintrc.json +++ b/web/.eslintrc.json @@ -1,9 +1,10 @@ { "env": { "browser": true, "jest": true }, "globals": { - "process": true + "process": true, + "Buffer": true } } diff --git a/web/socket.react.js b/web/socket.react.js index 2f7edbd9d..606e53969 100644 --- a/web/socket.react.js +++ b/web/socket.react.js @@ -1,96 +1,98 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import { useDispatch } from 'react-redux'; import { logOut } from 'lib/actions/user-actions.js'; import { preRequestUserStateSelector } from 'lib/selectors/account-selectors.js'; import { cookieSelector, urlPrefixSelector, connectionSelector, lastCommunicatedPlatformDetailsSelector, } from 'lib/selectors/keyserver-selectors.js'; import Socket, { type BaseSocketProps } from 'lib/socket/socket.react.js'; import { useServerCall, useDispatchActionPromise, } from 'lib/utils/action-utils.js'; import { useSelector } from './redux/redux-utils.js'; import { activeThreadSelector, webCalendarQuery, } from './selectors/nav-selectors.js'; import { openSocketSelector, sessionIdentificationSelector, webGetClientResponsesSelector, webSessionStateFuncSelector, } from './selectors/socket-selectors.js'; +import { decompressMessage } from './utils/decompress.js'; const WebSocket: React.ComponentType = React.memo(function WebSocket(props) { const cookie = useSelector(cookieSelector); const urlPrefix = useSelector(urlPrefixSelector); invariant(urlPrefix, 'missing urlPrefix for given keyserver id'); const connection = useSelector(connectionSelector); invariant(connection, 'keyserver missing from keyserverStore'); const active = useSelector( state => !!state.currentUserInfo && !state.currentUserInfo.anonymous && state.lifecycleState !== 'background', ); const openSocket = useSelector(openSocketSelector); invariant(openSocket, 'openSocket failed to be created'); const sessionIdentification = useSelector(sessionIdentificationSelector); const preRequestUserState = useSelector(preRequestUserStateSelector); const getClientResponses = useSelector(webGetClientResponsesSelector); const sessionStateFunc = useSelector(webSessionStateFuncSelector); const currentCalendarQuery = useSelector(webCalendarQuery); const reduxActiveThread = useSelector(activeThreadSelector); const windowActive = useSelector(state => state.windowActive); const activeThread = React.useMemo(() => { if (!active || !windowActive) { return null; } return reduxActiveThread; }, [active, windowActive, reduxActiveThread]); const dispatch = useDispatch(); const dispatchActionPromise = useDispatchActionPromise(); const callLogOut = useServerCall(logOut); const lastCommunicatedPlatformDetails = useSelector( lastCommunicatedPlatformDetailsSelector, ); return ( ); }); export default WebSocket; diff --git a/web/utils/decompress.js b/web/utils/decompress.js new file mode 100644 index 000000000..6d2da2c5b --- /dev/null +++ b/web/utils/decompress.js @@ -0,0 +1,16 @@ +// @flow + +import decompress from 'brotli/decompress.js'; +import invariant from 'invariant'; + +import type { CompressedData } from 'lib/types/compression-types.js'; + +function decompressMessage(message: CompressedData): string { + invariant(message.algo === 'brotli+base64', 'only supports brotli+base64'); + const inBuffer = Buffer.from(message.data, 'base64'); + const decompressed = decompress(inBuffer); + const outBuffer = Buffer.from(decompressed); + return outBuffer.toString('utf-8'); +} + +export { decompressMessage };