diff --git a/keyserver/src/shared/state-sync/entries-state-sync-spec.js b/keyserver/src/shared/state-sync/entries-state-sync-spec.js index 8de8ffcc3..47f4e1b41 100644 --- a/keyserver/src/shared/state-sync/entries-state-sync-spec.js +++ b/keyserver/src/shared/state-sync/entries-state-sync-spec.js @@ -1,29 +1,33 @@ // @flow import { serverEntryInfosObject } from 'lib/shared/entry-utils.js'; import type { CalendarQuery, RawEntryInfos } from 'lib/types/entry-types.js'; import type { StateSyncSpec } from './state-sync-spec.js'; import { fetchEntryInfos, fetchEntryInfosByID, } from '../../fetchers/entry-fetchers.js'; import type { Viewer } from '../../session/viewer.js'; export const entriesStateSyncSpec: StateSyncSpec = Object.freeze( { async fetch( viewer: Viewer, query: $ReadOnlyArray, ids?: $ReadOnlySet, ) { if (ids) { return fetchEntryInfosByID(viewer, ids); } const entriesResult = await fetchEntryInfos(viewer, query); return serverEntryInfosObject(entriesResult.rawEntryInfos); }, hashKey: 'entryInfos', - innerHashKey: 'entryInfo', + innerHashSpec: { + hashKey: 'entryInfo', + deleteKey: 'deleteEntryIDs', + rawInfosKey: 'rawEntryInfos', + }, }, ); diff --git a/keyserver/src/shared/state-sync/state-sync-spec.js b/keyserver/src/shared/state-sync/state-sync-spec.js index 1289b117b..8879bf861 100644 --- a/keyserver/src/shared/state-sync/state-sync-spec.js +++ b/keyserver/src/shared/state-sync/state-sync-spec.js @@ -1,15 +1,20 @@ // @flow import type { CalendarQuery } from 'lib/types/entry-types.js'; import type { Viewer } from '../../session/viewer.js'; -export type StateSyncSpec = { +export type StateSyncSpec = { +fetch: ( viewer: Viewer, calendarQuery: $ReadOnlyArray, ids?: $ReadOnlySet, ) => Promise, +hashKey: string, - +innerHashKey?: string, + +innerHashSpec?: { + +hashKey: string, + +deleteKey: string, + +rawInfosKey: string, + +additionalDeleteCondition?: Info => boolean, + }, }; diff --git a/keyserver/src/shared/state-sync/threads-state-sync-spec.js b/keyserver/src/shared/state-sync/threads-state-sync-spec.js index af3441155..87fcf06fa 100644 --- a/keyserver/src/shared/state-sync/threads-state-sync-spec.js +++ b/keyserver/src/shared/state-sync/threads-state-sync-spec.js @@ -1,25 +1,29 @@ // @flow import type { CalendarQuery } from 'lib/types/entry-types.js'; import type { StateSyncSpec } from './state-sync-spec.js'; import { fetchThreadInfos, type RawThreadInfos, } from '../../fetchers/thread-fetchers.js'; import type { Viewer } from '../../session/viewer.js'; export const threadsStateSyncSpec: StateSyncSpec = Object.freeze({ async fetch( viewer: Viewer, query: $ReadOnlyArray, ids?: $ReadOnlySet, ) { const filter = ids ? { threadIDs: ids } : undefined; const result = await fetchThreadInfos(viewer, filter); return result.threadInfos; }, hashKey: 'threadInfos', - innerHashKey: 'threadInfo', + innerHashSpec: { + hashKey: 'threadInfo', + deleteKey: 'deleteThreadIDs', + rawInfosKey: 'rawThreadInfos', + }, }); diff --git a/keyserver/src/shared/state-sync/users-state-sync-spec.js b/keyserver/src/shared/state-sync/users-state-sync-spec.js index b9b2c1557..b64975532 100644 --- a/keyserver/src/shared/state-sync/users-state-sync-spec.js +++ b/keyserver/src/shared/state-sync/users-state-sync-spec.js @@ -1,24 +1,32 @@ // @flow import type { CalendarQuery } from 'lib/types/entry-types.js'; -import type { UserInfos } from 'lib/types/user-types.js'; +import type { UserInfos, UserInfo } from 'lib/types/user-types.js'; import type { StateSyncSpec } from './state-sync-spec.js'; import { fetchKnownUserInfos } from '../../fetchers/user-fetchers.js'; import type { Viewer } from '../../session/viewer.js'; -export const usersStateSyncSpec: StateSyncSpec = Object.freeze({ - fetch( - viewer: Viewer, - query: $ReadOnlyArray, - ids?: $ReadOnlySet, - ) { - if (ids) { - return fetchKnownUserInfos(viewer, [...ids]); - } +export const usersStateSyncSpec: StateSyncSpec = + Object.freeze({ + fetch( + viewer: Viewer, + query: $ReadOnlyArray, + ids?: $ReadOnlySet, + ) { + if (ids) { + return fetchKnownUserInfos(viewer, [...ids]); + } - return fetchKnownUserInfos(viewer); - }, - hashKey: 'userInfos', - innerHashKey: 'userInfo', -}); + return fetchKnownUserInfos(viewer); + }, + hashKey: 'userInfos', + innerHashSpec: { + hashKey: 'userInfo', + deleteKey: 'deleteUserInfoIDs', + rawInfosKey: 'userInfos', + additionalDeleteCondition(user: UserInfo) { + return !user.username; + }, + }, + }); diff --git a/keyserver/src/socket/session-utils.js b/keyserver/src/socket/session-utils.js index 0d7fba378..120c29b35 100644 --- a/keyserver/src/socket/session-utils.js +++ b/keyserver/src/socket/session-utils.js @@ -1,528 +1,504 @@ // @flow import invariant from 'invariant'; import t from 'tcomb'; import type { TUnion } from 'tcomb'; import type { UpdateActivityResult } from 'lib/types/activity-types.js'; import type { IdentityKeysBlob } from 'lib/types/crypto-types.js'; import { isDeviceType } from 'lib/types/device-types.js'; import type { CalendarQuery, DeltaEntryInfosResponse, } from 'lib/types/entry-types.js'; import { reportTypes, type ThreadInconsistencyReportCreationRequest, type EntryInconsistencyReportCreationRequest, } from 'lib/types/report-types.js'; import { serverRequestTypes, type ThreadInconsistencyClientResponse, type EntryInconsistencyClientResponse, type ClientResponse, type ServerServerRequest, type ServerCheckStateServerRequest, } from 'lib/types/request-types.js'; import { sessionCheckFrequency } from 'lib/types/session-types.js'; import { signedIdentityKeysBlobValidator } from 'lib/utils/crypto-utils.js'; import { hash, values } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import { tShape, tPlatform, tPlatformDetails, } from 'lib/utils/validation-utils.js'; import { createOlmSession } from '../creators/olm-session-creator.js'; import { saveOneTimeKeys } from '../creators/one-time-keys-creator.js'; import createReport from '../creators/report-creator.js'; import { fetchEntriesForSession } from '../fetchers/entry-fetchers.js'; import { checkIfSessionHasEnoughOneTimeKeys } from '../fetchers/key-fetchers.js'; import { activityUpdatesInputValidator } from '../responders/activity-responders.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import { threadInconsistencyReportValidatorShape, entryInconsistencyReportValidatorShape, } from '../responders/report-responders.js'; import { setNewSession, setCookiePlatform, setCookiePlatformDetails, setCookieSignedIdentityKeysBlob, } from '../session/cookies.js'; import type { Viewer } from '../session/viewer.js'; import { serverStateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { activityUpdater } from '../updaters/activity-updaters.js'; import { compareNewCalendarQuery } from '../updaters/entry-updaters.js'; import type { SessionUpdate } from '../updaters/session-updaters.js'; import { getOlmUtility } from '../utils/olm-utils.js'; const clientResponseInputValidator: TUnion = t.union([ tShape({ type: t.irreducible( 'serverRequestTypes.PLATFORM', x => x === serverRequestTypes.PLATFORM, ), platform: tPlatform, }), tShape({ ...threadInconsistencyReportValidatorShape, type: t.irreducible( 'serverRequestTypes.THREAD_INCONSISTENCY', x => x === serverRequestTypes.THREAD_INCONSISTENCY, ), }), tShape({ ...entryInconsistencyReportValidatorShape, type: t.irreducible( 'serverRequestTypes.ENTRY_INCONSISTENCY', x => x === serverRequestTypes.ENTRY_INCONSISTENCY, ), }), tShape({ type: t.irreducible( 'serverRequestTypes.PLATFORM_DETAILS', x => x === serverRequestTypes.PLATFORM_DETAILS, ), platformDetails: tPlatformDetails, }), tShape({ type: t.irreducible( 'serverRequestTypes.CHECK_STATE', x => x === serverRequestTypes.CHECK_STATE, ), hashResults: t.dict(t.String, t.Boolean), }), tShape({ type: t.irreducible( 'serverRequestTypes.INITIAL_ACTIVITY_UPDATES', x => x === serverRequestTypes.INITIAL_ACTIVITY_UPDATES, ), activityUpdates: activityUpdatesInputValidator, }), tShape({ type: t.irreducible( 'serverRequestTypes.MORE_ONE_TIME_KEYS', x => x === serverRequestTypes.MORE_ONE_TIME_KEYS, ), keys: t.list(t.String), }), tShape({ type: t.irreducible( 'serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB', x => x === serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB, ), signedIdentityKeysBlob: signedIdentityKeysBlobValidator, }), tShape({ type: t.irreducible( 'serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE', x => x === serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE, ), initialNotificationsEncryptedMessage: t.String, }), ]); type StateCheckStatus = | { status: 'state_validated' } | { status: 'state_invalid', invalidKeys: $ReadOnlyArray } | { status: 'state_check' }; type ProcessClientResponsesResult = { serverRequests: ServerServerRequest[], stateCheckStatus: ?StateCheckStatus, activityUpdateResult: ?UpdateActivityResult, }; async function processClientResponses( viewer: Viewer, clientResponses: $ReadOnlyArray, ): Promise { let viewerMissingPlatform = !viewer.platform; const { platformDetails } = viewer; let viewerMissingPlatformDetails = !platformDetails || (isDeviceType(viewer.platform) && (platformDetails.codeVersion === null || platformDetails.codeVersion === undefined || platformDetails.stateVersion === null || platformDetails.stateVersion === undefined)); const promises = []; let activityUpdates = []; let stateCheckStatus = null; const clientSentPlatformDetails = clientResponses.some( response => response.type === serverRequestTypes.PLATFORM_DETAILS, ); for (const clientResponse of clientResponses) { if ( clientResponse.type === serverRequestTypes.PLATFORM && !clientSentPlatformDetails ) { promises.push(setCookiePlatform(viewer, clientResponse.platform)); viewerMissingPlatform = false; if (!isDeviceType(clientResponse.platform)) { viewerMissingPlatformDetails = false; } } else if ( clientResponse.type === serverRequestTypes.THREAD_INCONSISTENCY ) { promises.push(recordThreadInconsistency(viewer, clientResponse)); } else if (clientResponse.type === serverRequestTypes.ENTRY_INCONSISTENCY) { promises.push(recordEntryInconsistency(viewer, clientResponse)); } else if (clientResponse.type === serverRequestTypes.PLATFORM_DETAILS) { promises.push( setCookiePlatformDetails(viewer, clientResponse.platformDetails), ); viewerMissingPlatform = false; viewerMissingPlatformDetails = false; } else if ( clientResponse.type === serverRequestTypes.INITIAL_ACTIVITY_UPDATES ) { activityUpdates = [...activityUpdates, ...clientResponse.activityUpdates]; } else if (clientResponse.type === serverRequestTypes.CHECK_STATE) { const invalidKeys = []; for (const key in clientResponse.hashResults) { const result = clientResponse.hashResults[key]; if (!result) { invalidKeys.push(key); } } stateCheckStatus = invalidKeys.length > 0 ? { status: 'state_invalid', invalidKeys } : { status: 'state_validated' }; } else if (clientResponse.type === serverRequestTypes.MORE_ONE_TIME_KEYS) { invariant(clientResponse.keys, 'keys expected in client response'); handleAsyncPromise(saveOneTimeKeys(viewer, clientResponse.keys)); } else if ( clientResponse.type === serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB ) { invariant( clientResponse.signedIdentityKeysBlob, 'signedIdentityKeysBlob expected in client response', ); const { signedIdentityKeysBlob } = clientResponse; const identityKeys: IdentityKeysBlob = JSON.parse( signedIdentityKeysBlob.payload, ); const olmUtil = getOlmUtility(); try { olmUtil.ed25519_verify( identityKeys.primaryIdentityPublicKeys.ed25519, signedIdentityKeysBlob.payload, signedIdentityKeysBlob.signature, ); handleAsyncPromise( setCookieSignedIdentityKeysBlob( viewer.cookieID, signedIdentityKeysBlob, ), ); } catch (e) { continue; } } else if ( clientResponse.type === serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE ) { invariant( t.String.is(clientResponse.initialNotificationsEncryptedMessage), 'initialNotificationsEncryptedMessage expected in client response', ); const { initialNotificationsEncryptedMessage } = clientResponse; try { await createOlmSession( initialNotificationsEncryptedMessage, 'notifications', viewer.cookieID, ); } catch (e) { continue; } } } const activityUpdatePromise = (async () => { if (activityUpdates.length === 0) { return undefined; } return await activityUpdater(viewer, { updates: activityUpdates }); })(); const serverRequests = []; const checkOneTimeKeysPromise = (async () => { if (!viewer.loggedIn) { return; } const enoughOneTimeKeys = await checkIfSessionHasEnoughOneTimeKeys( viewer.session, ); if (!enoughOneTimeKeys) { serverRequests.push({ type: serverRequestTypes.MORE_ONE_TIME_KEYS }); } })(); const { activityUpdateResult } = await promiseAll({ all: Promise.all(promises), activityUpdateResult: activityUpdatePromise, checkOneTimeKeysPromise, }); if ( !stateCheckStatus && viewer.loggedIn && viewer.sessionLastValidated + sessionCheckFrequency < Date.now() ) { stateCheckStatus = { status: 'state_check' }; } if (viewerMissingPlatform) { serverRequests.push({ type: serverRequestTypes.PLATFORM }); } if (viewerMissingPlatformDetails) { serverRequests.push({ type: serverRequestTypes.PLATFORM_DETAILS }); } return { serverRequests, stateCheckStatus, activityUpdateResult }; } async function recordThreadInconsistency( viewer: Viewer, response: ThreadInconsistencyClientResponse, ): Promise { const { type, ...rest } = response; const reportCreationRequest = ({ ...rest, type: reportTypes.THREAD_INCONSISTENCY, }: ThreadInconsistencyReportCreationRequest); await createReport(viewer, reportCreationRequest); } async function recordEntryInconsistency( viewer: Viewer, response: EntryInconsistencyClientResponse, ): Promise { const { type, ...rest } = response; const reportCreationRequest = ({ ...rest, type: reportTypes.ENTRY_INCONSISTENCY, }: EntryInconsistencyReportCreationRequest); await createReport(viewer, reportCreationRequest); } type SessionInitializationResult = | { sessionContinued: false } | { sessionContinued: true, deltaEntryInfoResult: DeltaEntryInfosResponse, sessionUpdate: SessionUpdate, }; async function initializeSession( viewer: Viewer, calendarQuery: CalendarQuery, oldLastUpdate: number, ): Promise { if (!viewer.loggedIn) { return { sessionContinued: false }; } if (!viewer.hasSessionInfo) { // If the viewer has no session info but is logged in, that is indicative // of an expired / invalidated session and we should generate a new one await setNewSession(viewer, calendarQuery, oldLastUpdate); return { sessionContinued: false }; } if (oldLastUpdate < viewer.sessionLastUpdated) { // If the client has an older last_update than the server is tracking for // that client, then the client either had some issue persisting its store, // or the user restored the client app from a backup. Either way, we should // invalidate the existing session, since the server has assumed that the // checkpoint is further along than it is on the client, and might not still // have all of the updates necessary to do an incremental update await setNewSession(viewer, calendarQuery, oldLastUpdate); return { sessionContinued: false }; } let comparisonResult = null; try { comparisonResult = compareNewCalendarQuery(viewer, calendarQuery); } catch (e) { if (e.message !== 'unknown_error') { throw e; } } if (comparisonResult) { const { difference, oldCalendarQuery } = comparisonResult; const sessionUpdate = { ...comparisonResult.sessionUpdate, lastUpdate: oldLastUpdate, }; const deltaEntryInfoResult = await fetchEntriesForSession( viewer, difference, oldCalendarQuery, ); return { sessionContinued: true, deltaEntryInfoResult, sessionUpdate }; } else { await setNewSession(viewer, calendarQuery, oldLastUpdate); return { sessionContinued: false }; } } type StateCheckResult = { sessionUpdate?: SessionUpdate, checkStateRequest?: ServerCheckStateServerRequest, }; async function checkState( viewer: Viewer, status: StateCheckStatus, calendarQuery: CalendarQuery, ): Promise { const query = [calendarQuery]; if (status.status === 'state_validated') { return { sessionUpdate: { lastValidated: Date.now() } }; } else if (status.status === 'state_check') { const promises = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [ spec.hashKey, (async () => { const data = await spec.fetch(viewer, query); return hash(data); })(), ]), ); const hashesToCheck = await promiseAll(promises); const checkStateRequest = { type: serverRequestTypes.CHECK_STATE, hashesToCheck, }; return { checkStateRequest }; } const invalidKeys = new Set(status.invalidKeys); const shouldFetchAll = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [ spec.hashKey, invalidKeys.has(spec.hashKey), ]), ); const idsToFetch = Object.fromEntries( values(serverStateSyncSpecs) - .filter(spec => spec.innerHashKey) - .map(spec => [spec.innerHashKey, new Set()]), + .filter(spec => spec.innerHashSpec?.hashKey) + .map(spec => [spec.innerHashSpec?.hashKey, new Set()]), ); for (const key of invalidKeys) { const [innerHashKey, id] = key.split('|'); if (innerHashKey && id) { idsToFetch[innerHashKey]?.add(id); } } const fetchPromises = {}; for (const spec of values(serverStateSyncSpecs)) { if (shouldFetchAll[spec.hashKey]) { fetchPromises[spec.hashKey] = spec.fetch(viewer, query); - } else if (idsToFetch[spec.innerHashKey]?.size > 0) { + } else if (idsToFetch[spec.innerHashSpec?.hashKey]?.size > 0) { fetchPromises[spec.hashKey] = spec.fetch( viewer, query, - idsToFetch[spec.innerHashKey], + idsToFetch[spec.innerHashSpec?.hashKey], ); } } const fetchedData = await promiseAll(fetchPromises); const specPerHashKey = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [spec.hashKey, spec]), ); + const specPerInnerHashKey = Object.fromEntries( + values(serverStateSyncSpecs) + .filter(spec => spec.innerHashSpec?.hashKey) + .map(spec => [spec.innerHashSpec?.hashKey, spec]), + ); const hashesToCheck = {}, failUnmentioned = {}, stateChanges = {}; for (const key of invalidKeys) { const spec = specPerHashKey[key]; - const innerHashKey = spec?.innerHashKey; + const innerHashKey = spec?.innerHashSpec?.hashKey; const isTopLevelKey = !!spec; if (isTopLevelKey && innerHashKey) { // Instead of returning all the infos, we want to narrow down and figure // out which infos don't match first const infos = fetchedData[key]; for (const infoID in infos) { hashesToCheck[`${innerHashKey}|${infoID}`] = hash(infos[infoID]); } failUnmentioned[key] = true; } else if (isTopLevelKey) { stateChanges[key] = fetchedData[key]; - } else if (key.startsWith('threadInfo|')) { - const [, threadID] = key.split('|'); - const threadInfos = fetchedData[serverStateSyncSpecs.threads.hashKey]; - const threadInfo = threadInfos[threadID]; - if (!threadInfo) { - if (!stateChanges.deleteThreadIDs) { - stateChanges.deleteThreadIDs = []; - } - stateChanges.deleteThreadIDs.push(threadID); + } else { + const [keyPrefix, id] = key.split('|'); + const innerSpec = specPerInnerHashKey[keyPrefix]; + const innerHashSpec = innerSpec?.innerHashSpec; + if (!innerHashSpec || !id) { continue; } - if (!stateChanges.rawThreadInfos) { - stateChanges.rawThreadInfos = []; - } - stateChanges.rawThreadInfos.push(threadInfo); - } else if (key.startsWith('entryInfo|')) { - const [, entryID] = key.split('|'); - const entryInfos = fetchedData[serverStateSyncSpecs.entries.hashKey]; - const entryInfo = entryInfos[entryID]; - if (!entryInfo) { - if (!stateChanges.deleteEntryIDs) { - stateChanges.deleteEntryIDs = []; + const infos = fetchedData[innerSpec.hashKey]; + const info = infos[id]; + if (!info || innerHashSpec.additionalDeleteCondition?.(info)) { + if (!stateChanges[innerHashSpec.deleteKey]) { + stateChanges[innerHashSpec.deleteKey] = []; } - stateChanges.deleteEntryIDs.push(entryID); + stateChanges[innerHashSpec.deleteKey].push(id); continue; } - if (!stateChanges.rawEntryInfos) { - stateChanges.rawEntryInfos = []; - } - stateChanges.rawEntryInfos.push(entryInfo); - } else if (key.startsWith('userInfo|')) { - const [, userID] = key.split('|'); - const userInfos = fetchedData[serverStateSyncSpecs.users.hashKey]; - const userInfo = userInfos[userID]; - if (!userInfo || !userInfo.username) { - if (!stateChanges.deleteUserInfoIDs) { - stateChanges.deleteUserInfoIDs = []; - } - stateChanges.deleteUserInfoIDs.push(userID); - } else { - if (!stateChanges.userInfos) { - stateChanges.userInfos = []; - } - stateChanges.userInfos.push({ - ...userInfo, - // Flow gets confused if we don't do this - username: userInfo.username, - }); + if (!stateChanges[innerHashSpec.rawInfosKey]) { + stateChanges[innerHashSpec.rawInfosKey] = []; } + stateChanges[innerHashSpec.rawInfosKey].push(info); } } const checkStateRequest = { type: serverRequestTypes.CHECK_STATE, hashesToCheck, failUnmentioned, stateChanges, }; if (Object.keys(hashesToCheck).length === 0) { return { checkStateRequest, sessionUpdate: { lastValidated: Date.now() } }; } else { return { checkStateRequest }; } } export { clientResponseInputValidator, processClientResponses, initializeSession, checkState, };