diff --git a/keyserver/src/endpoints.js b/keyserver/src/endpoints.js index e03b6be55..cb18518a4 100644 --- a/keyserver/src/endpoints.js +++ b/keyserver/src/endpoints.js @@ -1,532 +1,533 @@ // @flow import t from 'tcomb'; import type { TType } from 'tcomb'; import { baseLegalPolicies } from 'lib/facts/policies.js'; import type { PolicyType } from 'lib/facts/policies.js'; import type { Endpoint } from 'lib/types/endpoints.js'; import { calendarQueryValidator } from 'lib/types/entry-types.js'; import { sessionStateValidator } from 'lib/types/session-types.js'; import { endpointValidators } from 'lib/types/validators/endpoint-validators.js'; import { updateUserAvatarRequestValidator } from 'lib/utils/avatar-utils.js'; import { updateActivityResponder, threadSetUnreadStatusResponder, setThreadUnreadStatusValidator, updateActivityResponderInputValidator, } from './responders/activity-responders.js'; import { fetchCommunityInfosResponder } from './responders/community-responders.js'; import { deviceTokenUpdateResponder, deviceTokenUpdateRequestInputValidator, } from './responders/device-responders.js'; import { entryFetchResponder, entryRevisionFetchResponder, entryCreationResponder, entryUpdateResponder, entryDeletionResponder, entryRestorationResponder, calendarQueryUpdateResponder, createEntryRequestInputValidator, deleteEntryRequestInputValidator, entryQueryInputValidator, entryRevisionHistoryFetchInputValidator, restoreEntryRequestInputValidator, saveEntryRequestInputValidator, } from './responders/entry-responders.js'; import { createOrUpdateFarcasterChannelTagResponder, deleteFarcasterChannelTagResponder, createOrUpdateFarcasterChannelTagInputValidator, deleteFarcasterChannelTagInputValidator, } from './responders/farcaster-channel-tag-responders.js'; import type { JSONResponder } from './responders/handlers.js'; import { createJSONResponder } from './responders/handlers.js'; import { getOlmSessionInitializationDataResponder } from './responders/keys-responders.js'; import { createOrUpdatePublicLinkResponder, disableInviteLinkResponder, fetchPrimaryInviteLinksResponder, inviteLinkVerificationResponder, createOrUpdatePublicLinkInputValidator, disableInviteLinkInputValidator, inviteLinkVerificationRequestInputValidator, } from './responders/link-responders.js'; import { messageReportCreationResponder, messageReportCreationRequestInputValidator, } from './responders/message-report-responder.js'; import { textMessageCreationResponder, messageFetchResponder, multimediaMessageCreationResponder, reactionMessageCreationResponder, editMessageCreationResponder, fetchPinnedMessagesResponder, searchMessagesResponder, sendMultimediaMessageRequestInputValidator, sendReactionMessageRequestInputValidator, editMessageRequestInputValidator, sendTextMessageRequestInputValidator, fetchMessageInfosRequestInputValidator, fetchPinnedMessagesResponderInputValidator, searchMessagesResponderInputValidator, } from './responders/message-responders.js'; import { getInitialReduxStateResponder, initialReduxStateRequestValidator, } from './responders/redux-state-responders.js'; import { updateRelationshipsResponder, updateRelationshipInputValidator, } from './responders/relationship-responders.js'; import { reportCreationResponder, reportMultiCreationResponder, errorReportFetchInfosResponder, reportCreationRequestInputValidator, fetchErrorReportInfosRequestInputValidator, reportMultiCreationRequestInputValidator, } from './responders/report-responders.js'; import { userSearchResponder, exactUserSearchResponder, exactUserSearchRequestInputValidator, userSearchRequestInputValidator, } from './responders/search-responders.js'; import { siweNonceResponder } from './responders/siwe-nonce-responders.js'; import { threadDeletionResponder, roleUpdateResponder, memberRemovalResponder, threadLeaveResponder, threadUpdateResponder, threadCreationResponder, threadFetchMediaResponder, threadJoinResponder, toggleMessagePinResponder, roleModificationResponder, roleDeletionResponder, newThreadRequestInputValidator, threadDeletionRequestInputValidator, joinThreadRequestInputValidator, leaveThreadRequestInputValidator, threadFetchMediaRequestInputValidator, removeMembersRequestInputValidator, roleChangeRequestInputValidator, toggleMessagePinRequestInputValidator, updateThreadRequestInputValidator, roleDeletionRequestInputValidator, roleModificationRequestInputValidator, } from './responders/thread-responders.js'; import { fetchPendingUpdatesResponder } from './responders/update-responders.js'; import { keyserverAuthRequestInputValidator, keyserverAuthResponder, userSubscriptionUpdateResponder, passwordUpdateResponder, sendVerificationEmailResponder, sendPasswordResetEmailResponder, logOutResponder, accountDeletionResponder, accountCreationResponder, logInResponder, siweAuthResponder, oldPasswordUpdateResponder, updateUserSettingsResponder, policyAcknowledgmentResponder, updateUserAvatarResponder, registerRequestInputValidator, logInRequestInputValidator, policyAcknowledgmentRequestInputValidator, accountUpdateInputValidator, resetPasswordRequestInputValidator, siweAuthRequestInputValidator, subscriptionUpdateRequestInputValidator, updatePasswordRequestInputValidator, updateUserSettingsInputValidator, claimUsernameResponder, claimUsernameRequestInputValidator, } from './responders/user-responders.js'; import { codeVerificationResponder, codeVerificationRequestInputValidator, } from './responders/verification-responders.js'; import { versionResponder } from './responders/version-responders.js'; import type { Viewer } from './session/viewer.js'; import { uploadMediaMetadataResponder, uploadDeletionResponder, UploadDeletionRequestInputValidator, uploadMediaMetadataInputValidator, } from './uploads/uploads.js'; const ignoredArgumentValidator = t.irreducible( 'Ignored argument', () => true, ); type EndpointData = { responder: (viewer: Viewer, input: any) => Promise<*>, inputValidator: TType<*>, policies: $ReadOnlyArray, }; const jsonEndpointsData: { +[id: Endpoint]: EndpointData } = { create_account: { responder: accountCreationResponder, inputValidator: registerRequestInputValidator, policies: [], }, create_entry: { responder: entryCreationResponder, inputValidator: createEntryRequestInputValidator, policies: baseLegalPolicies, }, create_error_report: { responder: reportCreationResponder, inputValidator: reportCreationRequestInputValidator, policies: [], }, create_message_report: { responder: messageReportCreationResponder, inputValidator: messageReportCreationRequestInputValidator, policies: baseLegalPolicies, }, create_multimedia_message: { responder: multimediaMessageCreationResponder, inputValidator: sendMultimediaMessageRequestInputValidator, policies: baseLegalPolicies, }, create_or_update_public_link: { responder: createOrUpdatePublicLinkResponder, inputValidator: createOrUpdatePublicLinkInputValidator, policies: baseLegalPolicies, }, create_reaction_message: { responder: reactionMessageCreationResponder, inputValidator: sendReactionMessageRequestInputValidator, policies: baseLegalPolicies, }, disable_invite_link: { responder: disableInviteLinkResponder, inputValidator: disableInviteLinkInputValidator, policies: baseLegalPolicies, }, edit_message: { responder: editMessageCreationResponder, inputValidator: editMessageRequestInputValidator, policies: baseLegalPolicies, }, create_report: { responder: reportCreationResponder, inputValidator: reportCreationRequestInputValidator, policies: [], }, create_reports: { responder: reportMultiCreationResponder, inputValidator: reportMultiCreationRequestInputValidator, policies: [], }, create_text_message: { responder: textMessageCreationResponder, inputValidator: sendTextMessageRequestInputValidator, policies: baseLegalPolicies, }, create_thread: { responder: threadCreationResponder, inputValidator: newThreadRequestInputValidator, policies: baseLegalPolicies, }, delete_account: { responder: accountDeletionResponder, inputValidator: ignoredArgumentValidator, policies: [], }, delete_entry: { responder: entryDeletionResponder, inputValidator: deleteEntryRequestInputValidator, policies: baseLegalPolicies, }, delete_community_role: { responder: roleDeletionResponder, inputValidator: roleDeletionRequestInputValidator, policies: baseLegalPolicies, }, delete_thread: { responder: threadDeletionResponder, inputValidator: threadDeletionRequestInputValidator, policies: baseLegalPolicies, }, delete_upload: { responder: uploadDeletionResponder, inputValidator: UploadDeletionRequestInputValidator, policies: baseLegalPolicies, }, exact_search_user: { responder: exactUserSearchResponder, inputValidator: exactUserSearchRequestInputValidator, policies: [], }, fetch_entries: { responder: entryFetchResponder, inputValidator: entryQueryInputValidator, policies: baseLegalPolicies, }, fetch_entry_revisions: { responder: entryRevisionFetchResponder, inputValidator: entryRevisionHistoryFetchInputValidator, policies: baseLegalPolicies, }, fetch_error_report_infos: { responder: errorReportFetchInfosResponder, inputValidator: fetchErrorReportInfosRequestInputValidator, policies: baseLegalPolicies, }, fetch_messages: { responder: messageFetchResponder, inputValidator: fetchMessageInfosRequestInputValidator, policies: baseLegalPolicies, }, fetch_pending_updates: { responder: fetchPendingUpdatesResponder, inputValidator: sessionStateValidator, policies: baseLegalPolicies, }, fetch_pinned_messages: { responder: fetchPinnedMessagesResponder, inputValidator: fetchPinnedMessagesResponderInputValidator, policies: baseLegalPolicies, }, fetch_primary_invite_links: { responder: fetchPrimaryInviteLinksResponder, inputValidator: ignoredArgumentValidator, policies: baseLegalPolicies, }, fetch_thread_media: { responder: threadFetchMediaResponder, inputValidator: threadFetchMediaRequestInputValidator, policies: baseLegalPolicies, }, get_initial_redux_state: { responder: getInitialReduxStateResponder, inputValidator: initialReduxStateRequestValidator, policies: [], }, join_thread: { responder: threadJoinResponder, inputValidator: joinThreadRequestInputValidator, policies: baseLegalPolicies, }, keyserver_auth: { responder: keyserverAuthResponder, inputValidator: keyserverAuthRequestInputValidator, policies: [], }, leave_thread: { responder: threadLeaveResponder, inputValidator: leaveThreadRequestInputValidator, policies: baseLegalPolicies, }, log_in: { responder: logInResponder, inputValidator: logInRequestInputValidator, policies: [], }, log_out: { responder: logOutResponder, inputValidator: ignoredArgumentValidator, policies: [], }, modify_community_role: { responder: roleModificationResponder, inputValidator: roleModificationRequestInputValidator, policies: baseLegalPolicies, }, policy_acknowledgment: { responder: policyAcknowledgmentResponder, inputValidator: policyAcknowledgmentRequestInputValidator, policies: [], }, remove_members: { responder: memberRemovalResponder, inputValidator: removeMembersRequestInputValidator, policies: baseLegalPolicies, }, restore_entry: { responder: entryRestorationResponder, inputValidator: restoreEntryRequestInputValidator, policies: baseLegalPolicies, }, search_messages: { responder: searchMessagesResponder, inputValidator: searchMessagesResponderInputValidator, policies: baseLegalPolicies, }, search_users: { responder: userSearchResponder, inputValidator: userSearchRequestInputValidator, policies: baseLegalPolicies, }, send_password_reset_email: { responder: sendPasswordResetEmailResponder, inputValidator: resetPasswordRequestInputValidator, policies: [], }, send_verification_email: { responder: sendVerificationEmailResponder, inputValidator: ignoredArgumentValidator, policies: [], }, set_thread_unread_status: { responder: threadSetUnreadStatusResponder, inputValidator: setThreadUnreadStatusValidator, policies: baseLegalPolicies, }, toggle_message_pin: { responder: toggleMessagePinResponder, inputValidator: toggleMessagePinRequestInputValidator, policies: baseLegalPolicies, }, update_account: { responder: passwordUpdateResponder, inputValidator: accountUpdateInputValidator, policies: baseLegalPolicies, }, update_activity: { responder: updateActivityResponder, inputValidator: updateActivityResponderInputValidator, policies: baseLegalPolicies, }, update_calendar_query: { responder: calendarQueryUpdateResponder, inputValidator: calendarQueryValidator, policies: baseLegalPolicies, }, update_user_settings: { responder: updateUserSettingsResponder, inputValidator: updateUserSettingsInputValidator, policies: baseLegalPolicies, }, update_device_token: { responder: deviceTokenUpdateResponder, inputValidator: deviceTokenUpdateRequestInputValidator, policies: [], }, update_entry: { responder: entryUpdateResponder, inputValidator: saveEntryRequestInputValidator, policies: baseLegalPolicies, }, update_password: { responder: oldPasswordUpdateResponder, inputValidator: updatePasswordRequestInputValidator, policies: baseLegalPolicies, }, update_relationships: { responder: updateRelationshipsResponder, inputValidator: updateRelationshipInputValidator, policies: baseLegalPolicies, }, update_role: { responder: roleUpdateResponder, inputValidator: roleChangeRequestInputValidator, policies: baseLegalPolicies, }, update_thread: { responder: threadUpdateResponder, inputValidator: updateThreadRequestInputValidator, policies: baseLegalPolicies, }, update_user_subscription: { responder: userSubscriptionUpdateResponder, inputValidator: subscriptionUpdateRequestInputValidator, policies: baseLegalPolicies, }, verify_code: { responder: codeVerificationResponder, inputValidator: codeVerificationRequestInputValidator, policies: baseLegalPolicies, }, verify_invite_link: { responder: inviteLinkVerificationResponder, inputValidator: inviteLinkVerificationRequestInputValidator, policies: baseLegalPolicies, }, siwe_nonce: { responder: siweNonceResponder, inputValidator: ignoredArgumentValidator, policies: [], }, siwe_auth: { responder: siweAuthResponder, inputValidator: siweAuthRequestInputValidator, policies: [], }, claim_username: { responder: claimUsernameResponder, inputValidator: claimUsernameRequestInputValidator, policies: [], }, update_user_avatar: { responder: updateUserAvatarResponder, inputValidator: updateUserAvatarRequestValidator, policies: baseLegalPolicies, }, upload_media_metadata: { responder: uploadMediaMetadataResponder, inputValidator: uploadMediaMetadataInputValidator, policies: baseLegalPolicies, }, get_olm_session_initialization_data: { responder: getOlmSessionInitializationDataResponder, inputValidator: ignoredArgumentValidator, policies: [], }, version: { responder: versionResponder, inputValidator: ignoredArgumentValidator, policies: [], }, fetch_community_infos: { responder: fetchCommunityInfosResponder, inputValidator: ignoredArgumentValidator, policies: baseLegalPolicies, }, create_or_update_farcaster_channel_tag: { responder: createOrUpdateFarcasterChannelTagResponder, inputValidator: createOrUpdateFarcasterChannelTagInputValidator, policies: baseLegalPolicies, }, delete_farcaster_channel_tag: { responder: deleteFarcasterChannelTagResponder, inputValidator: deleteFarcasterChannelTagInputValidator, policies: baseLegalPolicies, }, }; function createJSONResponders(obj: { +[Endpoint]: EndpointData }): { +[Endpoint]: JSONResponder, } { const result: { [Endpoint]: JSONResponder } = {}; Object.keys(obj).forEach((endpoint: Endpoint) => { const responder = createJSONResponder( obj[endpoint].responder, obj[endpoint].inputValidator, endpointValidators[endpoint].validator, obj[endpoint].policies, + endpoint, ); result[endpoint] = responder; }); return result; } const jsonEndpoints: { +[Endpoint]: JSONResponder } = createJSONResponders(jsonEndpointsData); export { jsonEndpoints }; diff --git a/keyserver/src/responders/comm-landing-responders.js b/keyserver/src/responders/comm-landing-responders.js index cf4ad1fcf..5b72d7a52 100644 --- a/keyserver/src/responders/comm-landing-responders.js +++ b/keyserver/src/responders/comm-landing-responders.js @@ -1,34 +1,38 @@ // @flow import type { $Response, $Request } from 'express'; import { type EmailSubscriptionRequest } from 'lib/types/account-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { tShape, tEmail } from 'lib/utils/validation-utils.js'; import { sendEmailSubscriptionRequestToAshoat } from '../emails/subscribe-email-updates.js'; import { checkInputValidator } from '../utils/validation-utils.js'; const emailSubscriptionInputValidator = tShape({ email: tEmail, }); async function emailSubscriptionResponder( req: $Request, res: $Response, ): Promise { try { if (!req.body || typeof req.body !== 'object') { throw new ServerError('invalid_parameters'); } const input: any = req.body; - checkInputValidator(emailSubscriptionInputValidator, input); + checkInputValidator( + emailSubscriptionInputValidator, + input, + 'emailSubscriptionResponder', + ); const subscriptionRequest: EmailSubscriptionRequest = input; await sendEmailSubscriptionRequestToAshoat(subscriptionRequest); res.json({ success: true }); } catch { res.json({ success: false }); } } export { emailSubscriptionResponder }; diff --git a/keyserver/src/responders/handlers.js b/keyserver/src/responders/handlers.js index 72606fc28..6b8886f7d 100644 --- a/keyserver/src/responders/handlers.js +++ b/keyserver/src/responders/handlers.js @@ -1,241 +1,248 @@ // @flow import type { $Response, $Request } from 'express'; import type { TType } from 'tcomb'; +import type { Endpoint } from 'lib/types/endpoints.js'; import { ServerError } from 'lib/utils/errors.js'; import { assertWithValidator, tPlatformDetails, } from 'lib/utils/validation-utils.js'; import { getMessageForException } from './utils.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import type { PolicyType } from '../lib/facts/policies.js'; import { fetchViewerForJSONRequest, addCookieToJSONResponse, addCookieToHomeResponse, createNewAnonymousCookie, setCookiePlatformDetails, } from '../session/cookies.js'; import type { Viewer } from '../session/viewer.js'; import { getAppURLFactsFromRequestURL } from '../utils/urls.js'; import { policiesValidator, validateInput, validateOutput, } from '../utils/validation-utils.js'; type InnerJSONResponder = { responder: (viewer: Viewer, input: any) => Promise<*>, requiredPolicies: $ReadOnlyArray, }; export opaque type JSONResponder: InnerJSONResponder = InnerJSONResponder; function createJSONResponder( responder: (Viewer, input: I) => Promise, inputValidator: TType, outputValidator: TType, requiredPolicies: $ReadOnlyArray, + endpoint: Endpoint, ): JSONResponder { return { responder: async (viewer, input) => { - const request = await validateInput(viewer, inputValidator, input); + const request = await validateInput( + viewer, + inputValidator, + input, + endpoint, + ); const result = await responder(viewer, request); return await validateOutput( viewer.platformDetails, outputValidator, result, ); }, requiredPolicies, }; } export type DownloadResponder = ( viewer: Viewer, req: $Request, res: $Response, ) => Promise; export type HTTPGetResponder = DownloadResponder; export type HTMLResponder = (req: $Request, res: $Response) => Promise; function jsonHandler( responder: JSONResponder, expectCookieInvalidation: boolean, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { let viewer; try { if (!req.body || typeof req.body !== 'object') { throw new ServerError('invalid_parameters'); } const { input, platformDetails } = req.body; viewer = await fetchViewerForJSONRequest(req); const promises = [policiesValidator(viewer, responder.requiredPolicies)]; if (platformDetails) { if (!tPlatformDetails.is(platformDetails)) { throw new ServerError('invalid_platform_details'); } promises.push( setCookiePlatformDetails( viewer, assertWithValidator(platformDetails, tPlatformDetails), ), ); } await Promise.all(promises); const responderResult = await responder.responder(viewer, input); if (res.headersSent) { return; } const result = { ...responderResult }; addCookieToJSONResponse(viewer, res, result, expectCookieInvalidation); res.json({ success: true, ...result }); } catch (e) { await handleException(e, res, viewer, expectCookieInvalidation); } }; } function httpGetHandler( responder: HTTPGetResponder, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { let viewer; try { viewer = await fetchViewerForJSONRequest(req); await responder(viewer, req, res); } catch (e) { await handleException(e, res, viewer); } }; } function downloadHandler( responder: DownloadResponder, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { try { const viewer = await fetchViewerForJSONRequest(req); await responder(viewer, req, res); } catch (e) { // Passing viewer in only makes sense if we want to handle failures as // JSON. We don't, and presume all download handlers avoid ServerError. await handleException(e, res); } }; } async function handleException( error: Error, res: $Response, viewer?: ?Viewer, expectCookieInvalidation?: boolean, ) { console.warn(error); if (res.headersSent) { return; } if (!(error instanceof ServerError)) { res.status(500).send(getMessageForException(error)); return; } const result: Object = error.payload ? { error: error.message, payload: error.payload } : { error: error.message }; if (viewer) { if (error.message === 'client_version_unsupported' && viewer.loggedIn) { // If the client version is unsupported, log the user out const { platformDetails } = error; const [data] = await Promise.all([ createNewAnonymousCookie({ platformDetails, deviceToken: viewer.deviceToken, }), deleteCookie(viewer.cookieID), ]); viewer.setNewCookie(data); viewer.cookieInvalidated = true; } // This can mutate the result object addCookieToJSONResponse(viewer, res, result, !!expectCookieInvalidation); } res.json(result); } function htmlHandler( responder: HTMLResponder, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { try { addCookieToHomeResponse( req, res, getAppURLFactsFromRequestURL(req.originalUrl), ); res.type('html'); await responder(req, res); } catch (e) { console.warn(e); if (!res.headersSent) { res.status(500).send(getMessageForException(e)); } } }; } type MulterFile = { fieldname: string, originalname: string, encoding: string, mimetype: string, buffer: Buffer, size: number, }; export type MulterRequest = $Request & { files?: $ReadOnlyArray, ... }; type UploadResponder = (viewer: Viewer, req: MulterRequest) => Promise; function uploadHandler( responder: UploadResponder, ): (req: $Request, res: $Response) => Promise { return async (req: $Request, res: $Response) => { let viewer; try { if (!req.body || typeof req.body !== 'object') { throw new ServerError('invalid_parameters'); } viewer = await fetchViewerForJSONRequest(req); const responderResult = await responder( viewer, ((req: any): MulterRequest), ); if (res.headersSent) { return; } const result = { ...responderResult }; addCookieToJSONResponse(viewer, res, result, false); res.json({ success: true, ...result }); } catch (e) { await handleException(e, res, viewer); } }; } export { createJSONResponder, jsonHandler, httpGetHandler, downloadHandler, htmlHandler, uploadHandler, }; diff --git a/keyserver/src/socket/socket.js b/keyserver/src/socket/socket.js index cd592973e..9c8e27e0a 100644 --- a/keyserver/src/socket/socket.js +++ b/keyserver/src/socket/socket.js @@ -1,752 +1,754 @@ // @flow import type { $Request } from 'express'; import invariant from 'invariant'; import _debounce from 'lodash/debounce.js'; import t from 'tcomb'; import type { TUnion } from 'tcomb'; import WebSocket from 'ws'; import { baseLegalPolicies } from 'lib/facts/policies.js'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; import { isStaff } from 'lib/shared/staff-utils.js'; import { serverRequestSocketTimeout, serverResponseTimeout, } from 'lib/shared/timeouts.js'; import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import { endpointIsSocketSafe } from 'lib/types/endpoints.js'; import { redisMessageTypes, type RedisMessage } from 'lib/types/redis-types.js'; import { serverRequestTypes, clientResponseInputValidator, } from 'lib/types/request-types.js'; import { sessionCheckFrequency, stateCheckInactivityActivationInterval, sessionStateValidator, } from 'lib/types/session-types.js'; import { type ClientSocketMessage, type InitialClientSocketMessage, type ResponsesClientSocketMessage, type ServerServerSocketMessage, type ErrorServerSocketMessage, type AuthErrorServerSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, clientSocketMessageTypes, serverSocketMessageTypes, serverServerSocketMessageValidator, } from 'lib/types/socket-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; import { ignorePromiseRejections } from 'lib/utils/promises.js'; import SequentialPromiseResolver from 'lib/utils/sequential-promise-resolver.js'; import sleep from 'lib/utils/sleep.js'; import { tShape, tCookie } from 'lib/utils/validation-utils.js'; import { fetchDataForSocketInit } from './fetch-data.js'; import { RedisSubscriber } from './redis.js'; import { processClientResponses, checkState } from './session-utils.js'; import { fetchUpdateInfosWithRawUpdateInfos } from '../creators/update-creator.js'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters.js'; import { jsonEndpoints } from '../endpoints.js'; import { getMessageFetchResultFromRedisMessages } from '../fetchers/message-fetchers.js'; import { fetchViewerForSocket, updateCookie, isCookieMissingSignedIdentityKeysBlob, isCookieMissingOlmNotificationsSession, createNewAnonymousCookie, } from '../session/cookies.js'; import { Viewer } from '../session/viewer.js'; import type { AnonymousViewerData } from '../session/viewer.js'; import { commitSessionUpdate } from '../updaters/session-updaters.js'; import { compressMessage } from '../utils/compress.js'; import { assertSecureRequest } from '../utils/security-utils.js'; import { checkInputValidator, checkClientSupported, policiesValidator, validateOutput, validateInput, } from '../utils/validation-utils.js'; const clientSocketMessageInputValidator: TUnion = t.union([ tShape({ type: t.irreducible( 'clientSocketMessageTypes.INITIAL', x => x === clientSocketMessageTypes.INITIAL, ), id: t.Number, payload: tShape({ sessionIdentification: tShape({ cookie: t.maybe(tCookie), sessionID: t.maybe(t.String), }), sessionState: sessionStateValidator, clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.RESPONSES', x => x === clientSocketMessageTypes.RESPONSES, ), id: t.Number, payload: tShape({ clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.PING', x => x === clientSocketMessageTypes.PING, ), id: t.Number, }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.ACK_UPDATES', x => x === clientSocketMessageTypes.ACK_UPDATES, ), id: t.Number, payload: tShape({ currentAsOf: t.Number, }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.API_REQUEST', x => x === clientSocketMessageTypes.API_REQUEST, ), id: t.Number, payload: tShape({ endpoint: t.String, input: t.maybe(t.Object), }), }), ]); function onConnection(ws: WebSocket, req: $Request) { assertSecureRequest(req); new Socket(ws, req); } type StateCheckConditions = { activityRecentlyOccurred: boolean, stateCheckOngoing: boolean, }; const minVersionsForCompression = { native: 265, web: 30, }; class Socket { ws: WebSocket; httpRequest: $Request; viewer: ?Viewer; redis: ?RedisSubscriber; redisPromiseResolver: SequentialPromiseResolver; stateCheckConditions: StateCheckConditions = { activityRecentlyOccurred: true, stateCheckOngoing: false, }; stateCheckTimeoutID: ?TimeoutID; constructor(ws: WebSocket, httpRequest: $Request) { this.ws = ws; this.httpRequest = httpRequest; ws.on('message', this.onMessage); ws.on('close', this.onClose); this.resetTimeout(); this.redisPromiseResolver = new SequentialPromiseResolver(this.sendMessage); } onMessage = async ( messageString: string | Buffer | ArrayBuffer | Array, ): Promise => { invariant(typeof messageString === 'string', 'message should be string'); let responseTo = null; try { this.resetTimeout(); const messageObject = JSON.parse(messageString); const clientSocketMessageWithClientIDs = checkInputValidator( clientSocketMessageInputValidator, messageObject, + 'socket message', ); responseTo = clientSocketMessageWithClientIDs.id; if ( clientSocketMessageWithClientIDs.type === clientSocketMessageTypes.INITIAL ) { if (this.viewer) { // This indicates that the user sent multiple INITIAL messages. throw new ServerError('socket_already_initialized'); } this.viewer = await fetchViewerForSocket( this.httpRequest, clientSocketMessageWithClientIDs, ); } const { viewer } = this; if (!viewer) { // This indicates a non-INITIAL message was sent by the client before // the INITIAL message. throw new ServerError('socket_uninitialized'); } if (viewer.sessionChanged) { // This indicates that the cookie was invalid, and we've assigned a new // anonymous one. throw new ServerError('socket_deauthorized'); } if (!viewer.loggedIn) { // This indicates that the specified cookie was an anonymous one. throw new ServerError('not_logged_in'); } await checkClientSupported( viewer, clientSocketMessageInputValidator, clientSocketMessageWithClientIDs, ); await policiesValidator(viewer, baseLegalPolicies); const clientSocketMessage = await validateInput( viewer, clientSocketMessageInputValidator, clientSocketMessageWithClientIDs, + `socket message type ${clientSocketMessageWithClientIDs.type}`, ); const serverResponses = await this.handleClientSocketMessage(clientSocketMessage); if (!this.redis) { this.redis = new RedisSubscriber( { userID: viewer.userID, sessionID: viewer.session }, this.onRedisMessage, ); } if (viewer.sessionChanged) { // This indicates that something has caused the session to change, which // shouldn't happen from inside a WebSocket since we can't handle cookie // invalidation. throw new ServerError('session_mutated_from_socket'); } if (clientSocketMessage.type !== clientSocketMessageTypes.PING) { ignorePromiseRejections(updateCookie(viewer)); } for (const response of serverResponses) { // Normally it's an anti-pattern to await in sequence like this. But in // this case, we have a requirement that this array of serverResponses // is delivered in order. See here: // https://github.com/CommE2E/comm/blob/101eb34481deb49c609bfd2c785f375886e52666/keyserver/src/socket/socket.js#L566-L568 await this.sendMessage(response); } if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { this.onSuccessfulConnection(); } } catch (error) { console.warn(error); if (!(error instanceof ServerError)) { const errorMessage: ErrorServerSocketMessage = { type: serverSocketMessageTypes.ERROR, message: error.message, }; if (responseTo !== null) { errorMessage.responseTo = responseTo; } this.markActivityOccurred(); await this.sendMessage(errorMessage); return; } invariant(responseTo, 'should be set'); if (error.message === 'socket_deauthorized') { invariant(this.viewer, 'should be set'); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, sessionChange: { cookie: this.viewer.cookiePairString, currentUserInfo: { anonymous: true, }, }, }; await this.sendMessage(authErrorMessage); this.ws.close(4100, error.message); return; } else if (error.message === 'client_version_unsupported') { const { viewer } = this; invariant(viewer, 'should be set'); const anonymousViewerDataPromise: Promise = createNewAnonymousCookie({ platformDetails: error.platformDetails, deviceToken: viewer.deviceToken, }); const deleteCookiePromise = deleteCookie(viewer.cookieID); const [anonymousViewerData] = await Promise.all([ anonymousViewerDataPromise, deleteCookiePromise, ]); // It is normally not safe to pass the result of // createNewAnonymousCookie to the Viewer constructor. That is because // createNewAnonymousCookie leaves several fields of // AnonymousViewerData unset, and consequently Viewer will throw when // access is attempted. It is only safe here because we can guarantee // that only cookiePairString and cookieID are accessed on anonViewer // below. const anonViewer = new Viewer(anonymousViewerData); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, sessionChange: { cookie: anonViewer.cookiePairString, currentUserInfo: { anonymous: true, }, }, }; await this.sendMessage(authErrorMessage); this.ws.close(4101, error.message); return; } if (error.payload) { await this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, payload: error.payload, }); } else { await this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, }); } if (error.message === 'not_logged_in') { this.ws.close(4102, error.message); } else if (error.message === 'session_mutated_from_socket') { this.ws.close(4103, error.message); } else { this.markActivityOccurred(); } } }; onClose = async () => { this.clearStateCheckTimeout(); this.resetTimeout.cancel(); this.debouncedAfterActivity.cancel(); if (this.viewer && this.viewer.hasSessionInfo) { await deleteActivityForViewerSession(this.viewer); } if (this.redis) { this.redis.quit(); this.redis = null; } }; sendMessage = async (message: ServerServerSocketMessage) => { invariant( this.ws.readyState > 0, "shouldn't send message until connection established", ); if (this.ws.readyState !== 1) { return; } const { viewer } = this; const validatedMessage = await validateOutput( viewer?.platformDetails, serverServerSocketMessageValidator, message, ); const stringMessage = JSON.stringify(validatedMessage); if ( !viewer?.platformDetails || !hasMinCodeVersion(viewer.platformDetails, minVersionsForCompression) || !isStaff(viewer.id) ) { this.ws.send(stringMessage); return; } const compressionResult = await compressMessage(stringMessage); if (this.ws.readyState !== 1) { return; } if (!compressionResult.compressed) { this.ws.send(stringMessage); return; } const compressedMessage = { type: serverSocketMessageTypes.COMPRESSED_MESSAGE, payload: compressionResult.result, }; const validatedCompressedMessage = await validateOutput( viewer?.platformDetails, serverServerSocketMessageValidator, compressedMessage, ); const stringCompressedMessage = JSON.stringify(validatedCompressedMessage); this.ws.send(stringCompressedMessage); }; async handleClientSocketMessage( message: ClientSocketMessage, ): Promise { const resultPromise: Promise = (async () => { if (message.type === clientSocketMessageTypes.INITIAL) { this.markActivityOccurred(); return await this.handleInitialClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.markActivityOccurred(); return await this.handleResponsesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.PING) { return this.handlePingClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.markActivityOccurred(); return await this.handleAckUpdatesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.markActivityOccurred(); return await this.handleAPIRequestClientSocketMessage(message); } return []; })(); const timeoutPromise: Promise = (async () => { await sleep(serverResponseTimeout); throw new ServerError('socket_response_timeout'); })(); return await Promise.race([resultPromise, timeoutPromise]); } async handleInitialClientSocketMessage( message: InitialClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const responses: Array = []; const { sessionState, clientResponses } = message.payload; const payload = await fetchDataForSocketInit(viewer, sessionState); responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload, }); const [ { serverRequests, activityUpdateResult }, signedIdentityKeysBlobMissing, olmNotificationsSessionMissing, ] = await Promise.all([ processClientResponses(viewer, clientResponses), isCookieMissingSignedIdentityKeysBlob(viewer.cookieID), isCookieMissingOlmNotificationsSession(viewer), ]); if (signedIdentityKeysBlobMissing) { serverRequests.push({ type: serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB, }); } if (olmNotificationsSessionMissing) { serverRequests.push({ type: serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE, }); } if (serverRequests.length > 0 || clientResponses.length > 0) { // We send this message first since the STATE_SYNC triggers the client's // connection status to shift to "connected", and we want to make sure the // client responses are cleared from Redux before that happens responses.unshift({ type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }); } if (activityUpdateResult) { // Same reason for unshifting as above responses.unshift({ type: serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, responseTo: message.id, payload: activityUpdateResult, }); } return responses; } async handleResponsesClientSocketMessage( message: ResponsesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { clientResponses } = message.payload; const { stateCheckStatus } = await processClientResponses( viewer, clientResponses, ); const serverRequests = []; if (stateCheckStatus && stateCheckStatus.status !== 'state_check') { const { sessionUpdate, checkStateRequest } = await checkState( viewer, stateCheckStatus, ); if (sessionUpdate) { await commitSessionUpdate(viewer, sessionUpdate); this.setStateCheckConditions({ stateCheckOngoing: false }); } if (checkStateRequest) { serverRequests.push(checkStateRequest); } } // We send a response message regardless of whether we have any requests, // since we need to ack the client's responses return [ { type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }, ]; } handlePingClientSocketMessage( message: PingClientSocketMessage, ): ServerServerSocketMessage[] { return [ { type: serverSocketMessageTypes.PONG, responseTo: message.id, }, ]; } async handleAckUpdatesClientSocketMessage( message: AckUpdatesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { currentAsOf } = message.payload; await Promise.all([ deleteUpdatesBeforeTimeTargetingSession(viewer, currentAsOf), commitSessionUpdate(viewer, { lastUpdate: currentAsOf }), ]); return []; } async handleAPIRequestClientSocketMessage( message: APIRequestClientSocketMessage, ): Promise { if (!endpointIsSocketSafe(message.payload.endpoint)) { throw new ServerError('endpoint_unsafe_for_socket'); } const { viewer } = this; invariant(viewer, 'should be set'); const responder = jsonEndpoints[message.payload.endpoint]; await policiesValidator(viewer, responder.requiredPolicies); const response = await responder.responder(viewer, message.payload.input); return [ { type: serverSocketMessageTypes.API_RESPONSE, responseTo: message.id, payload: response, }, ]; } onRedisMessage = async (message: RedisMessage) => { try { await this.processRedisMessage(message); } catch (e) { console.warn(e); } }; async processRedisMessage(message: RedisMessage) { if (message.type === redisMessageTypes.START_SUBSCRIPTION) { this.ws.terminate(); } else if (message.type === redisMessageTypes.NEW_UPDATES) { const { viewer } = this; invariant(viewer, 'should be set'); if (message.ignoreSession && message.ignoreSession === viewer.session) { return; } const rawUpdateInfos = message.updates; this.redisPromiseResolver.add( (async () => { const { updateInfos, userInfos } = await fetchUpdateInfosWithRawUpdateInfos(rawUpdateInfos, { viewer, }); if (updateInfos.length === 0) { console.warn( 'could not get any UpdateInfos from redisMessageTypes.NEW_UPDATES', ); return null; } this.markActivityOccurred(); return { type: serverSocketMessageTypes.UPDATES, payload: { updatesResult: { currentAsOf: mostRecentUpdateTimestamp([...updateInfos], 0), newUpdates: updateInfos, }, userInfos: values(userInfos), }, }; })(), ); } else if (message.type === redisMessageTypes.NEW_MESSAGES) { const { viewer } = this; invariant(viewer, 'should be set'); const rawMessageInfos = message.messages; const messageFetchResult = getMessageFetchResultFromRedisMessages( viewer, rawMessageInfos, ); if (messageFetchResult.rawMessageInfos.length === 0) { console.warn( 'could not get any rawMessageInfos from ' + 'redisMessageTypes.NEW_MESSAGES', ); return; } this.redisPromiseResolver.add( (async () => { this.markActivityOccurred(); return { type: serverSocketMessageTypes.MESSAGES, payload: { messagesResult: { rawMessageInfos: messageFetchResult.rawMessageInfos, truncationStatuses: messageFetchResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( messageFetchResult.rawMessageInfos, 0, ), }, }, }; })(), ); } } onSuccessfulConnection() { if (this.ws.readyState !== 1) { return; } this.handleStateCheckConditionsUpdate(); } // The Socket will timeout by calling this.ws.terminate() // serverRequestSocketTimeout milliseconds after the last // time resetTimeout is called resetTimeout: { +cancel: () => void } & (() => void) = _debounce( () => this.ws.terminate(), serverRequestSocketTimeout, ); debouncedAfterActivity: { +cancel: () => void } & (() => void) = _debounce( () => this.setStateCheckConditions({ activityRecentlyOccurred: false }), stateCheckInactivityActivationInterval, ); markActivityOccurred = () => { if (this.ws.readyState !== 1) { return; } this.setStateCheckConditions({ activityRecentlyOccurred: true }); this.debouncedAfterActivity(); }; clearStateCheckTimeout() { const { stateCheckTimeoutID } = this; if (stateCheckTimeoutID) { clearTimeout(stateCheckTimeoutID); this.stateCheckTimeoutID = null; } } setStateCheckConditions(newConditions: Partial) { this.stateCheckConditions = { ...this.stateCheckConditions, ...newConditions, }; this.handleStateCheckConditionsUpdate(); } get stateCheckCanStart(): boolean { return Object.values(this.stateCheckConditions).every(cond => !cond); } handleStateCheckConditionsUpdate() { if (!this.stateCheckCanStart) { this.clearStateCheckTimeout(); return; } if (this.stateCheckTimeoutID) { return; } const { viewer } = this; if (!viewer) { return; } const timeUntilStateCheck = viewer.sessionLastValidated + sessionCheckFrequency - Date.now(); if (timeUntilStateCheck <= 0) { ignorePromiseRejections(this.initiateStateCheck()); } else { this.stateCheckTimeoutID = setTimeout( this.initiateStateCheck, timeUntilStateCheck, ); } } initiateStateCheck = async () => { this.setStateCheckConditions({ stateCheckOngoing: true }); const { viewer } = this; invariant(viewer, 'should be set'); const { checkStateRequest } = await checkState(viewer, { status: 'state_check', }); invariant(checkStateRequest, 'should be set'); await this.sendMessage({ type: serverSocketMessageTypes.REQUESTS, payload: { serverRequests: [checkStateRequest] }, }); }; } export { onConnection }; diff --git a/keyserver/src/utils/validation-utils.js b/keyserver/src/utils/validation-utils.js index 05aa49b9b..c5473e2b5 100644 --- a/keyserver/src/utils/validation-utils.js +++ b/keyserver/src/utils/validation-utils.js @@ -1,235 +1,245 @@ // @flow import type { TType } from 'tcomb'; import type { PolicyType } from 'lib/facts/policies.js'; import { hasMinCodeVersion, hasMinStateVersion, } from 'lib/shared/version-utils.js'; import { type PlatformDetails } from 'lib/types/device-types.js'; import { convertClientIDsToServerIDs, convertObject, convertServerIDsToClientIDs, } from 'lib/utils/conversion-utils.js'; import { ServerError } from 'lib/utils/errors.js'; import { tCookie, tPassword, tPlatform, tPlatformDetails, assertWithValidator, } from 'lib/utils/validation-utils.js'; import { fetchNotAcknowledgedPolicies } from '../fetchers/policy-acknowledgment-fetchers.js'; import { verifyClientSupported } from '../session/version.js'; import type { Viewer } from '../session/viewer.js'; import { thisKeyserverID } from '../user/identity.js'; async function validateInput( viewer: Viewer, inputValidator: TType, input: mixed, + source: string, ): Promise { if (!viewer.isSocket) { await checkClientSupported(viewer, inputValidator, input); } - const convertedInput = checkInputValidator(inputValidator, input); + const convertedInput = checkInputValidator(inputValidator, input, source); const keyserverID = await thisKeyserverID(); if ( hasMinStateVersion(viewer.platformDetails, { native: 43, web: 3, }) ) { try { return convertClientIDsToServerIDs( keyserverID, inputValidator, convertedInput, ); } catch (err) { throw new ServerError(err.message); } } return convertedInput; } async function validateOutput( platformDetails: ?PlatformDetails, outputValidator: TType, data: T, ): Promise { if (!outputValidator.is(data)) { console.trace( 'Output validation failed, validator is:', outputValidator.displayName, ); return data; } const keyserverID = await thisKeyserverID(); if ( hasMinStateVersion(platformDetails, { native: 43, web: 3, }) ) { return convertServerIDsToClientIDs(keyserverID, outputValidator, data); } return data; } -function checkInputValidator(inputValidator: TType, input: mixed): T { +function checkInputValidator( + inputValidator: TType, + input: mixed, + source: string, +): T { if (inputValidator.is(input)) { return assertWithValidator(input, inputValidator); } const error = new ServerError('invalid_parameters'); - error.sanitizedInput = input ? sanitizeInput(inputValidator, input) : null; + try { + error.sanitizedInput = input ? sanitizeInput(inputValidator, input) : null; + } catch { + error.sanitizedInput = null; + } + console.log(`failed input validation on ${source}`); throw error; } async function checkClientSupported( viewer: Viewer, inputValidator: ?TType, input: T, ) { let platformDetails; if (inputValidator) { platformDetails = findFirstInputMatchingValidator( inputValidator, tPlatformDetails, input, ); } if (!platformDetails && inputValidator) { const platform = findFirstInputMatchingValidator( inputValidator, tPlatform, input, ); if (platform) { platformDetails = { platform }; } } if (!platformDetails) { ({ platformDetails } = viewer); } await verifyClientSupported(viewer, platformDetails); } const redactedString = '********'; const redactedTypes = [tPassword, tCookie]; function sanitizeInput(inputValidator: TType, input: T): T { return convertObject( inputValidator, input, redactedTypes, () => redactedString, ); } function findFirstInputMatchingValidator( wholeInputValidator: *, inputValidatorToMatch: *, input: *, ): any { if (!wholeInputValidator || input === null || input === undefined) { return null; } if ( wholeInputValidator === inputValidatorToMatch && wholeInputValidator.is(input) ) { return input; } if (wholeInputValidator.meta.kind === 'maybe') { return findFirstInputMatchingValidator( wholeInputValidator.meta.type, inputValidatorToMatch, input, ); } if ( wholeInputValidator.meta.kind === 'interface' && typeof input === 'object' ) { for (const key in input) { const value = input[key]; const validator = wholeInputValidator.meta.props[key]; const innerResult = findFirstInputMatchingValidator( validator, inputValidatorToMatch, value, ); if (innerResult) { return innerResult; } } } if (wholeInputValidator.meta.kind === 'union') { for (const validator of wholeInputValidator.meta.types) { if (validator.is(input)) { return findFirstInputMatchingValidator( validator, inputValidatorToMatch, input, ); } } } if (wholeInputValidator.meta.kind === 'list' && Array.isArray(input)) { const validator = wholeInputValidator.meta.type; for (const value of input) { const innerResult = findFirstInputMatchingValidator( validator, inputValidatorToMatch, value, ); if (innerResult) { return innerResult; } } } return null; } async function policiesValidator( viewer: Viewer, policies: $ReadOnlyArray, ) { if (!policies.length || !viewer.loggedIn) { return; } if (!hasMinCodeVersion(viewer.platformDetails, { native: 181 })) { return; } const notAcknowledgedPolicies = await fetchNotAcknowledgedPolicies( viewer.id, policies, ); if (notAcknowledgedPolicies.length) { throw new ServerError('policies_not_accepted', { notAcknowledgedPolicies, }); } } export { validateInput, validateOutput, checkInputValidator, redactedString, sanitizeInput, findFirstInputMatchingValidator, checkClientSupported, policiesValidator, };