diff --git a/keyserver/src/socket/fetch-data.js b/keyserver/src/socket/fetch-data.js new file mode 100644 --- /dev/null +++ b/keyserver/src/socket/fetch-data.js @@ -0,0 +1,143 @@ +// @flow + +import invariant from 'invariant'; + +import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; +import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils.js'; +import type { RawEntryInfo } from 'lib/types/entry-types.js'; +import { defaultNumberPerThread } from 'lib/types/message-types.js'; +import type { SessionState } from 'lib/types/session-types.js'; +import { + type ServerStateSyncSocketPayload, + type ServerStateSyncFullSocketPayload, + stateSyncPayloadTypes, +} from 'lib/types/socket-types.js'; +import type { LegacyRawThreadInfos } from 'lib/types/thread-types.js'; +import type { UserInfo, CurrentUserInfo } from 'lib/types/user-types.js'; +import { values } from 'lib/utils/objects.js'; +import { promiseAll } from 'lib/utils/promises.js'; + +import { initializeSession } from './session-utils.js'; +import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters.js'; +import { fetchMessageInfosSince } from '../fetchers/message-fetchers.js'; +import { fetchUpdateInfos } from '../fetchers/update-fetchers.js'; +import { verifyCalendarQueryThreadIDs } from '../responders/entry-responders.js'; +import { Viewer } from '../session/viewer.js'; +import { serverStateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; +import { commitSessionUpdate } from '../updaters/session-updaters.js'; + +async function fetchDataForSocketInit( + viewer: Viewer, + sessionState: SessionState, +): Promise { + const { + calendarQuery, + updatesCurrentAsOf: oldUpdatesCurrentAsOf, + messagesCurrentAsOf: oldMessagesCurrentAsOf, + watchedIDs, + } = sessionState; + await verifyCalendarQueryThreadIDs(calendarQuery); + + const sessionInitializationResult = await initializeSession( + viewer, + calendarQuery, + oldUpdatesCurrentAsOf, + ); + + const threadCursors: { [string]: null } = {}; + for (const watchedThreadID of watchedIDs) { + threadCursors[watchedThreadID] = null; + } + const messageSelectionCriteria = { + threadCursors, + joinedThreads: true, + newerThan: oldMessagesCurrentAsOf, + }; + const fetchMessagesResult = await fetchMessageInfosSince( + viewer, + messageSelectionCriteria, + defaultNumberPerThread, + ); + const messagesResult = { + rawMessageInfos: fetchMessagesResult.rawMessageInfos, + truncationStatuses: fetchMessagesResult.truncationStatuses, + currentAsOf: mostRecentMessageTimestamp( + fetchMessagesResult.rawMessageInfos, + oldMessagesCurrentAsOf, + ), + }; + + if (!sessionInitializationResult.sessionContinued) { + const promises: { +[string]: Promise } = Object.fromEntries( + values(serverStateSyncSpecs).map(spec => [ + spec.hashKey, + spec.fetchFullSocketSyncPayload(viewer, [calendarQuery]), + ]), + ); + // We have a type error here because Flow doesn't know spec.hashKey + const castPromises: { + +threadInfos: Promise, + +currentUserInfo: Promise, + +entryInfos: Promise<$ReadOnlyArray>, + +userInfos: Promise<$ReadOnlyArray>, + } = (promises: any); + const results = await promiseAll(castPromises); + const payload: ServerStateSyncFullSocketPayload = { + type: stateSyncPayloadTypes.FULL, + messagesResult, + threadInfos: results.threadInfos, + currentUserInfo: results.currentUserInfo, + rawEntryInfos: results.entryInfos, + userInfos: results.userInfos, + updatesCurrentAsOf: oldUpdatesCurrentAsOf, + }; + if (viewer.sessionChanged) { + // If initializeSession encounters sessionIdentifierTypes.BODY_SESSION_ID + // but the session is unspecified or expired, it will set a new sessionID + // and specify viewer.sessionChanged + const { sessionID } = viewer; + invariant(sessionID !== null && sessionID !== undefined, 'should be set'); + payload.sessionID = sessionID; + viewer.sessionChanged = false; + } + return payload; + } else { + const { sessionUpdate, deltaEntryInfoResult } = sessionInitializationResult; + + const deleteExpiredUpdatesPromise = deleteUpdatesBeforeTimeTargetingSession( + viewer, + oldUpdatesCurrentAsOf, + ); + const fetchUpdateResultPromise = fetchUpdateInfos( + viewer, + oldUpdatesCurrentAsOf, + calendarQuery, + ); + const sessionUpdatePromise = commitSessionUpdate(viewer, sessionUpdate); + const [fetchUpdateResult] = await Promise.all([ + fetchUpdateResultPromise, + deleteExpiredUpdatesPromise, + sessionUpdatePromise, + ]); + + const { updateInfos, userInfos } = fetchUpdateResult; + const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( + [...updateInfos], + oldUpdatesCurrentAsOf, + ); + const updatesResult = { + newUpdates: updateInfos, + currentAsOf: newUpdatesCurrentAsOf, + }; + return { + type: stateSyncPayloadTypes.INCREMENTAL, + messagesResult, + updatesResult, + deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, + deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, + userInfos: values(userInfos), + }; + } +} + +export { fetchDataForSocketInit }; diff --git a/keyserver/src/socket/socket.js b/keyserver/src/socket/socket.js --- a/keyserver/src/socket/socket.js +++ b/keyserver/src/socket/socket.js @@ -17,8 +17,6 @@ import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import { endpointIsSocketSafe } from 'lib/types/endpoints.js'; -import type { RawEntryInfo } from 'lib/types/entry-types.js'; -import { defaultNumberPerThread } from 'lib/types/message-types.js'; import { redisMessageTypes, type RedisMessage } from 'lib/types/redis-types.js'; import { serverRequestTypes, @@ -33,7 +31,6 @@ type ClientSocketMessage, type InitialClientSocketMessage, type ResponsesClientSocketMessage, - type ServerStateSyncFullSocketPayload, type ServerServerSocketMessage, type ErrorServerSocketMessage, type AuthErrorServerSocketMessage, @@ -41,36 +38,25 @@ type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, clientSocketMessageTypes, - stateSyncPayloadTypes, serverSocketMessageTypes, serverServerSocketMessageValidator, } from 'lib/types/socket-types.js'; -import type { LegacyRawThreadInfos } from 'lib/types/thread-types.js'; -import type { UserInfo, CurrentUserInfo } from 'lib/types/user-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; -import { promiseAll, ignorePromiseRejections } from 'lib/utils/promises.js'; +import { ignorePromiseRejections } from 'lib/utils/promises.js'; import SequentialPromiseResolver from 'lib/utils/sequential-promise-resolver.js'; import sleep from 'lib/utils/sleep.js'; import { tShape, tCookie } from 'lib/utils/validation-utils.js'; +import { fetchDataForSocketInit } from './fetch-data.js'; import { RedisSubscriber } from './redis.js'; -import { - processClientResponses, - initializeSession, - checkState, -} from './session-utils.js'; +import { processClientResponses, checkState } from './session-utils.js'; import { fetchUpdateInfosWithRawUpdateInfos } from '../creators/update-creator.js'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters.js'; import { jsonEndpoints } from '../endpoints.js'; -import { - fetchMessageInfosSince, - getMessageFetchResultFromRedisMessages, -} from '../fetchers/message-fetchers.js'; -import { fetchUpdateInfos } from '../fetchers/update-fetchers.js'; -import { verifyCalendarQueryThreadIDs } from '../responders/entry-responders.js'; +import { getMessageFetchResultFromRedisMessages } from '../fetchers/message-fetchers.js'; import { fetchViewerForSocket, updateCookie, @@ -80,7 +66,6 @@ } from '../session/cookies.js'; import { Viewer } from '../session/viewer.js'; import type { AnonymousViewerData } from '../session/viewer.js'; -import { serverStateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { commitSessionUpdate } from '../updaters/session-updaters.js'; import { compressMessage } from '../utils/compress.js'; import { assertSecureRequest } from '../utils/security-utils.js'; @@ -460,142 +445,23 @@ const responses: Array = []; const { sessionState, clientResponses } = message.payload; - const { - calendarQuery, - updatesCurrentAsOf: oldUpdatesCurrentAsOf, - messagesCurrentAsOf: oldMessagesCurrentAsOf, - watchedIDs, - } = sessionState; - await verifyCalendarQueryThreadIDs(calendarQuery); - - const sessionInitializationResult = await initializeSession( - viewer, - calendarQuery, - oldUpdatesCurrentAsOf, - ); - - const threadCursors: { [string]: null } = {}; - for (const watchedThreadID of watchedIDs) { - threadCursors[watchedThreadID] = null; - } - const messageSelectionCriteria = { - threadCursors, - joinedThreads: true, - newerThan: oldMessagesCurrentAsOf, - }; - const [fetchMessagesResult, { serverRequests, activityUpdateResult }] = - await Promise.all([ - fetchMessageInfosSince( - viewer, - messageSelectionCriteria, - defaultNumberPerThread, - ), - processClientResponses(viewer, clientResponses), - ]); - const messagesResult = { - rawMessageInfos: fetchMessagesResult.rawMessageInfos, - truncationStatuses: fetchMessagesResult.truncationStatuses, - currentAsOf: mostRecentMessageTimestamp( - fetchMessagesResult.rawMessageInfos, - oldMessagesCurrentAsOf, - ), - }; - - const isCookieMissingSignedIdentityKeysBlobPromise = - isCookieMissingSignedIdentityKeysBlob(viewer.cookieID); - const isCookieMissingOlmNotificationsSessionPromise = - isCookieMissingOlmNotificationsSession(viewer); - - if (!sessionInitializationResult.sessionContinued) { - const promises: { +[string]: Promise } = Object.fromEntries( - values(serverStateSyncSpecs).map(spec => [ - spec.hashKey, - spec.fetchFullSocketSyncPayload(viewer, [calendarQuery]), - ]), - ); - // We have a type error here because Flow doesn't know spec.hashKey - const castPromises: { - +threadInfos: Promise, - +currentUserInfo: Promise, - +entryInfos: Promise<$ReadOnlyArray>, - +userInfos: Promise<$ReadOnlyArray>, - } = (promises: any); - const results = await promiseAll(castPromises); - const payload: ServerStateSyncFullSocketPayload = { - type: stateSyncPayloadTypes.FULL, - messagesResult, - threadInfos: results.threadInfos, - currentUserInfo: results.currentUserInfo, - rawEntryInfos: results.entryInfos, - userInfos: results.userInfos, - updatesCurrentAsOf: oldUpdatesCurrentAsOf, - }; - if (viewer.sessionChanged) { - // If initializeSession encounters, - // sessionIdentifierTypes.BODY_SESSION_ID but the session - // is unspecified or expired, - // it will set a new sessionID and specify viewer.sessionChanged - const { sessionID } = viewer; - invariant( - sessionID !== null && sessionID !== undefined, - 'should be set', - ); - payload.sessionID = sessionID; - viewer.sessionChanged = false; - } - responses.push({ - type: serverSocketMessageTypes.STATE_SYNC, - responseTo: message.id, - payload, - }); - } else { - const { sessionUpdate, deltaEntryInfoResult } = - sessionInitializationResult; - - const deleteExpiredUpdatesPromise = - deleteUpdatesBeforeTimeTargetingSession(viewer, oldUpdatesCurrentAsOf); - const fetchUpdateResultPromise = fetchUpdateInfos( - viewer, - oldUpdatesCurrentAsOf, - calendarQuery, - ); - const sessionUpdatePromise = commitSessionUpdate(viewer, sessionUpdate); - const [fetchUpdateResult] = await Promise.all([ - fetchUpdateResultPromise, - deleteExpiredUpdatesPromise, - sessionUpdatePromise, - ]); - - const { updateInfos, userInfos } = fetchUpdateResult; - const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( - [...updateInfos], - oldUpdatesCurrentAsOf, - ); - const updatesResult = { - newUpdates: updateInfos, - currentAsOf: newUpdatesCurrentAsOf, - }; - - responses.push({ - type: serverSocketMessageTypes.STATE_SYNC, - responseTo: message.id, - payload: { - type: stateSyncPayloadTypes.INCREMENTAL, - messagesResult, - updatesResult, - deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, - deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, - userInfos: values(userInfos), - }, - }); - } + const payload = await fetchDataForSocketInit(viewer, sessionState); + responses.push({ + type: serverSocketMessageTypes.STATE_SYNC, + responseTo: message.id, + payload, + }); - const [signedIdentityKeysBlobMissing, olmNotificationsSessionMissing] = - await Promise.all([ - isCookieMissingSignedIdentityKeysBlobPromise, - isCookieMissingOlmNotificationsSessionPromise, - ]); + const [ + { serverRequests, activityUpdateResult }, + signedIdentityKeysBlobMissing, + olmNotificationsSessionMissing, + ] = await Promise.all([ + processClientResponses(viewer, clientResponses), + isCookieMissingSignedIdentityKeysBlob(viewer.cookieID), + isCookieMissingOlmNotificationsSession(viewer), + ]); if (signedIdentityKeysBlobMissing) { serverRequests.push({