diff --git a/keyserver/src/socket/fetch-data.js b/keyserver/src/socket/fetch-data.js index 8bb4e0a17..8d6df9f73 100644 --- a/keyserver/src/socket/fetch-data.js +++ b/keyserver/src/socket/fetch-data.js @@ -1,143 +1,143 @@ // @flow import invariant from 'invariant'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils.js'; import type { RawEntryInfo } from 'lib/types/entry-types.js'; import { defaultNumberPerThread } from 'lib/types/message-types.js'; import type { SessionState } from 'lib/types/session-types.js'; import { type ServerStateSyncSocketPayload, type ServerStateSyncFullSocketPayload, stateSyncPayloadTypes, } from 'lib/types/socket-types.js'; import type { LegacyRawThreadInfos } from 'lib/types/thread-types.js'; import type { UserInfo, CurrentUserInfo } from 'lib/types/user-types.js'; import { values } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import { initializeSession } from './session-utils.js'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters.js'; import { fetchMessageInfosSince } from '../fetchers/message-fetchers.js'; import { fetchUpdateInfos } from '../fetchers/update-fetchers.js'; import { verifyCalendarQueryThreadIDs } from '../responders/entry-responders.js'; import { Viewer } from '../session/viewer.js'; import { serverStateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { commitSessionUpdate } from '../updaters/session-updaters.js'; async function fetchDataForSocketInit( viewer: Viewer, sessionState: SessionState, ): Promise { const { calendarQuery, updatesCurrentAsOf: oldUpdatesCurrentAsOf, messagesCurrentAsOf: oldMessagesCurrentAsOf, watchedIDs, } = sessionState; await verifyCalendarQueryThreadIDs(calendarQuery); const sessionInitializationResult = await initializeSession( viewer, calendarQuery, oldUpdatesCurrentAsOf, ); const threadCursors: { [string]: null } = {}; for (const watchedThreadID of watchedIDs) { threadCursors[watchedThreadID] = null; } const messageSelectionCriteria = { threadCursors, joinedThreads: true, newerThan: oldMessagesCurrentAsOf, }; const fetchMessagesResult = await fetchMessageInfosSince( viewer, messageSelectionCriteria, defaultNumberPerThread, ); const messagesResult = { rawMessageInfos: fetchMessagesResult.rawMessageInfos, truncationStatuses: fetchMessagesResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( fetchMessagesResult.rawMessageInfos, oldMessagesCurrentAsOf, ), }; if (!sessionInitializationResult.sessionContinued) { const promises: { +[string]: Promise } = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [ spec.hashKey, spec.fetchFullSocketSyncPayload(viewer, [calendarQuery]), ]), ); // We have a type error here because Flow doesn't know spec.hashKey const castPromises: { +threadInfos: Promise, +currentUserInfo: Promise, +entryInfos: Promise<$ReadOnlyArray>, +userInfos: Promise<$ReadOnlyArray>, } = (promises: any); const results = await promiseAll(castPromises); - const payload: ServerStateSyncFullSocketPayload = { + let payload: ServerStateSyncFullSocketPayload = { type: stateSyncPayloadTypes.FULL, messagesResult, threadInfos: results.threadInfos, currentUserInfo: results.currentUserInfo, rawEntryInfos: results.entryInfos, userInfos: results.userInfos, updatesCurrentAsOf: oldUpdatesCurrentAsOf, }; if (viewer.sessionChanged) { // If initializeSession encounters sessionIdentifierTypes.BODY_SESSION_ID // but the session is unspecified or expired, it will set a new sessionID // and specify viewer.sessionChanged const { sessionID } = viewer; invariant(sessionID !== null && sessionID !== undefined, 'should be set'); - payload.sessionID = sessionID; + payload = { ...payload, sessionID }; viewer.sessionChanged = false; } return payload; } else { const { sessionUpdate, deltaEntryInfoResult } = sessionInitializationResult; const deleteExpiredUpdatesPromise = deleteUpdatesBeforeTimeTargetingSession( viewer, oldUpdatesCurrentAsOf, ); const fetchUpdateResultPromise = fetchUpdateInfos( viewer, oldUpdatesCurrentAsOf, calendarQuery, ); const sessionUpdatePromise = commitSessionUpdate(viewer, sessionUpdate); const [fetchUpdateResult] = await Promise.all([ fetchUpdateResultPromise, deleteExpiredUpdatesPromise, sessionUpdatePromise, ]); const { updateInfos, userInfos } = fetchUpdateResult; const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( [...updateInfos], oldUpdatesCurrentAsOf, ); const updatesResult = { newUpdates: updateInfos, currentAsOf: newUpdatesCurrentAsOf, }; return { type: stateSyncPayloadTypes.INCREMENTAL, messagesResult, updatesResult, deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, userInfos: values(userInfos), }; } } export { fetchDataForSocketInit }; diff --git a/lib/types/socket-types.js b/lib/types/socket-types.js index 3c5975f81..faa36e12b 100644 --- a/lib/types/socket-types.js +++ b/lib/types/socket-types.js @@ -1,564 +1,564 @@ // @flow import invariant from 'invariant'; import t, { type TInterface, type TUnion } from 'tcomb'; import { type RecoveryFromReduxActionSource, recoveryFromReduxActionSources, } from './account-types.js'; import { type ActivityUpdate, activityUpdateValidator, type UpdateActivityResult, updateActivityResultValidator, } from './activity-types.js'; import { type CompressedData, compressedDataValidator, } from './compression-types.js'; import type { APIRequest } from './endpoints.js'; import { type RawEntryInfo, rawEntryInfoValidator, type CalendarQuery, } from './entry-types.js'; import { type MessagesResponse, messagesResponseValidator, type NewMessagesPayload, newMessagesPayloadValidator, } from './message-types.js'; import { type ServerServerRequest, serverServerRequestValidator, type ClientServerRequest, type ClientResponse, type ClientClientResponse, } from './request-types.js'; import type { SessionState, SessionIdentification } from './session-types.js'; import type { MixedRawThreadInfos, RawThreadInfos } from './thread-types.js'; import { type ClientUpdatesResult, type ClientUpdatesResultWithUserInfos, type ServerUpdatesResult, serverUpdatesResultValidator, type ServerUpdatesResultWithUserInfos, serverUpdatesResultWithUserInfosValidator, } from './update-types.js'; import { type UserInfo, userInfoValidator, type CurrentUserInfo, currentUserInfoValidator, type LoggedOutUserInfo, loggedOutUserInfoValidator, } from './user-types.js'; import { mixedRawThreadInfoValidator } from '../permissions/minimally-encoded-raw-thread-info-validators.js'; import { values } from '../utils/objects.js'; import { tShape, tNumber, tID } from '../utils/validation-utils.js'; // The types of messages that the client sends across the socket export const clientSocketMessageTypes = Object.freeze({ INITIAL: 0, RESPONSES: 1, //ACTIVITY_UPDATES: 2, (DEPRECATED) PING: 3, ACK_UPDATES: 4, API_REQUEST: 5, }); export type ClientSocketMessageType = $Values; export function assertClientSocketMessageType( ourClientSocketMessageType: number, ): ClientSocketMessageType { invariant( ourClientSocketMessageType === 0 || ourClientSocketMessageType === 1 || ourClientSocketMessageType === 3 || ourClientSocketMessageType === 4 || ourClientSocketMessageType === 5, 'number is not ClientSocketMessageType enum', ); return ourClientSocketMessageType; } export type InitialClientSocketMessage = { +type: 0, +id: number, +payload: { +sessionIdentification: SessionIdentification, +sessionState: SessionState, +clientResponses: $ReadOnlyArray, }, }; export type ResponsesClientSocketMessage = { +type: 1, +id: number, +payload: { +clientResponses: $ReadOnlyArray, }, }; export type PingClientSocketMessage = { +type: 3, +id: number, }; export type AckUpdatesClientSocketMessage = { +type: 4, +id: number, +payload: { +currentAsOf: number, }, }; export type APIRequestClientSocketMessage = { +type: 5, +id: number, +payload: APIRequest, }; export type ClientSocketMessage = | InitialClientSocketMessage | ResponsesClientSocketMessage | PingClientSocketMessage | AckUpdatesClientSocketMessage | APIRequestClientSocketMessage; export type ClientInitialClientSocketMessage = { +type: 0, +id: number, +payload: { +sessionIdentification: SessionIdentification, +sessionState: SessionState, +clientResponses: $ReadOnlyArray, }, }; export type ClientResponsesClientSocketMessage = { +type: 1, +id: number, +payload: { +clientResponses: $ReadOnlyArray, }, }; export type ClientClientSocketMessage = | ClientInitialClientSocketMessage | ClientResponsesClientSocketMessage | PingClientSocketMessage | AckUpdatesClientSocketMessage | APIRequestClientSocketMessage; export type ClientSocketMessageWithoutID = $Diff< ClientClientSocketMessage, { id: number }, >; // The types of messages that the server sends across the socket export const serverSocketMessageTypes = Object.freeze({ STATE_SYNC: 0, REQUESTS: 1, ERROR: 2, AUTH_ERROR: 3, ACTIVITY_UPDATE_RESPONSE: 4, PONG: 5, UPDATES: 6, MESSAGES: 7, API_RESPONSE: 8, COMPRESSED_MESSAGE: 9, }); export type ServerSocketMessageType = $Values; export function assertServerSocketMessageType( ourServerSocketMessageType: number, ): ServerSocketMessageType { invariant( ourServerSocketMessageType === 0 || ourServerSocketMessageType === 1 || ourServerSocketMessageType === 2 || ourServerSocketMessageType === 3 || ourServerSocketMessageType === 4 || ourServerSocketMessageType === 5 || ourServerSocketMessageType === 6 || ourServerSocketMessageType === 7 || ourServerSocketMessageType === 8 || ourServerSocketMessageType === 9, 'number is not ServerSocketMessageType enum', ); return ourServerSocketMessageType; } export const stateSyncPayloadTypes = Object.freeze({ FULL: 0, INCREMENTAL: 1, }); export const fullStateSyncActionType = 'FULL_STATE_SYNC'; export type BaseFullStateSync = { +messagesResult: MessagesResponse, +rawEntryInfos: $ReadOnlyArray, +userInfos: $ReadOnlyArray, +updatesCurrentAsOf: number, }; const baseFullStateSyncValidator = tShape({ messagesResult: messagesResponseValidator, rawEntryInfos: t.list(rawEntryInfoValidator), userInfos: t.list(userInfoValidator), updatesCurrentAsOf: t.Number, }); export type ClientFullStateSync = $ReadOnly<{ ...BaseFullStateSync, +threadInfos: RawThreadInfos, +currentUserInfo: CurrentUserInfo, }>; export type StateSyncFullActionPayload = $ReadOnly<{ ...ClientFullStateSync, +calendarQuery: CalendarQuery, +keyserverID: string, }>; export type ClientStateSyncFullSocketPayload = $ReadOnly<{ ...ClientFullStateSync, +type: 0, // Included iff client is using sessionIdentifierTypes.BODY_SESSION_ID +sessionID?: string, }>; export type ServerFullStateSync = $ReadOnly<{ ...BaseFullStateSync, +threadInfos: MixedRawThreadInfos, +currentUserInfo: CurrentUserInfo, }>; const serverFullStateSyncValidator = tShape({ ...baseFullStateSyncValidator.meta.props, threadInfos: t.dict(tID, mixedRawThreadInfoValidator), currentUserInfo: currentUserInfoValidator, }); -export type ServerStateSyncFullSocketPayload = { +export type ServerStateSyncFullSocketPayload = $ReadOnly<{ ...ServerFullStateSync, +type: 0, // Included iff client is using sessionIdentifierTypes.BODY_SESSION_ID +sessionID?: string, -}; +}>; const serverStateSyncFullSocketPayloadValidator = tShape({ ...serverFullStateSyncValidator.meta.props, type: tNumber(stateSyncPayloadTypes.FULL), sessionID: t.maybe(t.String), }); export const incrementalStateSyncActionType = 'INCREMENTAL_STATE_SYNC'; export type BaseIncrementalStateSync = { +messagesResult: MessagesResponse, +deltaEntryInfos: $ReadOnlyArray, +deletedEntryIDs: $ReadOnlyArray, +userInfos: $ReadOnlyArray, }; const baseIncrementalStateSyncValidator = tShape({ messagesResult: messagesResponseValidator, deltaEntryInfos: t.list(rawEntryInfoValidator), deletedEntryIDs: t.list(tID), userInfos: t.list(userInfoValidator), }); -export type ClientIncrementalStateSync = { +export type ClientIncrementalStateSync = $ReadOnly<{ ...BaseIncrementalStateSync, +updatesResult: ClientUpdatesResult, -}; -export type StateSyncIncrementalActionPayload = { +}>; +export type StateSyncIncrementalActionPayload = $ReadOnly<{ ...ClientIncrementalStateSync, +calendarQuery: CalendarQuery, +keyserverID: string, -}; -type ClientStateSyncIncrementalSocketPayload = { +}>; +export type ClientStateSyncIncrementalSocketPayload = $ReadOnly<{ +type: 1, ...ClientIncrementalStateSync, -}; +}>; -export type ServerIncrementalStateSync = { +export type ServerIncrementalStateSync = $ReadOnly<{ ...BaseIncrementalStateSync, +updatesResult: ServerUpdatesResult, -}; +}>; const serverIncrementalStateSyncValidator = tShape({ ...baseIncrementalStateSyncValidator.meta.props, updatesResult: serverUpdatesResultValidator, }); -type ServerStateSyncIncrementalSocketPayload = { +type ServerStateSyncIncrementalSocketPayload = $ReadOnly<{ +type: 1, ...ServerIncrementalStateSync, -}; +}>; const serverStateSyncIncrementalSocketPayloadValidator = tShape({ type: tNumber(stateSyncPayloadTypes.INCREMENTAL), ...serverIncrementalStateSyncValidator.meta.props, }); export type ClientStateSyncSocketPayload = | ClientStateSyncFullSocketPayload | ClientStateSyncIncrementalSocketPayload; export type ServerStateSyncSocketPayload = | ServerStateSyncFullSocketPayload | ServerStateSyncIncrementalSocketPayload; export const serverStateSyncSocketPayloadValidator: TUnion = t.union([ serverStateSyncFullSocketPayloadValidator, serverStateSyncIncrementalSocketPayloadValidator, ]); export type ClientStateSyncFullSocketResult = $ReadOnly<{ ...ClientStateSyncFullSocketPayload, +keyserverID: string, }>; export type ClientStateSyncIncrementalSocketResult = $ReadOnly<{ ...ClientStateSyncIncrementalSocketPayload, +keyserverID: string, }>; export type ClientStateSyncSocketResult = | ClientStateSyncFullSocketResult | ClientStateSyncIncrementalSocketResult; export type ServerStateSyncServerSocketMessage = { +type: 0, +responseTo: number, +payload: ServerStateSyncSocketPayload, }; export const serverStateSyncServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.STATE_SYNC), responseTo: t.Number, payload: serverStateSyncSocketPayloadValidator, }); type ServerRequestsServerSocketMessagePayload = { +serverRequests: $ReadOnlyArray, }; export type ServerRequestsServerSocketMessage = { +type: 1, +responseTo?: number, +payload: ServerRequestsServerSocketMessagePayload, }; export const serverRequestsServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.REQUESTS), responseTo: t.maybe(t.Number), payload: tShape({ serverRequests: t.list(serverServerRequestValidator), }), }); export type ErrorServerSocketMessage = { type: 2, responseTo?: number, message: string, payload?: Object, }; export const errorServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.ERROR), responseTo: t.maybe(t.Number), message: t.String, payload: t.maybe(t.Object), }); type SessionChange = { +cookie: string, +currentUserInfo: LoggedOutUserInfo, }; export type AuthErrorServerSocketMessage = { +type: 3, +responseTo: number, +message: string, +sessionChange: SessionChange, }; export const authErrorServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.AUTH_ERROR), responseTo: t.Number, message: t.String, sessionChange: t.maybe( tShape({ cookie: t.String, currentUserInfo: loggedOutUserInfoValidator, }), ), }); export type ActivityUpdateResponseServerSocketMessage = { +type: 4, +responseTo: number, +payload: UpdateActivityResult, }; export const activityUpdateResponseServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE), responseTo: t.Number, payload: updateActivityResultValidator, }); export type PongServerSocketMessage = { +type: 5, +responseTo: number, }; export const pongServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.PONG), responseTo: t.Number, }); export type ServerUpdatesServerSocketMessage = { +type: 6, +payload: ServerUpdatesResultWithUserInfos, }; export const serverUpdatesServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.UPDATES), payload: serverUpdatesResultWithUserInfosValidator, }); export type MessagesServerSocketMessage = { +type: 7, +payload: NewMessagesPayload, }; export const messagesServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.MESSAGES), payload: newMessagesPayloadValidator, }); export type APIResponseServerSocketMessage = { +type: 8, +responseTo: number, +payload?: Object, }; export const apiResponseServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.API_RESPONSE), responseTo: t.Number, payload: t.maybe(t.Object), }); export type CompressedMessageServerSocketMessage = { +type: 9, +payload: CompressedData, }; export const compressedMessageServerSocketMessageValidator: TInterface = tShape({ type: tNumber(serverSocketMessageTypes.COMPRESSED_MESSAGE), payload: compressedDataValidator, }); export type ServerServerSocketMessage = | ServerStateSyncServerSocketMessage | ServerRequestsServerSocketMessage | ErrorServerSocketMessage | AuthErrorServerSocketMessage | ActivityUpdateResponseServerSocketMessage | PongServerSocketMessage | ServerUpdatesServerSocketMessage | MessagesServerSocketMessage | APIResponseServerSocketMessage | CompressedMessageServerSocketMessage; export const serverServerSocketMessageValidator: TUnion = t.union([ serverStateSyncServerSocketMessageValidator, serverRequestsServerSocketMessageValidator, errorServerSocketMessageValidator, authErrorServerSocketMessageValidator, activityUpdateResponseServerSocketMessageValidator, pongServerSocketMessageValidator, serverUpdatesServerSocketMessageValidator, messagesServerSocketMessageValidator, apiResponseServerSocketMessageValidator, compressedMessageServerSocketMessageValidator, ]); export type ClientRequestsServerSocketMessage = { +type: 1, +responseTo?: number, +payload: { +serverRequests: $ReadOnlyArray, }, }; export type ClientStateSyncServerSocketMessage = { +type: 0, +responseTo: number, +payload: ClientStateSyncSocketPayload, }; export type ClientUpdatesServerSocketMessage = { +type: 6, +payload: ClientUpdatesResultWithUserInfos, }; export type ClientServerSocketMessage = | ClientStateSyncServerSocketMessage | ClientRequestsServerSocketMessage | ErrorServerSocketMessage | AuthErrorServerSocketMessage | ActivityUpdateResponseServerSocketMessage | PongServerSocketMessage | ClientUpdatesServerSocketMessage | MessagesServerSocketMessage | APIResponseServerSocketMessage | CompressedMessageServerSocketMessage; export type SocketListener = (message: ClientServerSocketMessage) => void; export type ConnectionStatus = | 'connecting' | 'connected' | 'reconnecting' | 'disconnecting' | 'forcedDisconnecting' | 'disconnected'; export type ConnectionIssue = 'client_version_unsupported'; export type ConnectionInfo = { +status: ConnectionStatus, +queuedActivityUpdates: $ReadOnlyArray, +lateResponses: $ReadOnlyArray, +unreachable: boolean, +connectionIssue: ?ConnectionIssue, // When this is flipped to truthy, a session recovery is attempted // This can happen when the keyserver invalidates the session +activeSessionRecovery: null | RecoveryFromReduxActionSource, }; export const connectionInfoValidator: TInterface = tShape({ status: t.enums.of([ 'connecting', 'connected', 'reconnecting', 'disconnecting', 'forcedDisconnecting', 'disconnected', ]), queuedActivityUpdates: t.list(activityUpdateValidator), lateResponses: t.list(t.Number), unreachable: t.Boolean, connectionIssue: t.maybe(t.enums.of([])), activeSessionRecovery: t.maybe( t.enums.of(values(recoveryFromReduxActionSources)), ), }); export const defaultConnectionInfo: ConnectionInfo = { status: 'connecting', queuedActivityUpdates: [], lateResponses: [], unreachable: false, connectionIssue: null, activeSessionRecovery: null, }; export type SetActiveSessionRecoveryPayload = { +activeSessionRecovery: null | RecoveryFromReduxActionSource, +keyserverID: string, }; export type OneTimeKeyGenerator = (inc: number) => string; export type GRPCStream = { readyState: number, onopen: (ev: any) => mixed, onmessage: (ev: MessageEvent) => mixed, onclose: (ev: CloseEvent) => mixed, close(code?: number, reason?: string): void, send(data: string | Blob | ArrayBuffer | $ArrayBufferView): void, }; export type CommTransportLayer = GRPCStream | WebSocket;