diff --git a/keyserver/src/responders/activity-responders.js b/keyserver/src/responders/activity-responders.js index 5f93bea93..fd5a014b6 100644 --- a/keyserver/src/responders/activity-responders.js +++ b/keyserver/src/responders/activity-responders.js @@ -1,54 +1,50 @@ // @flow import t, { type TInterface, type TList } from 'tcomb'; import { type UpdateActivityResult, type UpdateActivityRequest, type SetThreadUnreadStatusRequest, type SetThreadUnreadStatusResult, type ActivityUpdate, activityUpdateValidator, } from 'lib/types/activity-types.js'; import { tShape, tID } from 'lib/utils/validation-utils.js'; import type { Viewer } from '../session/viewer.js'; import { activityUpdater, setThreadUnreadStatus, } from '../updaters/activity-updaters.js'; const activityUpdatesInputValidator: TList> = t.list( activityUpdateValidator, ); export const updateActivityResponderInputValidator: TInterface = tShape({ updates: activityUpdatesInputValidator, }); async function updateActivityResponder( viewer: Viewer, request: UpdateActivityRequest, ): Promise { return await activityUpdater(viewer, request); } export const setThreadUnreadStatusValidator: TInterface = tShape({ threadID: tID, unread: t.Bool, latestMessage: t.maybe(tID), }); async function threadSetUnreadStatusResponder( viewer: Viewer, request: SetThreadUnreadStatusRequest, ): Promise { return await setThreadUnreadStatus(viewer, request); } -export { - activityUpdatesInputValidator, - updateActivityResponder, - threadSetUnreadStatusResponder, -}; +export { updateActivityResponder, threadSetUnreadStatusResponder }; diff --git a/keyserver/src/responders/report-responders.js b/keyserver/src/responders/report-responders.js index 2bb510cb0..911d7e358 100644 --- a/keyserver/src/responders/report-responders.js +++ b/keyserver/src/responders/report-responders.js @@ -1,218 +1,175 @@ // @flow import type { $Response, $Request } from 'express'; import t from 'tcomb'; -import type { TInterface, TStructProps, TUnion } from 'tcomb'; +import type { TInterface, TUnion } from 'tcomb'; -import { calendarQueryValidator } from 'lib/types/entry-types.js'; -import type { BaseAction } from 'lib/types/redux-types.js'; import { type ReportCreationResponse, type ReportCreationRequest, type FetchErrorReportInfosResponse, type FetchErrorReportInfosRequest, - type ThreadInconsistencyReportShape, - type EntryInconsistencyReportShape, - type ActionSummary, type ThreadInconsistencyReportCreationRequest, type EntryInconsistencyReportCreationRequest, type MediaMissionReportCreationRequest, type UserInconsistencyReportCreationRequest, reportTypes, + threadInconsistencyReportValidatorShape, + entryInconsistencyReportValidatorShape, + userInconsistencyReportValidatorShape, } from 'lib/types/report-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { tShape, tPlatformDetails } from 'lib/utils/validation-utils.js'; import createReport from '../creators/report-creator.js'; import { fetchErrorReportInfos, fetchReduxToolsImport, } from '../fetchers/report-fetchers.js'; import type { Viewer } from '../session/viewer.js'; -const tActionSummary = tShape({ - type: t.String, - time: t.Number, - summary: t.String, -}); -const tActionType = t.irreducible<$PropertyType>( - 'ActionType', - x => typeof x === 'string', -); -const threadInconsistencyReportValidatorShape: TStructProps = - { - platformDetails: tPlatformDetails, - beforeAction: t.Object, - action: t.Object, - pollResult: t.maybe(t.Object), - pushResult: t.Object, - lastActionTypes: t.maybe(t.list(tActionType)), - lastActions: t.maybe(t.list(tActionSummary)), - time: t.maybe(t.Number), - }; -const entryInconsistencyReportValidatorShape: TStructProps = - { - platformDetails: tPlatformDetails, - beforeAction: t.Object, - action: t.Object, - calendarQuery: calendarQueryValidator, - pollResult: t.maybe(t.Object), - pushResult: t.Object, - lastActionTypes: t.maybe(t.list(tActionType)), - lastActions: t.maybe(t.list(tActionSummary)), - time: t.Number, - }; -const userInconsistencyReportValidatorShape = { - platformDetails: tPlatformDetails, - action: t.Object, - beforeStateCheck: t.Object, - afterStateCheck: t.Object, - lastActions: t.list(tActionSummary), - time: t.Number, -}; - const threadInconsistencyReportCreationRequest = tShape({ ...threadInconsistencyReportValidatorShape, type: t.irreducible( 'reportTypes.THREAD_INCONSISTENCY', x => x === reportTypes.THREAD_INCONSISTENCY, ), }); const entryInconsistencyReportCreationRquest = tShape({ ...entryInconsistencyReportValidatorShape, type: t.irreducible( 'reportTypes.ENTRY_INCONSISTENCY', x => x === reportTypes.ENTRY_INCONSISTENCY, ), }); const mediaMissionReportCreationRequest = tShape({ type: t.irreducible( 'reportTypes.MEDIA_MISSION', x => x === reportTypes.MEDIA_MISSION, ), platformDetails: tPlatformDetails, time: t.Number, mediaMission: t.Object, uploadServerID: t.maybe(t.String), uploadLocalID: t.maybe(t.String), mediaLocalID: t.maybe(t.String), messageServerID: t.maybe(t.String), messageLocalID: t.maybe(t.String), }); const userInconsistencyReportCreationRequest = tShape({ ...userInconsistencyReportValidatorShape, type: t.irreducible( 'reportTypes.USER_INCONSISTENCY', x => x === reportTypes.USER_INCONSISTENCY, ), }); export const reportCreationRequestInputValidator: TUnion = t.union([ tShape({ type: t.irreducible( 'reportTypes.ERROR', x => x === reportTypes.ERROR, ), platformDetails: tPlatformDetails, errors: t.list( tShape({ errorMessage: t.String, stack: t.maybe(t.String), componentStack: t.maybe(t.String), }), ), preloadedState: t.Object, currentState: t.Object, actions: t.list(t.Object), }), threadInconsistencyReportCreationRequest, entryInconsistencyReportCreationRquest, mediaMissionReportCreationRequest, userInconsistencyReportCreationRequest, ]); async function reportCreationResponder( viewer: Viewer, request: ReportCreationRequest, ): Promise { if (request.type === null || request.type === undefined) { request.type = reportTypes.ERROR; } if (!request.platformDetails && request.deviceType) { const { deviceType, codeVersion, stateVersion, ...rest } = request; request = { ...rest, platformDetails: { platform: deviceType, codeVersion, stateVersion }, }; } const response = await createReport(viewer, request); if (!response) { throw new ServerError('ignored_report'); } return response; } export const reportMultiCreationRequestInputValidator: TInterface = tShape({ reports: t.list(reportCreationRequestInputValidator), }); type ReportMultiCreationRequest = { +reports: $ReadOnlyArray, }; async function reportMultiCreationResponder( viewer: Viewer, request: ReportMultiCreationRequest, ): Promise { await Promise.all( request.reports.map(reportCreationRequest => createReport(viewer, reportCreationRequest), ), ); } export const fetchErrorReportInfosRequestInputValidator: TInterface = tShape({ cursor: t.maybe(t.String), }); async function errorReportFetchInfosResponder( viewer: Viewer, request: FetchErrorReportInfosRequest, ): Promise { return await fetchErrorReportInfos(viewer, request); } async function errorReportDownloadResponder( viewer: Viewer, req: $Request, res: $Response, ): Promise { const id = req.params.reportID; if (!id) { throw new ServerError('invalid_parameters'); } const result = await fetchReduxToolsImport(viewer, id); res.set('Content-Disposition', `attachment; filename=report-${id}.json`); res.json({ preloadedState: JSON.stringify(result.preloadedState), payload: JSON.stringify(result.payload), }); } export { threadInconsistencyReportValidatorShape, entryInconsistencyReportValidatorShape, reportCreationResponder, reportMultiCreationResponder, errorReportFetchInfosResponder, errorReportDownloadResponder, }; diff --git a/keyserver/src/socket/session-utils.js b/keyserver/src/socket/session-utils.js index 7c99be787..cbffef0a5 100644 --- a/keyserver/src/socket/session-utils.js +++ b/keyserver/src/socket/session-utils.js @@ -1,535 +1,459 @@ // @flow import invariant from 'invariant'; import t from 'tcomb'; -import type { TUnion } from 'tcomb'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { UpdateActivityResult, ActivityUpdate, } from 'lib/types/activity-types.js'; import type { IdentityKeysBlob } from 'lib/types/crypto-types.js'; import { isDeviceType } from 'lib/types/device-types.js'; import type { CalendarQuery, DeltaEntryInfosResponse, } from 'lib/types/entry-types.js'; import { reportTypes, type ThreadInconsistencyReportCreationRequest, type EntryInconsistencyReportCreationRequest, } from 'lib/types/report-types.js'; import { serverRequestTypes, type ThreadInconsistencyClientResponse, type EntryInconsistencyClientResponse, type ClientResponse, type ServerServerRequest, type ServerCheckStateServerRequest, } from 'lib/types/request-types.js'; import { sessionCheckFrequency } from 'lib/types/session-types.js'; -import { signedIdentityKeysBlobValidator } from 'lib/utils/crypto-utils.js'; import { hash, values } from 'lib/utils/objects.js'; import { promiseAll, ignorePromiseRejections } from 'lib/utils/promises.js'; -import { - tShape, - tPlatform, - tPlatformDetails, -} from 'lib/utils/validation-utils.js'; import { createAndPersistOlmSession } from '../creators/olm-session-creator.js'; import createReport from '../creators/report-creator.js'; import { fetchEntriesForSession } from '../fetchers/entry-fetchers.js'; -import { activityUpdatesInputValidator } from '../responders/activity-responders.js'; -import { - threadInconsistencyReportValidatorShape, - entryInconsistencyReportValidatorShape, -} from '../responders/report-responders.js'; import { setNewSession, setCookiePlatform, setCookiePlatformDetails, setCookieSignedIdentityKeysBlob, } from '../session/cookies.js'; import type { Viewer } from '../session/viewer.js'; import { serverStateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { activityUpdater } from '../updaters/activity-updaters.js'; import { compareNewCalendarQuery } from '../updaters/entry-updaters.js'; import type { SessionUpdate } from '../updaters/session-updaters.js'; import { getOlmUtility } from '../utils/olm-utils.js'; -const clientResponseInputValidator: TUnion = t.union([ - tShape({ - type: t.irreducible( - 'serverRequestTypes.PLATFORM', - x => x === serverRequestTypes.PLATFORM, - ), - platform: tPlatform, - }), - tShape({ - ...threadInconsistencyReportValidatorShape, - type: t.irreducible( - 'serverRequestTypes.THREAD_INCONSISTENCY', - x => x === serverRequestTypes.THREAD_INCONSISTENCY, - ), - }), - tShape({ - ...entryInconsistencyReportValidatorShape, - type: t.irreducible( - 'serverRequestTypes.ENTRY_INCONSISTENCY', - x => x === serverRequestTypes.ENTRY_INCONSISTENCY, - ), - }), - tShape({ - type: t.irreducible( - 'serverRequestTypes.PLATFORM_DETAILS', - x => x === serverRequestTypes.PLATFORM_DETAILS, - ), - platformDetails: tPlatformDetails, - }), - tShape({ - type: t.irreducible( - 'serverRequestTypes.CHECK_STATE', - x => x === serverRequestTypes.CHECK_STATE, - ), - hashResults: t.dict(t.String, t.Boolean), - }), - tShape({ - type: t.irreducible( - 'serverRequestTypes.INITIAL_ACTIVITY_UPDATES', - x => x === serverRequestTypes.INITIAL_ACTIVITY_UPDATES, - ), - activityUpdates: activityUpdatesInputValidator, - }), - tShape({ - type: t.irreducible( - 'serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB', - x => x === serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB, - ), - signedIdentityKeysBlob: signedIdentityKeysBlobValidator, - }), - tShape({ - type: t.irreducible( - 'serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE', - x => x === serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE, - ), - initialNotificationsEncryptedMessage: t.String, - }), -]); - type StateCheckStatus = | { status: 'state_validated' } | { status: 'state_invalid', invalidKeys: $ReadOnlyArray } | { status: 'state_check' }; type ProcessClientResponsesResult = { serverRequests: ServerServerRequest[], stateCheckStatus: ?StateCheckStatus, activityUpdateResult: ?UpdateActivityResult, }; async function processClientResponses( viewer: Viewer, clientResponses: $ReadOnlyArray, ): Promise { let viewerMissingPlatform = !viewer.platform; const { platformDetails } = viewer; let viewerMissingPlatformDetails = !platformDetails || (isDeviceType(viewer.platform) && (platformDetails.codeVersion === null || platformDetails.codeVersion === undefined || platformDetails.stateVersion === null || platformDetails.stateVersion === undefined)); const promises = []; let activityUpdates: Array = []; let stateCheckStatus = null; const clientSentPlatformDetails = clientResponses.some( response => response.type === serverRequestTypes.PLATFORM_DETAILS, ); for (const clientResponse of clientResponses) { if ( clientResponse.type === serverRequestTypes.PLATFORM && !clientSentPlatformDetails ) { promises.push(setCookiePlatform(viewer, clientResponse.platform)); viewerMissingPlatform = false; if (!isDeviceType(clientResponse.platform)) { viewerMissingPlatformDetails = false; } } else if ( clientResponse.type === serverRequestTypes.THREAD_INCONSISTENCY ) { promises.push(recordThreadInconsistency(viewer, clientResponse)); } else if (clientResponse.type === serverRequestTypes.ENTRY_INCONSISTENCY) { promises.push(recordEntryInconsistency(viewer, clientResponse)); } else if (clientResponse.type === serverRequestTypes.PLATFORM_DETAILS) { promises.push( setCookiePlatformDetails(viewer, clientResponse.platformDetails), ); viewerMissingPlatform = false; viewerMissingPlatformDetails = false; } else if ( clientResponse.type === serverRequestTypes.INITIAL_ACTIVITY_UPDATES ) { activityUpdates = [...activityUpdates, ...clientResponse.activityUpdates]; } else if (clientResponse.type === serverRequestTypes.CHECK_STATE) { const invalidKeys = []; for (const key in clientResponse.hashResults) { const result = clientResponse.hashResults[key]; if (!result) { invalidKeys.push(key); } } stateCheckStatus = invalidKeys.length > 0 ? { status: 'state_invalid', invalidKeys } : { status: 'state_validated' }; } else if ( clientResponse.type === serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB ) { invariant( clientResponse.signedIdentityKeysBlob, 'signedIdentityKeysBlob expected in client response', ); const { signedIdentityKeysBlob } = clientResponse; const identityKeys: IdentityKeysBlob = JSON.parse( signedIdentityKeysBlob.payload, ); const olmUtil = getOlmUtility(); try { olmUtil.ed25519_verify( identityKeys.primaryIdentityPublicKeys.ed25519, signedIdentityKeysBlob.payload, signedIdentityKeysBlob.signature, ); ignorePromiseRejections( setCookieSignedIdentityKeysBlob( viewer.cookieID, signedIdentityKeysBlob, ), ); } catch (e) { continue; } } else if ( clientResponse.type === serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE ) { invariant( t.String.is(clientResponse.initialNotificationsEncryptedMessage), 'initialNotificationsEncryptedMessage expected in client response', ); const { initialNotificationsEncryptedMessage } = clientResponse; try { await createAndPersistOlmSession( initialNotificationsEncryptedMessage, 'notifications', viewer.cookieID, ); } catch (e) { continue; } } } const activityUpdatePromise: Promise = (async () => { if (activityUpdates.length === 0) { return undefined; } return await activityUpdater(viewer, { updates: activityUpdates }); })(); const serverRequests: Array = []; const { activityUpdateResult } = await promiseAll({ all: Promise.all(promises), activityUpdateResult: activityUpdatePromise, }); if ( !stateCheckStatus && viewer.loggedIn && viewer.sessionLastValidated + sessionCheckFrequency < Date.now() ) { stateCheckStatus = { status: 'state_check' }; } if (viewerMissingPlatform) { serverRequests.push({ type: serverRequestTypes.PLATFORM }); } if (viewerMissingPlatformDetails) { serverRequests.push({ type: serverRequestTypes.PLATFORM_DETAILS }); } return { serverRequests, stateCheckStatus, activityUpdateResult }; } async function recordThreadInconsistency( viewer: Viewer, response: ThreadInconsistencyClientResponse, ): Promise { const { type, ...rest } = response; const reportCreationRequest = ({ ...rest, type: reportTypes.THREAD_INCONSISTENCY, }: ThreadInconsistencyReportCreationRequest); await createReport(viewer, reportCreationRequest); } async function recordEntryInconsistency( viewer: Viewer, response: EntryInconsistencyClientResponse, ): Promise { const { type, ...rest } = response; const reportCreationRequest = ({ ...rest, type: reportTypes.ENTRY_INCONSISTENCY, }: EntryInconsistencyReportCreationRequest); await createReport(viewer, reportCreationRequest); } type SessionInitializationResult = | { sessionContinued: false } | { sessionContinued: true, deltaEntryInfoResult: DeltaEntryInfosResponse, sessionUpdate: SessionUpdate, }; async function initializeSession( viewer: Viewer, calendarQuery: CalendarQuery, oldLastUpdate: number, ): Promise { if (!viewer.loggedIn) { return { sessionContinued: false }; } if (!viewer.hasSessionInfo) { // If the viewer has no session info but is logged in, that is indicative // of an expired / invalidated session and we should generate a new one await setNewSession(viewer, calendarQuery, oldLastUpdate); return { sessionContinued: false }; } if (oldLastUpdate < viewer.sessionLastUpdated) { // If the client has an older last_update than the server is tracking for // that client, then the client either had some issue persisting its store, // or the user restored the client app from a backup. Either way, we should // invalidate the existing session, since the server has assumed that the // checkpoint is further along than it is on the client, and might not still // have all of the updates necessary to do an incremental update await setNewSession(viewer, calendarQuery, oldLastUpdate); return { sessionContinued: false }; } let comparisonResult = null; try { comparisonResult = compareNewCalendarQuery(viewer, calendarQuery); } catch (e) { if (e.message !== 'unknown_error') { throw e; } } if (comparisonResult) { const { difference, oldCalendarQuery } = comparisonResult; const sessionUpdate = { ...comparisonResult.sessionUpdate, lastUpdate: oldLastUpdate, }; const deltaEntryInfoResult = await fetchEntriesForSession( viewer, difference, oldCalendarQuery, ); return { sessionContinued: true, deltaEntryInfoResult, sessionUpdate }; } else { await setNewSession(viewer, calendarQuery, oldLastUpdate); return { sessionContinued: false }; } } type StateCheckResult = { sessionUpdate?: SessionUpdate, checkStateRequest?: ServerCheckStateServerRequest, }; async function checkState( viewer: Viewer, status: StateCheckStatus, ): Promise { if (status.status === 'state_validated') { return { sessionUpdate: { lastValidated: Date.now() } }; } else if (status.status === 'state_check') { const promises = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [ spec.hashKey, (async () => { if ( !hasMinCodeVersion(viewer.platformDetails, { native: 267, web: 32, }) ) { const data = await spec.fetch(viewer); return hash(data); } const infosHash = await spec.fetchServerInfosHash(viewer); return infosHash; })(), ]), ); const hashesToCheck = await promiseAll(promises); const checkStateRequest = { type: serverRequestTypes.CHECK_STATE, hashesToCheck, }; return { checkStateRequest }; } const invalidKeys = new Set(status.invalidKeys); const shouldFetchAll = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [ spec.hashKey, invalidKeys.has(spec.hashKey), ]), ); const idsToFetch = Object.fromEntries( values(serverStateSyncSpecs) .filter(spec => spec.innerHashSpec?.hashKey) .map(spec => [spec.innerHashSpec?.hashKey, new Set()]), ); for (const key of invalidKeys) { const [innerHashKey, id] = key.split('|'); if (innerHashKey && id) { idsToFetch[innerHashKey]?.add(id); } } const fetchPromises: { [string]: Promise } = {}; for (const spec of values(serverStateSyncSpecs)) { if (shouldFetchAll[spec.hashKey]) { fetchPromises[spec.hashKey] = spec.fetch(viewer); } else if (idsToFetch[spec.innerHashSpec?.hashKey]?.size > 0) { fetchPromises[spec.hashKey] = spec.fetch( viewer, idsToFetch[spec.innerHashSpec?.hashKey], ); } } const fetchedData = await promiseAll(fetchPromises); const specPerHashKey = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [spec.hashKey, spec]), ); const specPerInnerHashKey = Object.fromEntries( values(serverStateSyncSpecs) .filter(spec => spec.innerHashSpec?.hashKey) .map(spec => [spec.innerHashSpec?.hashKey, spec]), ); const hashesToCheck: { [string]: number } = {}, failUnmentioned: { [string]: boolean } = {}, stateChanges: { [string]: mixed } = {}; for (const key of invalidKeys) { const spec = specPerHashKey[key]; const innerHashKey = spec?.innerHashSpec?.hashKey; const isTopLevelKey = !!spec; if (isTopLevelKey && innerHashKey) { // Instead of returning all the infos, we want to narrow down and figure // out which infos don't match first const infos = fetchedData[key]; // We have a type error here because in fact the relationship between // Infos and Info is not guaranteed to be like this. In particular, // currentUserStateSyncSpec does not match this pattern. But this code // doesn't fire for it because no innerHashSpec is defined const iterableInfos: { +[string]: mixed } = (infos: any); for (const infoID in iterableInfos) { let hashValue; if ( hasMinCodeVersion(viewer.platformDetails, { native: 267, web: 32, }) ) { // We have a type error here because Flow has no way to determine that // spec and infos are matched up hashValue = await spec.getServerInfoHash( (iterableInfos[infoID]: any), ); } else { hashValue = hash(iterableInfos[infoID]); } hashesToCheck[`${innerHashKey}|${infoID}`] = hashValue; } failUnmentioned[key] = true; } else if (isTopLevelKey) { stateChanges[key] = fetchedData[key]; } else { const [keyPrefix, id] = key.split('|'); const innerSpec = specPerInnerHashKey[keyPrefix]; const innerHashSpec = innerSpec?.innerHashSpec; if (!innerHashSpec || !id) { continue; } const infos = fetchedData[innerSpec.hashKey]; // We have a type error here because in fact the relationship between // Infos and Info is not guaranteed to be like this. In particular, // currentUserStateSyncSpec does not match this pattern. But this code // doesn't fire for it because no innerHashSpec is defined const iterableInfos: { +[string]: mixed } = (infos: any); const info = iterableInfos[id]; // We have a type error here because Flow wants us to type iterableInfos // in this file, but we don't have access to the parameterization of // innerHashSpec here if (!info || innerHashSpec.additionalDeleteCondition?.((info: any))) { if (!stateChanges[innerHashSpec.deleteKey]) { stateChanges[innerHashSpec.deleteKey] = [id]; } else { // We have a type error here because in fact stateChanges values // aren't always arrays. In particular, currentUserStateSyncSpec does // not match this pattern. But this code doesn't fire for it because // no innerHashSpec is defined const curDeleteKeyChanges: Array = (stateChanges[ innerHashSpec.deleteKey ]: any); curDeleteKeyChanges.push(id); } continue; } if (!stateChanges[innerHashSpec.rawInfosKey]) { stateChanges[innerHashSpec.rawInfosKey] = [info]; } else { // We have a type error here because in fact stateChanges values aren't // always arrays. In particular, currentUserStateSyncSpec does not match // this pattern. But this code doesn't fire for it because no // innerHashSpec is defined const curRawInfosKeyChanges: Array = (stateChanges[ innerHashSpec.rawInfosKey ]: any); curRawInfosKeyChanges.push(info); } } } // We have a type error here because the keys that get set on some of these // collections aren't statically typed when they're set. Rather, they are set // as arbitrary strings const checkStateRequest: ServerCheckStateServerRequest = ({ type: serverRequestTypes.CHECK_STATE, hashesToCheck, failUnmentioned, stateChanges, }: any); if (Object.keys(hashesToCheck).length === 0) { return { checkStateRequest, sessionUpdate: { lastValidated: Date.now() } }; } else { return { checkStateRequest }; } } -export { - clientResponseInputValidator, - processClientResponses, - initializeSession, - checkState, -}; +export { processClientResponses, initializeSession, checkState }; diff --git a/keyserver/src/socket/socket.js b/keyserver/src/socket/socket.js index 5f663e84d..66535156a 100644 --- a/keyserver/src/socket/socket.js +++ b/keyserver/src/socket/socket.js @@ -1,891 +1,893 @@ // @flow import type { $Request } from 'express'; import invariant from 'invariant'; import _debounce from 'lodash/debounce.js'; import t from 'tcomb'; import type { TUnion } from 'tcomb'; import WebSocket from 'ws'; import { baseLegalPolicies } from 'lib/facts/policies.js'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; import { isStaff } from 'lib/shared/staff-utils.js'; import { serverRequestSocketTimeout, serverResponseTimeout, } from 'lib/shared/timeouts.js'; import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import { endpointIsSocketSafe } from 'lib/types/endpoints.js'; import { type RawEntryInfo, calendarQueryValidator, } from 'lib/types/entry-types.js'; import { defaultNumberPerThread } from 'lib/types/message-types.js'; import { redisMessageTypes, type RedisMessage } from 'lib/types/redis-types.js'; -import { serverRequestTypes } from 'lib/types/request-types.js'; +import { + serverRequestTypes, + clientResponseInputValidator, +} from 'lib/types/request-types.js'; import { sessionCheckFrequency, stateCheckInactivityActivationInterval, } from 'lib/types/session-types.js'; import { type ClientSocketMessage, type InitialClientSocketMessage, type ResponsesClientSocketMessage, type ServerStateSyncFullSocketPayload, type ServerServerSocketMessage, type ErrorServerSocketMessage, type AuthErrorServerSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, clientSocketMessageTypes, stateSyncPayloadTypes, serverSocketMessageTypes, serverServerSocketMessageValidator, } from 'lib/types/socket-types.js'; import type { LegacyRawThreadInfos } from 'lib/types/thread-types.js'; import type { UserInfo, CurrentUserInfo } from 'lib/types/user-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; import { promiseAll, ignorePromiseRejections } from 'lib/utils/promises.js'; import SequentialPromiseResolver from 'lib/utils/sequential-promise-resolver.js'; import sleep from 'lib/utils/sleep.js'; import { tShape, tCookie } from 'lib/utils/validation-utils.js'; import { RedisSubscriber } from './redis.js'; import { - clientResponseInputValidator, processClientResponses, initializeSession, checkState, } from './session-utils.js'; import { fetchUpdateInfosWithRawUpdateInfos } from '../creators/update-creator.js'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters.js'; import { jsonEndpoints } from '../endpoints.js'; import { fetchMessageInfosSince, getMessageFetchResultFromRedisMessages, } from '../fetchers/message-fetchers.js'; import { fetchUpdateInfos } from '../fetchers/update-fetchers.js'; import { verifyCalendarQueryThreadIDs } from '../responders/entry-responders.js'; import { fetchViewerForSocket, updateCookie, isCookieMissingSignedIdentityKeysBlob, isCookieMissingOlmNotificationsSession, createNewAnonymousCookie, } from '../session/cookies.js'; import { Viewer } from '../session/viewer.js'; import type { AnonymousViewerData } from '../session/viewer.js'; import { serverStateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { commitSessionUpdate } from '../updaters/session-updaters.js'; import { compressMessage } from '../utils/compress.js'; import { assertSecureRequest } from '../utils/security-utils.js'; import { checkInputValidator, checkClientSupported, policiesValidator, validateOutput, validateInput, } from '../utils/validation-utils.js'; const clientSocketMessageInputValidator: TUnion = t.union([ tShape({ type: t.irreducible( 'clientSocketMessageTypes.INITIAL', x => x === clientSocketMessageTypes.INITIAL, ), id: t.Number, payload: tShape({ sessionIdentification: tShape({ cookie: t.maybe(tCookie), sessionID: t.maybe(t.String), }), sessionState: tShape({ calendarQuery: calendarQueryValidator, messagesCurrentAsOf: t.Number, updatesCurrentAsOf: t.Number, watchedIDs: t.list(t.String), }), clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.RESPONSES', x => x === clientSocketMessageTypes.RESPONSES, ), id: t.Number, payload: tShape({ clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.PING', x => x === clientSocketMessageTypes.PING, ), id: t.Number, }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.ACK_UPDATES', x => x === clientSocketMessageTypes.ACK_UPDATES, ), id: t.Number, payload: tShape({ currentAsOf: t.Number, }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.API_REQUEST', x => x === clientSocketMessageTypes.API_REQUEST, ), id: t.Number, payload: tShape({ endpoint: t.String, input: t.maybe(t.Object), }), }), ]); function onConnection(ws: WebSocket, req: $Request) { assertSecureRequest(req); new Socket(ws, req); } type StateCheckConditions = { activityRecentlyOccurred: boolean, stateCheckOngoing: boolean, }; const minVersionsForCompression = { native: 265, web: 30, }; class Socket { ws: WebSocket; httpRequest: $Request; viewer: ?Viewer; redis: ?RedisSubscriber; redisPromiseResolver: SequentialPromiseResolver; stateCheckConditions: StateCheckConditions = { activityRecentlyOccurred: true, stateCheckOngoing: false, }; stateCheckTimeoutID: ?TimeoutID; constructor(ws: WebSocket, httpRequest: $Request) { this.ws = ws; this.httpRequest = httpRequest; ws.on('message', this.onMessage); ws.on('close', this.onClose); this.resetTimeout(); this.redisPromiseResolver = new SequentialPromiseResolver(this.sendMessage); } onMessage = async ( messageString: string | Buffer | ArrayBuffer | Array, ): Promise => { invariant(typeof messageString === 'string', 'message should be string'); let responseTo = null; try { this.resetTimeout(); const messageObject = JSON.parse(messageString); const clientSocketMessageWithClientIDs = checkInputValidator( clientSocketMessageInputValidator, messageObject, ); responseTo = clientSocketMessageWithClientIDs.id; if ( clientSocketMessageWithClientIDs.type === clientSocketMessageTypes.INITIAL ) { if (this.viewer) { // This indicates that the user sent multiple INITIAL messages. throw new ServerError('socket_already_initialized'); } this.viewer = await fetchViewerForSocket( this.httpRequest, clientSocketMessageWithClientIDs, ); } const { viewer } = this; if (!viewer) { // This indicates a non-INITIAL message was sent by the client before // the INITIAL message. throw new ServerError('socket_uninitialized'); } if (viewer.sessionChanged) { // This indicates that the cookie was invalid, and we've assigned a new // anonymous one. throw new ServerError('socket_deauthorized'); } if (!viewer.loggedIn) { // This indicates that the specified cookie was an anonymous one. throw new ServerError('not_logged_in'); } await checkClientSupported( viewer, clientSocketMessageInputValidator, clientSocketMessageWithClientIDs, ); await policiesValidator(viewer, baseLegalPolicies); const clientSocketMessage = await validateInput( viewer, clientSocketMessageInputValidator, clientSocketMessageWithClientIDs, ); const serverResponses = await this.handleClientSocketMessage(clientSocketMessage); if (!this.redis) { this.redis = new RedisSubscriber( { userID: viewer.userID, sessionID: viewer.session }, this.onRedisMessage, ); } if (viewer.sessionChanged) { // This indicates that something has caused the session to change, which // shouldn't happen from inside a WebSocket since we can't handle cookie // invalidation. throw new ServerError('session_mutated_from_socket'); } if (clientSocketMessage.type !== clientSocketMessageTypes.PING) { ignorePromiseRejections(updateCookie(viewer)); } for (const response of serverResponses) { // Normally it's an anti-pattern to await in sequence like this. But in // this case, we have a requirement that this array of serverResponses // is delivered in order. See here: // https://github.com/CommE2E/comm/blob/101eb34481deb49c609bfd2c785f375886e52666/keyserver/src/socket/socket.js#L566-L568 await this.sendMessage(response); } if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { this.onSuccessfulConnection(); } } catch (error) { console.warn(error); if (!(error instanceof ServerError)) { const errorMessage: ErrorServerSocketMessage = { type: serverSocketMessageTypes.ERROR, message: error.message, }; if (responseTo !== null) { errorMessage.responseTo = responseTo; } this.markActivityOccurred(); await this.sendMessage(errorMessage); return; } invariant(responseTo, 'should be set'); if (error.message === 'socket_deauthorized') { invariant(this.viewer, 'should be set'); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, sessionChange: { cookie: this.viewer.cookiePairString, currentUserInfo: { anonymous: true, }, }, }; await this.sendMessage(authErrorMessage); this.ws.close(4100, error.message); return; } else if (error.message === 'client_version_unsupported') { const { viewer } = this; invariant(viewer, 'should be set'); const anonymousViewerDataPromise: Promise = createNewAnonymousCookie({ platformDetails: error.platformDetails, deviceToken: viewer.deviceToken, }); const deleteCookiePromise = deleteCookie(viewer.cookieID); const [anonymousViewerData] = await Promise.all([ anonymousViewerDataPromise, deleteCookiePromise, ]); // It is normally not safe to pass the result of // createNewAnonymousCookie to the Viewer constructor. That is because // createNewAnonymousCookie leaves several fields of // AnonymousViewerData unset, and consequently Viewer will throw when // access is attempted. It is only safe here because we can guarantee // that only cookiePairString and cookieID are accessed on anonViewer // below. const anonViewer = new Viewer(anonymousViewerData); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, sessionChange: { cookie: anonViewer.cookiePairString, currentUserInfo: { anonymous: true, }, }, }; await this.sendMessage(authErrorMessage); this.ws.close(4101, error.message); return; } if (error.payload) { await this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, payload: error.payload, }); } else { await this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, }); } if (error.message === 'not_logged_in') { this.ws.close(4102, error.message); } else if (error.message === 'session_mutated_from_socket') { this.ws.close(4103, error.message); } else { this.markActivityOccurred(); } } }; onClose = async () => { this.clearStateCheckTimeout(); this.resetTimeout.cancel(); this.debouncedAfterActivity.cancel(); if (this.viewer && this.viewer.hasSessionInfo) { await deleteActivityForViewerSession(this.viewer); } if (this.redis) { this.redis.quit(); this.redis = null; } }; sendMessage = async (message: ServerServerSocketMessage) => { invariant( this.ws.readyState > 0, "shouldn't send message until connection established", ); if (this.ws.readyState !== 1) { return; } const { viewer } = this; const validatedMessage = await validateOutput( viewer?.platformDetails, serverServerSocketMessageValidator, message, ); const stringMessage = JSON.stringify(validatedMessage); if ( !viewer?.platformDetails || !hasMinCodeVersion(viewer.platformDetails, minVersionsForCompression) || !isStaff(viewer.id) ) { this.ws.send(stringMessage); return; } const compressionResult = await compressMessage(stringMessage); if (this.ws.readyState !== 1) { return; } if (!compressionResult.compressed) { this.ws.send(stringMessage); return; } const compressedMessage = { type: serverSocketMessageTypes.COMPRESSED_MESSAGE, payload: compressionResult.result, }; const validatedCompressedMessage = await validateOutput( viewer?.platformDetails, serverServerSocketMessageValidator, compressedMessage, ); const stringCompressedMessage = JSON.stringify(validatedCompressedMessage); this.ws.send(stringCompressedMessage); }; async handleClientSocketMessage( message: ClientSocketMessage, ): Promise { const resultPromise: Promise = (async () => { if (message.type === clientSocketMessageTypes.INITIAL) { this.markActivityOccurred(); return await this.handleInitialClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.markActivityOccurred(); return await this.handleResponsesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.PING) { return this.handlePingClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.markActivityOccurred(); return await this.handleAckUpdatesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.markActivityOccurred(); return await this.handleAPIRequestClientSocketMessage(message); } return []; })(); const timeoutPromise: Promise = (async () => { await sleep(serverResponseTimeout); throw new ServerError('socket_response_timeout'); })(); return await Promise.race([resultPromise, timeoutPromise]); } async handleInitialClientSocketMessage( message: InitialClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const responses: Array = []; const { sessionState, clientResponses } = message.payload; const { calendarQuery, updatesCurrentAsOf: oldUpdatesCurrentAsOf, messagesCurrentAsOf: oldMessagesCurrentAsOf, watchedIDs, } = sessionState; await verifyCalendarQueryThreadIDs(calendarQuery); const sessionInitializationResult = await initializeSession( viewer, calendarQuery, oldUpdatesCurrentAsOf, ); const threadCursors: { [string]: null } = {}; for (const watchedThreadID of watchedIDs) { threadCursors[watchedThreadID] = null; } const messageSelectionCriteria = { threadCursors, joinedThreads: true, newerThan: oldMessagesCurrentAsOf, }; const [fetchMessagesResult, { serverRequests, activityUpdateResult }] = await Promise.all([ fetchMessageInfosSince( viewer, messageSelectionCriteria, defaultNumberPerThread, ), processClientResponses(viewer, clientResponses), ]); const messagesResult = { rawMessageInfos: fetchMessagesResult.rawMessageInfos, truncationStatuses: fetchMessagesResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( fetchMessagesResult.rawMessageInfos, oldMessagesCurrentAsOf, ), }; const isCookieMissingSignedIdentityKeysBlobPromise = isCookieMissingSignedIdentityKeysBlob(viewer.cookieID); const isCookieMissingOlmNotificationsSessionPromise = isCookieMissingOlmNotificationsSession(viewer); if (!sessionInitializationResult.sessionContinued) { const promises: { +[string]: Promise } = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [ spec.hashKey, spec.fetchFullSocketSyncPayload(viewer, [calendarQuery]), ]), ); // We have a type error here because Flow doesn't know spec.hashKey const castPromises: { +threadInfos: Promise, +currentUserInfo: Promise, +entryInfos: Promise<$ReadOnlyArray>, +userInfos: Promise<$ReadOnlyArray>, } = (promises: any); const results = await promiseAll(castPromises); const payload: ServerStateSyncFullSocketPayload = { type: stateSyncPayloadTypes.FULL, messagesResult, threadInfos: results.threadInfos, currentUserInfo: results.currentUserInfo, rawEntryInfos: results.entryInfos, userInfos: results.userInfos, updatesCurrentAsOf: oldUpdatesCurrentAsOf, }; if (viewer.sessionChanged) { // If initializeSession encounters, // sessionIdentifierTypes.BODY_SESSION_ID but the session // is unspecified or expired, // it will set a new sessionID and specify viewer.sessionChanged const { sessionID } = viewer; invariant( sessionID !== null && sessionID !== undefined, 'should be set', ); payload.sessionID = sessionID; viewer.sessionChanged = false; } responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload, }); } else { const { sessionUpdate, deltaEntryInfoResult } = sessionInitializationResult; const deleteExpiredUpdatesPromise = deleteUpdatesBeforeTimeTargetingSession(viewer, oldUpdatesCurrentAsOf); const fetchUpdateResultPromise = fetchUpdateInfos( viewer, oldUpdatesCurrentAsOf, calendarQuery, ); const sessionUpdatePromise = commitSessionUpdate(viewer, sessionUpdate); const [fetchUpdateResult] = await Promise.all([ fetchUpdateResultPromise, deleteExpiredUpdatesPromise, sessionUpdatePromise, ]); const { updateInfos, userInfos } = fetchUpdateResult; const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( [...updateInfos], oldUpdatesCurrentAsOf, ); const updatesResult = { newUpdates: updateInfos, currentAsOf: newUpdatesCurrentAsOf, }; responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload: { type: stateSyncPayloadTypes.INCREMENTAL, messagesResult, updatesResult, deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, userInfos: values(userInfos), }, }); } const [signedIdentityKeysBlobMissing, olmNotificationsSessionMissing] = await Promise.all([ isCookieMissingSignedIdentityKeysBlobPromise, isCookieMissingOlmNotificationsSessionPromise, ]); if (signedIdentityKeysBlobMissing) { serverRequests.push({ type: serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB, }); } if (olmNotificationsSessionMissing) { serverRequests.push({ type: serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE, }); } if (serverRequests.length > 0 || clientResponses.length > 0) { // We send this message first since the STATE_SYNC triggers the client's // connection status to shift to "connected", and we want to make sure the // client responses are cleared from Redux before that happens responses.unshift({ type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }); } if (activityUpdateResult) { // Same reason for unshifting as above responses.unshift({ type: serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, responseTo: message.id, payload: activityUpdateResult, }); } return responses; } async handleResponsesClientSocketMessage( message: ResponsesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { clientResponses } = message.payload; const { stateCheckStatus } = await processClientResponses( viewer, clientResponses, ); const serverRequests = []; if (stateCheckStatus && stateCheckStatus.status !== 'state_check') { const { sessionUpdate, checkStateRequest } = await checkState( viewer, stateCheckStatus, ); if (sessionUpdate) { await commitSessionUpdate(viewer, sessionUpdate); this.setStateCheckConditions({ stateCheckOngoing: false }); } if (checkStateRequest) { serverRequests.push(checkStateRequest); } } // We send a response message regardless of whether we have any requests, // since we need to ack the client's responses return [ { type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }, ]; } handlePingClientSocketMessage( message: PingClientSocketMessage, ): ServerServerSocketMessage[] { return [ { type: serverSocketMessageTypes.PONG, responseTo: message.id, }, ]; } async handleAckUpdatesClientSocketMessage( message: AckUpdatesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { currentAsOf } = message.payload; await Promise.all([ deleteUpdatesBeforeTimeTargetingSession(viewer, currentAsOf), commitSessionUpdate(viewer, { lastUpdate: currentAsOf }), ]); return []; } async handleAPIRequestClientSocketMessage( message: APIRequestClientSocketMessage, ): Promise { if (!endpointIsSocketSafe(message.payload.endpoint)) { throw new ServerError('endpoint_unsafe_for_socket'); } const { viewer } = this; invariant(viewer, 'should be set'); const responder = jsonEndpoints[message.payload.endpoint]; await policiesValidator(viewer, responder.requiredPolicies); const response = await responder.responder(viewer, message.payload.input); return [ { type: serverSocketMessageTypes.API_RESPONSE, responseTo: message.id, payload: response, }, ]; } onRedisMessage = async (message: RedisMessage) => { try { await this.processRedisMessage(message); } catch (e) { console.warn(e); } }; async processRedisMessage(message: RedisMessage) { if (message.type === redisMessageTypes.START_SUBSCRIPTION) { this.ws.terminate(); } else if (message.type === redisMessageTypes.NEW_UPDATES) { const { viewer } = this; invariant(viewer, 'should be set'); if (message.ignoreSession && message.ignoreSession === viewer.session) { return; } const rawUpdateInfos = message.updates; this.redisPromiseResolver.add( (async () => { const { updateInfos, userInfos } = await fetchUpdateInfosWithRawUpdateInfos(rawUpdateInfos, { viewer, }); if (updateInfos.length === 0) { console.warn( 'could not get any UpdateInfos from redisMessageTypes.NEW_UPDATES', ); return null; } this.markActivityOccurred(); return { type: serverSocketMessageTypes.UPDATES, payload: { updatesResult: { currentAsOf: mostRecentUpdateTimestamp([...updateInfos], 0), newUpdates: updateInfos, }, userInfos: values(userInfos), }, }; })(), ); } else if (message.type === redisMessageTypes.NEW_MESSAGES) { const { viewer } = this; invariant(viewer, 'should be set'); const rawMessageInfos = message.messages; const messageFetchResult = getMessageFetchResultFromRedisMessages( viewer, rawMessageInfos, ); if (messageFetchResult.rawMessageInfos.length === 0) { console.warn( 'could not get any rawMessageInfos from ' + 'redisMessageTypes.NEW_MESSAGES', ); return; } this.redisPromiseResolver.add( (async () => { this.markActivityOccurred(); return { type: serverSocketMessageTypes.MESSAGES, payload: { messagesResult: { rawMessageInfos: messageFetchResult.rawMessageInfos, truncationStatuses: messageFetchResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( messageFetchResult.rawMessageInfos, 0, ), }, }, }; })(), ); } } onSuccessfulConnection() { if (this.ws.readyState !== 1) { return; } this.handleStateCheckConditionsUpdate(); } // The Socket will timeout by calling this.ws.terminate() // serverRequestSocketTimeout milliseconds after the last // time resetTimeout is called resetTimeout: { +cancel: () => void } & (() => void) = _debounce( () => this.ws.terminate(), serverRequestSocketTimeout, ); debouncedAfterActivity: { +cancel: () => void } & (() => void) = _debounce( () => this.setStateCheckConditions({ activityRecentlyOccurred: false }), stateCheckInactivityActivationInterval, ); markActivityOccurred = () => { if (this.ws.readyState !== 1) { return; } this.setStateCheckConditions({ activityRecentlyOccurred: true }); this.debouncedAfterActivity(); }; clearStateCheckTimeout() { const { stateCheckTimeoutID } = this; if (stateCheckTimeoutID) { clearTimeout(stateCheckTimeoutID); this.stateCheckTimeoutID = null; } } setStateCheckConditions(newConditions: Partial) { this.stateCheckConditions = { ...this.stateCheckConditions, ...newConditions, }; this.handleStateCheckConditionsUpdate(); } get stateCheckCanStart(): boolean { return Object.values(this.stateCheckConditions).every(cond => !cond); } handleStateCheckConditionsUpdate() { if (!this.stateCheckCanStart) { this.clearStateCheckTimeout(); return; } if (this.stateCheckTimeoutID) { return; } const { viewer } = this; if (!viewer) { return; } const timeUntilStateCheck = viewer.sessionLastValidated + sessionCheckFrequency - Date.now(); if (timeUntilStateCheck <= 0) { ignorePromiseRejections(this.initiateStateCheck()); } else { this.stateCheckTimeoutID = setTimeout( this.initiateStateCheck, timeUntilStateCheck, ); } } initiateStateCheck = async () => { this.setStateCheckConditions({ stateCheckOngoing: true }); const { viewer } = this; invariant(viewer, 'should be set'); const { checkStateRequest } = await checkState(viewer, { status: 'state_check', }); invariant(checkStateRequest, 'should be set'); await this.sendMessage({ type: serverSocketMessageTypes.REQUESTS, payload: { serverRequests: [checkStateRequest] }, }); }; } export { onConnection }; diff --git a/lib/types/report-types.js b/lib/types/report-types.js index a0173fef2..7ab670791 100644 --- a/lib/types/report-types.js +++ b/lib/types/report-types.js @@ -1,242 +1,289 @@ // @flow import invariant from 'invariant'; -import t, { type TInterface } from 'tcomb'; +import t, { type TInterface, type TStructProps } from 'tcomb'; import { type PlatformDetails } from './device-types.js'; -import { type RawEntryInfo, type CalendarQuery } from './entry-types.js'; +import { + type RawEntryInfo, + type CalendarQuery, + calendarQueryValidator, +} from './entry-types.js'; import { type MediaMission } from './media-types.js'; import type { AppState, BaseAction } from './redux-types.js'; import { type MixedRawThreadInfos } from './thread-types.js'; import type { UserInfo, UserInfos } from './user-types.js'; import { tPlatformDetails, tShape, tUserID, } from '../utils/validation-utils.js'; export type EnabledReports = { +crashReports: boolean, +inconsistencyReports: boolean, +mediaReports: boolean, }; export type SupportedReports = $Keys; export const defaultEnabledReports: EnabledReports = { crashReports: false, inconsistencyReports: false, mediaReports: false, }; export const defaultDevEnabledReports: EnabledReports = { crashReports: true, inconsistencyReports: true, mediaReports: true, }; export type ReportStore = { +enabledReports: EnabledReports, +queuedReports: $ReadOnlyArray, }; export const reportTypes = Object.freeze({ ERROR: 0, THREAD_INCONSISTENCY: 1, ENTRY_INCONSISTENCY: 2, MEDIA_MISSION: 3, USER_INCONSISTENCY: 4, }); type ReportType = $Values; export function assertReportType(reportType: number): ReportType { invariant( reportType === 0 || reportType === 1 || reportType === 2 || reportType === 3 || reportType === 4, 'number is not ReportType enum', ); return reportType; } export type ErrorInfo = { componentStack: string, ... }; export type ErrorData = { error: Error, info?: ErrorInfo }; export type FlatErrorData = { errorMessage: string, stack?: string, componentStack?: ?string, }; export type ActionSummary = { +type: $PropertyType, +time: number, +summary: string, }; export type ThreadInconsistencyReportShape = { +platformDetails: PlatformDetails, +beforeAction: MixedRawThreadInfos, +action: BaseAction, +pollResult?: ?MixedRawThreadInfos, +pushResult: MixedRawThreadInfos, +lastActionTypes?: ?$ReadOnlyArray<$PropertyType>, +lastActions?: ?$ReadOnlyArray, +time?: ?number, }; export type EntryInconsistencyReportShape = { +platformDetails: PlatformDetails, +beforeAction: { +[id: string]: RawEntryInfo }, +action: BaseAction, +calendarQuery: CalendarQuery, +pollResult?: ?{ +[id: string]: RawEntryInfo }, +pushResult: { +[id: string]: RawEntryInfo }, +lastActionTypes?: ?$ReadOnlyArray<$PropertyType>, +lastActions?: ?$ReadOnlyArray, +time: number, }; export type UserInconsistencyReportShape = { +platformDetails: PlatformDetails, +action: BaseAction, +beforeStateCheck: UserInfos, +afterStateCheck: UserInfos, +lastActions: $ReadOnlyArray, +time: number, }; export type ErrorReportCreationRequest = { +type: 0, +platformDetails: PlatformDetails, +errors: $ReadOnlyArray, +preloadedState: AppState, +currentState: AppState, +actions: $ReadOnlyArray, }; export type ThreadInconsistencyReportCreationRequest = { ...ThreadInconsistencyReportShape, +type: 1, }; export type EntryInconsistencyReportCreationRequest = { ...EntryInconsistencyReportShape, +type: 2, }; export type MediaMissionReportCreationRequest = { +type: 3, +platformDetails: PlatformDetails, +time: number, // ms +mediaMission: MediaMission, +uploadServerID?: ?string, +uploadLocalID?: ?string, +mediaLocalID?: ?string, // deprecated +messageServerID?: ?string, +messageLocalID?: ?string, }; export type UserInconsistencyReportCreationRequest = { ...UserInconsistencyReportShape, +type: 4, }; export type ReportCreationRequest = | ErrorReportCreationRequest | ThreadInconsistencyReportCreationRequest | EntryInconsistencyReportCreationRequest | MediaMissionReportCreationRequest | UserInconsistencyReportCreationRequest; export type ClientThreadInconsistencyReportShape = { +platformDetails: PlatformDetails, +beforeAction: MixedRawThreadInfos, +action: BaseAction, +pushResult: MixedRawThreadInfos, +lastActions: $ReadOnlyArray, +time: number, }; export type ClientEntryInconsistencyReportShape = { +platformDetails: PlatformDetails, +beforeAction: { +[id: string]: RawEntryInfo }, +action: BaseAction, +calendarQuery: CalendarQuery, +pushResult: { +[id: string]: RawEntryInfo }, +lastActions: $ReadOnlyArray, +time: number, }; export type ClientErrorReportCreationRequest = { ...ErrorReportCreationRequest, +id: string, }; export type ClientThreadInconsistencyReportCreationRequest = { ...ClientThreadInconsistencyReportShape, +type: 1, +id: string, }; export type ClientEntryInconsistencyReportCreationRequest = { ...ClientEntryInconsistencyReportShape, +type: 2, +id: string, }; export type ClientMediaMissionReportCreationRequest = { ...MediaMissionReportCreationRequest, +id: string, }; export type ClientUserInconsistencyReportCreationRequest = { ...UserInconsistencyReportCreationRequest, +id: string, }; export type ClientReportCreationRequest = | ClientErrorReportCreationRequest | ClientThreadInconsistencyReportCreationRequest | ClientEntryInconsistencyReportCreationRequest | ClientMediaMissionReportCreationRequest | ClientUserInconsistencyReportCreationRequest; export type QueueReportsPayload = { +reports: $ReadOnlyArray, }; export type ClearDeliveredReportsPayload = { +reports: $ReadOnlyArray, }; export type ReportCreationResponse = { +id: string, }; // Reports Service specific types export type ReportsServiceSendReportsRequest = | ClientReportCreationRequest | $ReadOnlyArray; export type ReportsServiceSendReportsResponse = { +reportIDs: $ReadOnlyArray, }; export type ReportsServiceSendReportsAction = ( request: ReportsServiceSendReportsRequest, ) => Promise; // Keyserver specific types type ReportInfo = { +id: string, +viewerID: string, +platformDetails: PlatformDetails, +creationTime: number, }; export const reportInfoValidator: TInterface = tShape({ id: t.String, viewerID: tUserID, platformDetails: tPlatformDetails, creationTime: t.Number, }); export type FetchErrorReportInfosRequest = { +cursor: ?string, }; export type FetchErrorReportInfosResponse = { +reports: $ReadOnlyArray, +userInfos: $ReadOnlyArray, }; export type ReduxToolsImport = { +preloadedState: AppState, +payload: $ReadOnlyArray, }; + +const tActionSummary = tShape({ + type: t.String, + time: t.Number, + summary: t.String, +}); +const tActionType = t.irreducible<$PropertyType>( + 'ActionType', + x => typeof x === 'string', +); + +export const threadInconsistencyReportValidatorShape: TStructProps = + { + platformDetails: tPlatformDetails, + beforeAction: t.Object, + action: t.Object, + pollResult: t.maybe(t.Object), + pushResult: t.Object, + lastActionTypes: t.maybe(t.list(tActionType)), + lastActions: t.maybe(t.list(tActionSummary)), + time: t.maybe(t.Number), + }; +export const entryInconsistencyReportValidatorShape: TStructProps = + { + platformDetails: tPlatformDetails, + beforeAction: t.Object, + action: t.Object, + calendarQuery: calendarQueryValidator, + pollResult: t.maybe(t.Object), + pushResult: t.Object, + lastActionTypes: t.maybe(t.list(tActionType)), + lastActions: t.maybe(t.list(tActionSummary)), + time: t.Number, + }; +export const userInconsistencyReportValidatorShape: TStructProps = + { + platformDetails: tPlatformDetails, + action: t.Object, + beforeStateCheck: t.Object, + afterStateCheck: t.Object, + lastActions: t.list(tActionSummary), + time: t.Number, + }; diff --git a/lib/types/request-types.js b/lib/types/request-types.js index a5c8acd73..571abd97f 100644 --- a/lib/types/request-types.js +++ b/lib/types/request-types.js @@ -1,298 +1,361 @@ // @flow import invariant from 'invariant'; import t, { type TUnion, type TInterface } from 'tcomb'; -import { type ActivityUpdate } from './activity-types.js'; +import { + type ActivityUpdate, + activityUpdateValidator, +} from './activity-types.js'; import type { SignedIdentityKeysBlob } from './crypto-types.js'; import { signedIdentityKeysBlobValidator } from './crypto-types.js'; import type { MessageSourceMetadata } from './db-ops-types.js'; import type { Platform, PlatformDetails } from './device-types.js'; import { type RawEntryInfo, type CalendarQuery, rawEntryInfoValidator, } from './entry-types.js'; import type { RawThreadInfo } from './minimally-encoded-thread-permissions-types'; -import type { - ThreadInconsistencyReportShape, - EntryInconsistencyReportShape, - ClientThreadInconsistencyReportShape, - ClientEntryInconsistencyReportShape, +import { + type ThreadInconsistencyReportShape, + type EntryInconsistencyReportShape, + type ClientThreadInconsistencyReportShape, + type ClientEntryInconsistencyReportShape, + threadInconsistencyReportValidatorShape, + entryInconsistencyReportValidatorShape, } from './report-types.js'; import type { LegacyRawThreadInfo } from './thread-types.js'; import { type CurrentUserInfo, currentUserInfoValidator, type AccountUserInfo, accountUserInfoValidator, } from './user-types.js'; import { mixedRawThreadInfoValidator } from '../permissions/minimally-encoded-raw-thread-info-validators.js'; -import { tNumber, tShape, tID, tUserID } from '../utils/validation-utils.js'; +import { + tNumber, + tShape, + tID, + tUserID, + tPlatform, + tPlatformDetails, +} from '../utils/validation-utils.js'; // "Server requests" are requests for information that the server delivers to // clients. Clients then respond to those requests with a "client response". export const serverRequestTypes = Object.freeze({ PLATFORM: 0, //DEVICE_TOKEN: 1, (DEPRECATED) THREAD_INCONSISTENCY: 2, PLATFORM_DETAILS: 3, //INITIAL_ACTIVITY_UPDATE: 4, (DEPRECATED) ENTRY_INCONSISTENCY: 5, CHECK_STATE: 6, INITIAL_ACTIVITY_UPDATES: 7, // MORE_ONE_TIME_KEYS: 8, (DEPRECATED) SIGNED_IDENTITY_KEYS_BLOB: 9, INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE: 10, }); type ServerRequestType = $Values; export function assertServerRequestType( serverRequestType: number, ): ServerRequestType { invariant( serverRequestType === 0 || serverRequestType === 2 || serverRequestType === 3 || serverRequestType === 5 || serverRequestType === 6 || serverRequestType === 7 || serverRequestType === 9 || serverRequestType === 10, 'number is not ServerRequestType enum', ); return serverRequestType; } type PlatformServerRequest = { +type: 0, }; const platformServerRequestValidator = tShape({ type: tNumber(serverRequestTypes.PLATFORM), }); type PlatformClientResponse = { +type: 0, +platform: Platform, }; +const platformClientResponseValidator: TInterface = + tShape({ + type: tNumber(serverRequestTypes.PLATFORM), + platform: tPlatform, + }); export type ThreadInconsistencyClientResponse = { ...ThreadInconsistencyReportShape, +type: 2, }; +const threadInconsistencyClientResponseValidator: TInterface = + tShape({ + ...threadInconsistencyReportValidatorShape, + type: tNumber(serverRequestTypes.THREAD_INCONSISTENCY), + }); type PlatformDetailsServerRequest = { type: 3, }; const platformDetailsServerRequestValidator = tShape({ type: tNumber(serverRequestTypes.PLATFORM_DETAILS), }); type PlatformDetailsClientResponse = { type: 3, platformDetails: PlatformDetails, }; +const platformDetailsClientResponseValidator: TInterface = + tShape({ + type: tNumber(serverRequestTypes.PLATFORM_DETAILS), + platformDetails: tPlatformDetails, + }); export type EntryInconsistencyClientResponse = { type: 5, ...EntryInconsistencyReportShape, }; +const entryInconsistencyClientResponseValidator: TInterface = + tShape({ + ...entryInconsistencyReportValidatorShape, + type: tNumber(serverRequestTypes.ENTRY_INCONSISTENCY), + }); type FailUnmentioned = Partial<{ +threadInfos: boolean, +entryInfos: boolean, +userInfos: boolean, }>; type StateChanges = Partial<{ +rawThreadInfos: LegacyRawThreadInfo[] | RawThreadInfo[], +rawEntryInfos: RawEntryInfo[], +currentUserInfo: CurrentUserInfo, +userInfos: AccountUserInfo[], +deleteThreadIDs: string[], +deleteEntryIDs: string[], +deleteUserInfoIDs: string[], }>; export type ServerCheckStateServerRequest = { +type: 6, +hashesToCheck: { +[key: string]: number }, +failUnmentioned?: FailUnmentioned, +stateChanges?: StateChanges, }; const serverCheckStateServerRequestValidator = tShape({ type: tNumber(serverRequestTypes.CHECK_STATE), hashesToCheck: t.dict(t.String, t.Number), failUnmentioned: t.maybe( tShape({ threadInfos: t.maybe(t.Boolean), entryInfos: t.maybe(t.Boolean), userInfos: t.maybe(t.Boolean), }), ), stateChanges: t.maybe( tShape({ rawThreadInfos: t.maybe(t.list(mixedRawThreadInfoValidator)), rawEntryInfos: t.maybe(t.list(rawEntryInfoValidator)), currentUserInfo: t.maybe(currentUserInfoValidator), userInfos: t.maybe(t.list(accountUserInfoValidator)), deleteThreadIDs: t.maybe(t.list(tID)), deleteEntryIDs: t.maybe(t.list(tID)), deleteUserInfoIDs: t.maybe(t.list(tUserID)), }), ), }); type CheckStateClientResponse = { +type: 6, +hashResults: { +[key: string]: boolean }, }; +const checkStateClientResponseValidator: TInterface = + tShape({ + type: tNumber(serverRequestTypes.CHECK_STATE), + hashResults: t.dict(t.String, t.Boolean), + }); type InitialActivityUpdatesClientResponse = { +type: 7, +activityUpdates: $ReadOnlyArray, }; +const initialActivityUpdatesClientResponseValidator: TInterface = + tShape({ + type: tNumber(serverRequestTypes.INITIAL_ACTIVITY_UPDATES), + activityUpdates: t.list(activityUpdateValidator), + }); type MoreOneTimeKeysClientResponse = { +type: 8, +keys: $ReadOnlyArray, }; type SignedIdentityKeysBlobServerRequest = { +type: 9, }; const signedIdentityKeysBlobServerRequestValidator = tShape({ type: tNumber(serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB), }); type SignedIdentityKeysBlobClientResponse = { +type: 9, +signedIdentityKeysBlob: SignedIdentityKeysBlob, }; +const signedIdentityKeysBlobClientResponseValidator: TInterface = + tShape({ + type: tNumber(serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB), + signedIdentityKeysBlob: signedIdentityKeysBlobValidator, + }); type InitialNotificationsEncryptedMessageServerRequest = { +type: 10, }; const initialNotificationsEncryptedMessageServerRequestValidator = tShape({ type: tNumber(serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE), }); type InitialNotificationsEncryptedMessageClientResponse = { +type: 10, +initialNotificationsEncryptedMessage: string, }; +const initialNotificationsEncryptedMessageClientResponseValidator: TInterface = + tShape({ + type: tNumber(serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE), + initialNotificationsEncryptedMessage: t.String, + }); export type ServerServerRequest = | PlatformServerRequest | PlatformDetailsServerRequest | ServerCheckStateServerRequest | SignedIdentityKeysBlobServerRequest | InitialNotificationsEncryptedMessageServerRequest; export const serverServerRequestValidator: TUnion = t.union([ platformServerRequestValidator, platformDetailsServerRequestValidator, serverCheckStateServerRequestValidator, signedIdentityKeysBlobServerRequestValidator, initialNotificationsEncryptedMessageServerRequestValidator, ]); export type ClientResponse = | PlatformClientResponse | ThreadInconsistencyClientResponse | PlatformDetailsClientResponse | EntryInconsistencyClientResponse | CheckStateClientResponse | InitialActivityUpdatesClientResponse | MoreOneTimeKeysClientResponse | SignedIdentityKeysBlobClientResponse | InitialNotificationsEncryptedMessageClientResponse; export type ClientCheckStateServerRequest = { +type: 6, +hashesToCheck: { +[key: string]: number }, +failUnmentioned?: Partial<{ +threadInfos: boolean, +entryInfos: boolean, +userInfos: boolean, }>, +stateChanges?: Partial<{ +rawThreadInfos: RawThreadInfo[], +rawEntryInfos: RawEntryInfo[], +currentUserInfo: CurrentUserInfo, +userInfos: AccountUserInfo[], +deleteThreadIDs: string[], +deleteEntryIDs: string[], +deleteUserInfoIDs: string[], }>, }; export type ClientServerRequest = | PlatformServerRequest | PlatformDetailsServerRequest | ClientCheckStateServerRequest | SignedIdentityKeysBlobServerRequest | InitialNotificationsEncryptedMessageServerRequest; // This is just the client variant of ClientResponse. The server needs to handle // multiple client versions so the type supports old versions of certain client // responses, but the client variant only need to support the latest version. type ClientThreadInconsistencyClientResponse = { ...ClientThreadInconsistencyReportShape, +type: 2, }; type ClientEntryInconsistencyClientResponse = { +type: 5, ...ClientEntryInconsistencyReportShape, }; export type ClientClientResponse = | PlatformClientResponse | ClientThreadInconsistencyClientResponse | PlatformDetailsClientResponse | ClientEntryInconsistencyClientResponse | CheckStateClientResponse | InitialActivityUpdatesClientResponse | MoreOneTimeKeysClientResponse | SignedIdentityKeysBlobClientResponse | InitialNotificationsEncryptedMessageClientResponse; export type ClientInconsistencyResponse = | ClientThreadInconsistencyClientResponse | ClientEntryInconsistencyClientResponse; export const processServerRequestsActionType = 'PROCESS_SERVER_REQUESTS'; export type ProcessServerRequestsPayload = { +serverRequests: $ReadOnlyArray, +calendarQuery: CalendarQuery, +keyserverID: string, }; export type ProcessServerRequestAction = { +messageSourceMetadata?: MessageSourceMetadata, +type: 'PROCESS_SERVER_REQUESTS', +payload: ProcessServerRequestsPayload, }; export type OlmSessionInitializationInfo = { +prekey: string, +prekeySignature: string, +oneTimeKey: ?string, }; export const olmSessionInitializationInfoValidator: TInterface = tShape({ prekey: t.String, prekeySignature: t.String, oneTimeKey: t.maybe(t.String), }); export type GetOlmSessionInitializationDataResponse = { +signedIdentityKeysBlob: SignedIdentityKeysBlob, +contentInitializationInfo: OlmSessionInitializationInfo, +notifInitializationInfo: OlmSessionInitializationInfo, }; export const getOlmSessionInitializationDataResponseValidator: TInterface = tShape({ signedIdentityKeysBlob: signedIdentityKeysBlobValidator, contentInitializationInfo: olmSessionInitializationInfoValidator, notifInitializationInfo: olmSessionInitializationInfoValidator, }); + +export const clientResponseInputValidator: TUnion = t.union([ + platformClientResponseValidator, + threadInconsistencyClientResponseValidator, + entryInconsistencyClientResponseValidator, + platformDetailsClientResponseValidator, + checkStateClientResponseValidator, + initialActivityUpdatesClientResponseValidator, + signedIdentityKeysBlobClientResponseValidator, + initialNotificationsEncryptedMessageClientResponseValidator, +]);