diff --git a/keyserver/src/endpoints.js b/keyserver/src/endpoints.js index fdbb55c55..6c7111b8f 100644 --- a/keyserver/src/endpoints.js +++ b/keyserver/src/endpoints.js @@ -1,103 +1,229 @@ // @flow +import { policyTypes } from 'lib/facts/policies.js'; import type { Endpoint } from 'lib/types/endpoints'; import { updateActivityResponder, threadSetUnreadStatusResponder, } from './responders/activity-responders'; import { deviceTokenUpdateResponder } from './responders/device-responders'; import { entryFetchResponder, entryRevisionFetchResponder, entryCreationResponder, entryUpdateResponder, entryDeletionResponder, entryRestorationResponder, calendarQueryUpdateResponder, } from './responders/entry-responders'; import type { JSONResponder } from './responders/handlers'; import { getSessionPublicKeysResponder } from './responders/keys-responders'; import { messageReportCreationResponder } from './responders/message-report-responder'; import { textMessageCreationResponder, messageFetchResponder, multimediaMessageCreationResponder, } from './responders/message-responders'; import { updateRelationshipsResponder } from './responders/relationship-responders'; import { reportCreationResponder, reportMultiCreationResponder, errorReportFetchInfosResponder, } from './responders/report-responders'; import { userSearchResponder } from './responders/search-responders'; import { threadDeletionResponder, roleUpdateResponder, memberRemovalResponder, threadLeaveResponder, threadUpdateResponder, threadCreationResponder, threadJoinResponder, } from './responders/thread-responders'; import { userSubscriptionUpdateResponder, passwordUpdateResponder, sendVerificationEmailResponder, sendPasswordResetEmailResponder, logOutResponder, accountDeletionResponder, accountCreationResponder, logInResponder, oldPasswordUpdateResponder, updateUserSettingsResponder, policyAcknowledgmentResponder, } from './responders/user-responders'; import { codeVerificationResponder } from './responders/verification-responders'; import { uploadDeletionResponder } from './uploads/uploads'; +const baseLegalPolicies = [policyTypes.tosAndPrivacyPolicy]; + const jsonEndpoints: { [id: Endpoint]: JSONResponder } = { - create_account: accountCreationResponder, - create_entry: entryCreationResponder, - create_error_report: reportCreationResponder, - create_message_report: messageReportCreationResponder, - create_multimedia_message: multimediaMessageCreationResponder, - create_report: reportCreationResponder, - create_reports: reportMultiCreationResponder, - create_text_message: textMessageCreationResponder, - create_thread: threadCreationResponder, - delete_account: accountDeletionResponder, - delete_entry: entryDeletionResponder, - delete_thread: threadDeletionResponder, - delete_upload: uploadDeletionResponder, - fetch_entries: entryFetchResponder, - fetch_entry_revisions: entryRevisionFetchResponder, - fetch_error_report_infos: errorReportFetchInfosResponder, - fetch_messages: messageFetchResponder, - get_session_public_keys: getSessionPublicKeysResponder, - join_thread: threadJoinResponder, - leave_thread: threadLeaveResponder, - log_in: logInResponder, - log_out: logOutResponder, - policy_acknowledgment: policyAcknowledgmentResponder, - remove_members: memberRemovalResponder, - restore_entry: entryRestorationResponder, - search_users: userSearchResponder, - send_password_reset_email: sendPasswordResetEmailResponder, - send_verification_email: sendVerificationEmailResponder, - set_thread_unread_status: threadSetUnreadStatusResponder, - update_account: passwordUpdateResponder, - update_activity: updateActivityResponder, - update_calendar_query: calendarQueryUpdateResponder, - update_user_settings: updateUserSettingsResponder, - update_device_token: deviceTokenUpdateResponder, - update_entry: entryUpdateResponder, - update_password: oldPasswordUpdateResponder, - update_relationships: updateRelationshipsResponder, - update_role: roleUpdateResponder, - update_thread: threadUpdateResponder, - update_user_subscription: userSubscriptionUpdateResponder, - verify_code: codeVerificationResponder, + create_account: { + responder: accountCreationResponder, + requiredPolicies: [], + }, + create_entry: { + responder: entryCreationResponder, + requiredPolicies: baseLegalPolicies, + }, + create_error_report: { + responder: reportCreationResponder, + requiredPolicies: [], + }, + create_message_report: { + responder: messageReportCreationResponder, + requiredPolicies: baseLegalPolicies, + }, + create_multimedia_message: { + responder: multimediaMessageCreationResponder, + requiredPolicies: baseLegalPolicies, + }, + create_report: { + responder: reportCreationResponder, + requiredPolicies: [], + }, + create_reports: { + responder: reportMultiCreationResponder, + requiredPolicies: [], + }, + create_text_message: { + responder: textMessageCreationResponder, + requiredPolicies: baseLegalPolicies, + }, + create_thread: { + responder: threadCreationResponder, + requiredPolicies: baseLegalPolicies, + }, + delete_account: { + responder: accountDeletionResponder, + requiredPolicies: [], + }, + delete_entry: { + responder: entryDeletionResponder, + requiredPolicies: baseLegalPolicies, + }, + delete_thread: { + responder: threadDeletionResponder, + requiredPolicies: baseLegalPolicies, + }, + delete_upload: { + responder: uploadDeletionResponder, + requiredPolicies: baseLegalPolicies, + }, + fetch_entries: { + responder: entryFetchResponder, + requiredPolicies: baseLegalPolicies, + }, + fetch_entry_revisions: { + responder: entryRevisionFetchResponder, + requiredPolicies: baseLegalPolicies, + }, + fetch_error_report_infos: { + responder: errorReportFetchInfosResponder, + requiredPolicies: baseLegalPolicies, + }, + fetch_messages: { + responder: messageFetchResponder, + requiredPolicies: baseLegalPolicies, + }, + get_session_public_keys: { + responder: getSessionPublicKeysResponder, + requiredPolicies: baseLegalPolicies, + }, + join_thread: { + responder: threadJoinResponder, + requiredPolicies: baseLegalPolicies, + }, + leave_thread: { + responder: threadLeaveResponder, + requiredPolicies: baseLegalPolicies, + }, + log_in: { + responder: logInResponder, + requiredPolicies: [], + }, + log_out: { + responder: logOutResponder, + requiredPolicies: [], + }, + policy_acknowledgment: { + responder: policyAcknowledgmentResponder, + requiredPolicies: [], + }, + remove_members: { + responder: memberRemovalResponder, + requiredPolicies: baseLegalPolicies, + }, + restore_entry: { + responder: entryRestorationResponder, + requiredPolicies: baseLegalPolicies, + }, + search_users: { + responder: userSearchResponder, + requiredPolicies: baseLegalPolicies, + }, + send_password_reset_email: { + responder: sendPasswordResetEmailResponder, + requiredPolicies: [], + }, + send_verification_email: { + responder: sendVerificationEmailResponder, + requiredPolicies: [], + }, + set_thread_unread_status: { + responder: threadSetUnreadStatusResponder, + requiredPolicies: baseLegalPolicies, + }, + update_account: { + responder: passwordUpdateResponder, + requiredPolicies: baseLegalPolicies, + }, + update_activity: { + responder: updateActivityResponder, + requiredPolicies: baseLegalPolicies, + }, + update_calendar_query: { + responder: calendarQueryUpdateResponder, + requiredPolicies: baseLegalPolicies, + }, + update_user_settings: { + responder: updateUserSettingsResponder, + requiredPolicies: baseLegalPolicies, + }, + update_device_token: { + responder: deviceTokenUpdateResponder, + requiredPolicies: baseLegalPolicies, + }, + update_entry: { + responder: entryUpdateResponder, + requiredPolicies: baseLegalPolicies, + }, + update_password: { + responder: oldPasswordUpdateResponder, + requiredPolicies: baseLegalPolicies, + }, + update_relationships: { + responder: updateRelationshipsResponder, + requiredPolicies: baseLegalPolicies, + }, + update_role: { + responder: roleUpdateResponder, + requiredPolicies: baseLegalPolicies, + }, + update_thread: { + responder: threadUpdateResponder, + requiredPolicies: baseLegalPolicies, + }, + update_user_subscription: { + responder: userSubscriptionUpdateResponder, + requiredPolicies: baseLegalPolicies, + }, + verify_code: { + responder: codeVerificationResponder, + requiredPolicies: baseLegalPolicies, + }, }; export { jsonEndpoints }; diff --git a/keyserver/src/responders/handlers.js b/keyserver/src/responders/handlers.js index 22be464d5..49e205b09 100644 --- a/keyserver/src/responders/handlers.js +++ b/keyserver/src/responders/handlers.js @@ -1,235 +1,240 @@ // @flow import type { $Response, $Request } from 'express'; import { ServerError } from 'lib/utils/errors'; import { deleteCookie } from '../deleters/cookie-deleters'; +import type { PolicyType } from '../lib/facts/policies.js'; import { fetchViewerForJSONRequest, addCookieToJSONResponse, fetchViewerForHomeRequest, addCookieToHomeResponse, createNewAnonymousCookie, } from '../session/cookies'; import type { Viewer } from '../session/viewer'; import { type AppURLFacts, getAppURLFactsFromRequestURL } from '../utils/urls'; import { getMessageForException } from './utils'; -export type JSONResponder = (viewer: Viewer, input: any) => Promise<*>; +export type JSONResponder = { + responder: (viewer: Viewer, input: any) => Promise<*>, + requiredPolicies: $ReadOnlyArray, +}; + export type DownloadResponder = ( viewer: Viewer, req: $Request, res: $Response, ) => Promise; export type HTMLResponder = DownloadResponder; export type HTTPGetResponder = DownloadResponder; function jsonHandler( responder: JSONResponder, expectCookieInvalidation: boolean, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { let viewer; try { if (!req.body || typeof req.body !== 'object') { throw new ServerError('invalid_parameters'); } const { input } = req.body; viewer = await fetchViewerForJSONRequest(req); - const responderResult = await responder(viewer, input); + const responderResult = await responder.responder(viewer, input); if (res.headersSent) { return; } const result = { ...responderResult }; addCookieToJSONResponse( viewer, res, result, expectCookieInvalidation, getAppURLFactsFromRequestURL(req.originalUrl), ); res.json({ success: true, ...result }); } catch (e) { await handleException( e, res, getAppURLFactsFromRequestURL(req.originalUrl), viewer, expectCookieInvalidation, ); } }; } function httpGetHandler( responder: HTTPGetResponder, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { let viewer; try { viewer = await fetchViewerForJSONRequest(req); await responder(viewer, req, res); } catch (e) { await handleException( e, res, getAppURLFactsFromRequestURL(req.originalUrl), viewer, ); } }; } function downloadHandler( responder: DownloadResponder, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { try { const viewer = await fetchViewerForJSONRequest(req); await responder(viewer, req, res); } catch (e) { // Passing viewer in only makes sense if we want to handle failures as // JSON. We don't, and presume all download handlers avoid ServerError. await handleException( e, res, getAppURLFactsFromRequestURL(req.originalUrl), ); } }; } async function handleException( error: Error, res: $Response, appURLFacts: AppURLFacts, viewer?: ?Viewer, expectCookieInvalidation?: boolean, ) { console.warn(error); if (res.headersSent) { return; } if (!(error instanceof ServerError)) { res.status(500).send(getMessageForException(error)); return; } const result: Object = error.payload ? { error: error.message, payload: error.payload } : { error: error.message }; if (viewer) { if (error.message === 'client_version_unsupported' && viewer.loggedIn) { // If the client version is unsupported, log the user out const { platformDetails } = error; const [data] = await Promise.all([ createNewAnonymousCookie({ platformDetails, deviceToken: viewer.deviceToken, }), deleteCookie(viewer.cookieID), ]); viewer.setNewCookie(data); viewer.cookieInvalidated = true; } // This can mutate the result object addCookieToJSONResponse( viewer, res, result, !!expectCookieInvalidation, appURLFacts, ); } res.json(result); } function htmlHandler( responder: HTMLResponder, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { try { const viewer = await fetchViewerForHomeRequest(req); addCookieToHomeResponse( viewer, res, getAppURLFactsFromRequestURL(req.originalUrl), ); res.type('html'); await responder(viewer, req, res); } catch (e) { console.warn(e); if (!res.headersSent) { res.status(500).send(getMessageForException(e)); } } }; } type MulterFile = { fieldname: string, originalname: string, encoding: string, mimetype: string, buffer: Buffer, size: number, }; export type MulterRequest = $Request & { files?: $ReadOnlyArray, ... }; type UploadResponder = (viewer: Viewer, req: MulterRequest) => Promise; function uploadHandler( responder: UploadResponder, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { let viewer; try { if (!req.body || typeof req.body !== 'object') { throw new ServerError('invalid_parameters'); } viewer = await fetchViewerForJSONRequest(req); const responderResult = await responder( viewer, ((req: any): MulterRequest), ); if (res.headersSent) { return; } const result = { ...responderResult }; addCookieToJSONResponse( viewer, res, result, false, getAppURLFactsFromRequestURL(req.originalUrl), ); res.json({ success: true, ...result }); } catch (e) { await handleException( e, res, getAppURLFactsFromRequestURL(req.originalUrl), viewer, ); } }; } async function handleAsyncPromise(promise: Promise) { try { await promise; } catch (error) { console.warn(error); } } export { jsonHandler, httpGetHandler, downloadHandler, htmlHandler, uploadHandler, handleAsyncPromise, }; diff --git a/keyserver/src/socket/socket.js b/keyserver/src/socket/socket.js index c58b468b6..299c4f7bc 100644 --- a/keyserver/src/socket/socket.js +++ b/keyserver/src/socket/socket.js @@ -1,814 +1,814 @@ // @flow import type { $Request } from 'express'; import invariant from 'invariant'; import _debounce from 'lodash/debounce'; import t from 'tcomb'; import WebSocket from 'ws'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils'; import { serverRequestSocketTimeout, serverResponseTimeout, } from 'lib/shared/timeouts'; import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils'; import type { Shape } from 'lib/types/core'; import { endpointIsSocketSafe } from 'lib/types/endpoints'; import { defaultNumberPerThread } from 'lib/types/message-types'; import { redisMessageTypes, type RedisMessage } from 'lib/types/redis-types'; import { cookieSources, sessionCheckFrequency, stateCheckInactivityActivationInterval, } from 'lib/types/session-types'; import { type ClientSocketMessage, type InitialClientSocketMessage, type ResponsesClientSocketMessage, type ServerStateSyncFullSocketPayload, type ServerServerSocketMessage, type ErrorServerSocketMessage, type AuthErrorServerSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, clientSocketMessageTypes, stateSyncPayloadTypes, serverSocketMessageTypes, } from 'lib/types/socket-types'; import { ServerError } from 'lib/utils/errors'; import { values } from 'lib/utils/objects'; import { promiseAll } from 'lib/utils/promises'; import SequentialPromiseResolver from 'lib/utils/sequential-promise-resolver'; import sleep from 'lib/utils/sleep'; import { tShape, tCookie } from 'lib/utils/validation-utils'; import { fetchUpdateInfosWithRawUpdateInfos } from '../creators/update-creator'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters'; import { deleteCookie } from '../deleters/cookie-deleters'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters'; import { jsonEndpoints } from '../endpoints'; import { fetchEntryInfos } from '../fetchers/entry-fetchers'; import { fetchMessageInfosSince, getMessageFetchResultFromRedisMessages, } from '../fetchers/message-fetchers'; import { fetchThreadInfos } from '../fetchers/thread-fetchers'; import { fetchUpdateInfos } from '../fetchers/update-fetchers'; import { fetchCurrentUserInfo, fetchKnownUserInfos, } from '../fetchers/user-fetchers'; import { newEntryQueryInputValidator, verifyCalendarQueryThreadIDs, } from '../responders/entry-responders'; import { handleAsyncPromise } from '../responders/handlers'; import { fetchViewerForSocket, extendCookieLifespan, createNewAnonymousCookie, } from '../session/cookies'; import { Viewer } from '../session/viewer'; import { commitSessionUpdate } from '../updaters/session-updaters'; import { assertSecureRequest } from '../utils/security-utils'; import { checkInputValidator, checkClientSupported, } from '../utils/validation-utils'; import { RedisSubscriber } from './redis'; import { clientResponseInputValidator, processClientResponses, initializeSession, checkState, } from './session-utils'; const clientSocketMessageInputValidator = t.union([ tShape({ type: t.irreducible( 'clientSocketMessageTypes.INITIAL', x => x === clientSocketMessageTypes.INITIAL, ), id: t.Number, payload: tShape({ sessionIdentification: tShape({ cookie: t.maybe(tCookie), sessionID: t.maybe(t.String), }), sessionState: tShape({ calendarQuery: newEntryQueryInputValidator, messagesCurrentAsOf: t.Number, updatesCurrentAsOf: t.Number, watchedIDs: t.list(t.String), }), clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.RESPONSES', x => x === clientSocketMessageTypes.RESPONSES, ), id: t.Number, payload: tShape({ clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.PING', x => x === clientSocketMessageTypes.PING, ), id: t.Number, }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.ACK_UPDATES', x => x === clientSocketMessageTypes.ACK_UPDATES, ), id: t.Number, payload: tShape({ currentAsOf: t.Number, }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.API_REQUEST', x => x === clientSocketMessageTypes.API_REQUEST, ), id: t.Number, payload: tShape({ endpoint: t.String, input: t.Object, }), }), ]); function onConnection(ws: WebSocket, req: $Request) { assertSecureRequest(req); new Socket(ws, req); } type StateCheckConditions = { activityRecentlyOccurred: boolean, stateCheckOngoing: boolean, }; class Socket { ws: WebSocket; httpRequest: $Request; viewer: ?Viewer; redis: ?RedisSubscriber; redisPromiseResolver: SequentialPromiseResolver; stateCheckConditions: StateCheckConditions = { activityRecentlyOccurred: true, stateCheckOngoing: false, }; stateCheckTimeoutID: ?TimeoutID; constructor(ws: WebSocket, httpRequest: $Request) { this.ws = ws; this.httpRequest = httpRequest; ws.on('message', this.onMessage); ws.on('close', this.onClose); this.resetTimeout(); this.redisPromiseResolver = new SequentialPromiseResolver(this.sendMessage); } onMessage = async ( messageString: string | Buffer | ArrayBuffer | Array, ) => { invariant(typeof messageString === 'string', 'message should be string'); let clientSocketMessage: ?ClientSocketMessage; try { this.resetTimeout(); const message = JSON.parse(messageString); checkInputValidator(clientSocketMessageInputValidator, message); clientSocketMessage = message; if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { if (this.viewer) { // This indicates that the user sent multiple INITIAL messages. throw new ServerError('socket_already_initialized'); } this.viewer = await fetchViewerForSocket( this.httpRequest, clientSocketMessage, ); if (!this.viewer) { // This indicates that the cookie was invalid, but the client is using // cookieSources.HEADER and thus can't accept a new cookie over // WebSockets. See comment under catch block for socket_deauthorized. throw new ServerError('socket_deauthorized'); } } const { viewer } = this; if (!viewer) { // This indicates a non-INITIAL message was sent by the client before // the INITIAL message. throw new ServerError('socket_uninitialized'); } if (viewer.sessionChanged) { // This indicates that the cookie was invalid, and we've assigned a new // anonymous one. throw new ServerError('socket_deauthorized'); } if (!viewer.loggedIn) { // This indicates that the specified cookie was an anonymous one. throw new ServerError('not_logged_in'); } await checkClientSupported( viewer, clientSocketMessageInputValidator, clientSocketMessage, ); const serverResponses = await this.handleClientSocketMessage( clientSocketMessage, ); if (!this.redis) { this.redis = new RedisSubscriber( { userID: viewer.userID, sessionID: viewer.session }, this.onRedisMessage, ); } if (viewer.sessionChanged) { // This indicates that something has caused the session to change, which // shouldn't happen from inside a WebSocket since we can't handle cookie // invalidation. throw new ServerError('session_mutated_from_socket'); } if (clientSocketMessage.type !== clientSocketMessageTypes.PING) { handleAsyncPromise(extendCookieLifespan(viewer.cookieID)); } for (const response of serverResponses) { this.sendMessage(response); } if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { this.onSuccessfulConnection(); } } catch (error) { console.warn(error); if (!(error instanceof ServerError)) { const errorMessage: ErrorServerSocketMessage = { type: serverSocketMessageTypes.ERROR, message: error.message, }; const responseTo = clientSocketMessage ? clientSocketMessage.id : null; if (responseTo !== null) { errorMessage.responseTo = responseTo; } this.markActivityOccurred(); this.sendMessage(errorMessage); return; } invariant(clientSocketMessage, 'should be set'); const responseTo = clientSocketMessage.id; if (error.message === 'socket_deauthorized') { const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, }; if (this.viewer) { // viewer should only be falsey for cookieSources.HEADER (web) // clients. Usually if the cookie is invalid we construct a new // anonymous Viewer with a new cookie, and then pass the cookie down // in the error. But we can't pass HTTP cookies in WebSocket messages. authErrorMessage.sessionChange = { cookie: this.viewer.cookiePairString, currentUserInfo: { id: this.viewer.cookieID, anonymous: true, }, }; } this.sendMessage(authErrorMessage); this.ws.close(4100, error.message); return; } else if (error.message === 'client_version_unsupported') { const { viewer } = this; invariant(viewer, 'should be set'); const promises = {}; promises.deleteCookie = deleteCookie(viewer.cookieID); if (viewer.cookieSource !== cookieSources.BODY) { promises.anonymousViewerData = createNewAnonymousCookie({ platformDetails: error.platformDetails, deviceToken: viewer.deviceToken, }); } const { anonymousViewerData } = await promiseAll(promises); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, }; if (anonymousViewerData) { // It is normally not safe to pass the result of // createNewAnonymousCookie to the Viewer constructor. That is because // createNewAnonymousCookie leaves several fields of // AnonymousViewerData unset, and consequently Viewer will throw when // access is attempted. It is only safe here because we can guarantee // that only cookiePairString and cookieID are accessed on anonViewer // below. const anonViewer = new Viewer(anonymousViewerData); authErrorMessage.sessionChange = { cookie: anonViewer.cookiePairString, currentUserInfo: { id: anonViewer.cookieID, anonymous: true, }, }; } this.sendMessage(authErrorMessage); this.ws.close(4101, error.message); return; } if (error.payload) { this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, payload: error.payload, }); } else { this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, }); } if (error.message === 'not_logged_in') { this.ws.close(4102, error.message); } else if (error.message === 'session_mutated_from_socket') { this.ws.close(4103, error.message); } else { this.markActivityOccurred(); } } }; onClose = async () => { this.clearStateCheckTimeout(); this.resetTimeout.cancel(); this.debouncedAfterActivity.cancel(); if (this.viewer && this.viewer.hasSessionInfo) { await deleteActivityForViewerSession(this.viewer); } if (this.redis) { this.redis.quit(); this.redis = null; } }; sendMessage = (message: ServerServerSocketMessage) => { invariant( this.ws.readyState > 0, "shouldn't send message until connection established", ); if (this.ws.readyState === 1) { this.ws.send(JSON.stringify(message)); } }; async handleClientSocketMessage( message: ClientSocketMessage, ): Promise { const resultPromise = (async () => { if (message.type === clientSocketMessageTypes.INITIAL) { this.markActivityOccurred(); return await this.handleInitialClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.markActivityOccurred(); return await this.handleResponsesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.PING) { return this.handlePingClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.markActivityOccurred(); return await this.handleAckUpdatesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.markActivityOccurred(); return await this.handleAPIRequestClientSocketMessage(message); } return []; })(); const timeoutPromise = (async () => { await sleep(serverResponseTimeout); throw new ServerError('socket_response_timeout'); })(); return await Promise.race([resultPromise, timeoutPromise]); } async handleInitialClientSocketMessage( message: InitialClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const responses = []; const { sessionState, clientResponses } = message.payload; const { calendarQuery, updatesCurrentAsOf: oldUpdatesCurrentAsOf, messagesCurrentAsOf: oldMessagesCurrentAsOf, watchedIDs, } = sessionState; await verifyCalendarQueryThreadIDs(calendarQuery); const sessionInitializationResult = await initializeSession( viewer, calendarQuery, oldUpdatesCurrentAsOf, ); const threadCursors = {}; for (const watchedThreadID of watchedIDs) { threadCursors[watchedThreadID] = null; } const messageSelectionCriteria = { threadCursors, joinedThreads: true, newerThan: oldMessagesCurrentAsOf, }; const [ fetchMessagesResult, { serverRequests, activityUpdateResult }, ] = await Promise.all([ fetchMessageInfosSince( viewer, messageSelectionCriteria, defaultNumberPerThread, ), processClientResponses(viewer, clientResponses), ]); const messagesResult = { rawMessageInfos: fetchMessagesResult.rawMessageInfos, truncationStatuses: fetchMessagesResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( fetchMessagesResult.rawMessageInfos, oldMessagesCurrentAsOf, ), }; if (!sessionInitializationResult.sessionContinued) { const [ threadsResult, entriesResult, currentUserInfo, knownUserInfos, ] = await Promise.all([ fetchThreadInfos(viewer), fetchEntryInfos(viewer, [calendarQuery]), fetchCurrentUserInfo(viewer), fetchKnownUserInfos(viewer), ]); const payload: ServerStateSyncFullSocketPayload = { type: stateSyncPayloadTypes.FULL, messagesResult, threadInfos: threadsResult.threadInfos, currentUserInfo, rawEntryInfos: entriesResult.rawEntryInfos, userInfos: values(knownUserInfos), updatesCurrentAsOf: oldUpdatesCurrentAsOf, }; if (viewer.sessionChanged) { // If initializeSession encounters, // sessionIdentifierTypes.BODY_SESSION_ID but the session // is unspecified or expired, // it will set a new sessionID and specify viewer.sessionChanged const { sessionID } = viewer; invariant( sessionID !== null && sessionID !== undefined, 'should be set', ); payload.sessionID = sessionID; viewer.sessionChanged = false; } responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload, }); } else { const { sessionUpdate, deltaEntryInfoResult, } = sessionInitializationResult; const promises = {}; promises.deleteExpiredUpdates = deleteUpdatesBeforeTimeTargetingSession( viewer, oldUpdatesCurrentAsOf, ); promises.fetchUpdateResult = fetchUpdateInfos( viewer, oldUpdatesCurrentAsOf, calendarQuery, ); promises.sessionUpdate = commitSessionUpdate(viewer, sessionUpdate); const { fetchUpdateResult } = await promiseAll(promises); const { updateInfos, userInfos } = fetchUpdateResult; const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( [...updateInfos], oldUpdatesCurrentAsOf, ); const updatesResult = { newUpdates: updateInfos, currentAsOf: newUpdatesCurrentAsOf, }; responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload: { type: stateSyncPayloadTypes.INCREMENTAL, messagesResult, updatesResult, deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, userInfos: values(userInfos), }, }); } if (serverRequests.length > 0 || clientResponses.length > 0) { // We send this message first since the STATE_SYNC triggers the client's // connection status to shift to "connected", and we want to make sure the // client responses are cleared from Redux before that happens responses.unshift({ type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }); } if (activityUpdateResult) { // Same reason for unshifting as above responses.unshift({ type: serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, responseTo: message.id, payload: activityUpdateResult, }); } return responses; } async handleResponsesClientSocketMessage( message: ResponsesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { clientResponses } = message.payload; const { stateCheckStatus } = await processClientResponses( viewer, clientResponses, ); const serverRequests = []; if (stateCheckStatus && stateCheckStatus.status !== 'state_check') { const { sessionUpdate, checkStateRequest } = await checkState( viewer, stateCheckStatus, viewer.calendarQuery, ); if (sessionUpdate) { await commitSessionUpdate(viewer, sessionUpdate); this.setStateCheckConditions({ stateCheckOngoing: false }); } if (checkStateRequest) { serverRequests.push(checkStateRequest); } } // We send a response message regardless of whether we have any requests, // since we need to ack the client's responses return [ { type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }, ]; } handlePingClientSocketMessage( message: PingClientSocketMessage, ): ServerServerSocketMessage[] { return [ { type: serverSocketMessageTypes.PONG, responseTo: message.id, }, ]; } async handleAckUpdatesClientSocketMessage( message: AckUpdatesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { currentAsOf } = message.payload; await Promise.all([ deleteUpdatesBeforeTimeTargetingSession(viewer, currentAsOf), commitSessionUpdate(viewer, { lastUpdate: currentAsOf }), ]); return []; } async handleAPIRequestClientSocketMessage( message: APIRequestClientSocketMessage, ): Promise { if (!endpointIsSocketSafe(message.payload.endpoint)) { throw new ServerError('endpoint_unsafe_for_socket'); } const { viewer } = this; invariant(viewer, 'should be set'); const responder = jsonEndpoints[message.payload.endpoint]; - const response = await responder(viewer, message.payload.input); + const response = await responder.responder(viewer, message.payload.input); return [ { type: serverSocketMessageTypes.API_RESPONSE, responseTo: message.id, payload: response, }, ]; } onRedisMessage = async (message: RedisMessage) => { try { await this.processRedisMessage(message); } catch (e) { console.warn(e); } }; async processRedisMessage(message: RedisMessage) { if (message.type === redisMessageTypes.START_SUBSCRIPTION) { this.ws.terminate(); } else if (message.type === redisMessageTypes.NEW_UPDATES) { const { viewer } = this; invariant(viewer, 'should be set'); if (message.ignoreSession && message.ignoreSession === viewer.session) { return; } const rawUpdateInfos = message.updates; this.redisPromiseResolver.add( (async () => { const { updateInfos, userInfos, } = await fetchUpdateInfosWithRawUpdateInfos(rawUpdateInfos, { viewer, }); if (updateInfos.length === 0) { console.warn( 'could not get any UpdateInfos from redisMessageTypes.NEW_UPDATES', ); return null; } this.markActivityOccurred(); return { type: serverSocketMessageTypes.UPDATES, payload: { updatesResult: { currentAsOf: mostRecentUpdateTimestamp([...updateInfos], 0), newUpdates: updateInfos, }, userInfos: values(userInfos), }, }; })(), ); } else if (message.type === redisMessageTypes.NEW_MESSAGES) { const { viewer } = this; invariant(viewer, 'should be set'); const rawMessageInfos = message.messages; const messageFetchResult = getMessageFetchResultFromRedisMessages( viewer, rawMessageInfos, ); if (messageFetchResult.rawMessageInfos.length === 0) { console.warn( 'could not get any rawMessageInfos from ' + 'redisMessageTypes.NEW_MESSAGES', ); return; } this.redisPromiseResolver.add( (async () => { this.markActivityOccurred(); return { type: serverSocketMessageTypes.MESSAGES, payload: { messagesResult: { rawMessageInfos: messageFetchResult.rawMessageInfos, truncationStatuses: messageFetchResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( messageFetchResult.rawMessageInfos, 0, ), }, }, }; })(), ); } } onSuccessfulConnection() { if (this.ws.readyState !== 1) { return; } this.handleStateCheckConditionsUpdate(); } // The Socket will timeout by calling this.ws.terminate() // serverRequestSocketTimeout milliseconds after the last // time resetTimeout is called resetTimeout = _debounce( () => this.ws.terminate(), serverRequestSocketTimeout, ); debouncedAfterActivity = _debounce( () => this.setStateCheckConditions({ activityRecentlyOccurred: false }), stateCheckInactivityActivationInterval, ); markActivityOccurred = () => { if (this.ws.readyState !== 1) { return; } this.setStateCheckConditions({ activityRecentlyOccurred: true }); this.debouncedAfterActivity(); }; clearStateCheckTimeout() { const { stateCheckTimeoutID } = this; if (stateCheckTimeoutID) { clearTimeout(stateCheckTimeoutID); this.stateCheckTimeoutID = null; } } setStateCheckConditions(newConditions: Shape) { this.stateCheckConditions = { ...this.stateCheckConditions, ...newConditions, }; this.handleStateCheckConditionsUpdate(); } get stateCheckCanStart() { return Object.values(this.stateCheckConditions).every(cond => !cond); } handleStateCheckConditionsUpdate() { if (!this.stateCheckCanStart) { this.clearStateCheckTimeout(); return; } if (this.stateCheckTimeoutID) { return; } const { viewer } = this; if (!viewer) { return; } const timeUntilStateCheck = viewer.sessionLastValidated + sessionCheckFrequency - Date.now(); if (timeUntilStateCheck <= 0) { this.initiateStateCheck(); } else { this.stateCheckTimeoutID = setTimeout( this.initiateStateCheck, timeUntilStateCheck, ); } } initiateStateCheck = async () => { this.setStateCheckConditions({ stateCheckOngoing: true }); const { viewer } = this; invariant(viewer, 'should be set'); const { checkStateRequest } = await checkState( viewer, { status: 'state_check' }, viewer.calendarQuery, ); invariant(checkStateRequest, 'should be set'); this.sendMessage({ type: serverSocketMessageTypes.REQUESTS, payload: { serverRequests: [checkStateRequest] }, }); }; } export { onConnection };