diff --git a/lib/socket/api-request-handler.react.js b/lib/socket/api-request-handler.react.js index 78acf3ddc..e69c7456a 100644 --- a/lib/socket/api-request-handler.react.js +++ b/lib/socket/api-request-handler.react.js @@ -1,98 +1,99 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; -import { InflightRequests, SocketOffline } from './inflight-requests.js'; +import { InflightRequests } from './inflight-requests.js'; import type { APIRequest } from '../types/endpoints.js'; import { clientSocketMessageTypes, serverSocketMessageTypes, type ClientSocketMessageWithoutID, type ConnectionInfo, } from '../types/socket-types.js'; import { registerActiveSocket } from '../utils/action-utils.js'; +import { SocketOffline } from '../utils/errors.js'; import { useSelector } from '../utils/redux-utils.js'; type BaseProps = { +inflightRequests: ?InflightRequests, +sendMessage: (message: ClientSocketMessageWithoutID) => number, }; type Props = { ...BaseProps, +connection: ConnectionInfo, }; class APIRequestHandler extends React.PureComponent { static isConnected(props: Props, request?: APIRequest) { const { inflightRequests, connection } = props; if (!inflightRequests) { return false; } // This is a hack. We actually have a race condition between // ActivityHandler and Socket. Both of them respond to a backgrounding, but // we want ActivityHandler to go first. Once it sends its message, Socket // will wait for the response before shutting down. But if Socket starts // shutting down first, we'll have a problem. Note that this approach only // stops the race in fetchResponse below, and not in action-utils (which // happens earlier via the registerActiveSocket call below), but empircally // that hasn't been an issue. // The reason I didn't rewrite this to happen in a single component is // because I want to maintain separation of concerns. Upcoming React Hooks // will be a great way to rewrite them to be related but still separated. return ( connection.status === 'connected' || (request && request.endpoint === 'update_activity') ); } get registeredResponseFetcher() { return APIRequestHandler.isConnected(this.props) ? this.fetchResponse : null; } componentDidMount() { registerActiveSocket(this.registeredResponseFetcher); } componentWillUnmount() { registerActiveSocket(null); } componentDidUpdate(prevProps: Props) { const isConnected = APIRequestHandler.isConnected(this.props); const wasConnected = APIRequestHandler.isConnected(prevProps); if (isConnected !== wasConnected) { registerActiveSocket(this.registeredResponseFetcher); } } render() { return null; } fetchResponse = async (request: APIRequest): Promise => { if (!APIRequestHandler.isConnected(this.props, request)) { throw new SocketOffline('socket_offline'); } const { inflightRequests } = this.props; invariant(inflightRequests, 'inflightRequests falsey inside fetchResponse'); const messageID = this.props.sendMessage({ type: clientSocketMessageTypes.API_REQUEST, payload: request, }); const response = await inflightRequests.fetchResponse( messageID, serverSocketMessageTypes.API_RESPONSE, ); return response.payload; }; } const ConnectedAPIRequestHandler: React.ComponentType = React.memo(function ConnectedAPIRequestHandler(props) { const connection = useSelector(state => state.connection); return ; }); export default ConnectedAPIRequestHandler; diff --git a/lib/socket/inflight-requests.js b/lib/socket/inflight-requests.js index 2969710d9..23c8e81ad 100644 --- a/lib/socket/inflight-requests.js +++ b/lib/socket/inflight-requests.js @@ -1,251 +1,242 @@ // @flow import invariant from 'invariant'; import { clientRequestVisualTimeout, clientRequestSocketTimeout, } from '../shared/timeouts.js'; import { type ClientServerSocketMessage, type ClientStateSyncServerSocketMessage, type ClientRequestsServerSocketMessage, type ActivityUpdateResponseServerSocketMessage, type PongServerSocketMessage, type APIResponseServerSocketMessage, type ServerSocketMessageType, serverSocketMessageTypes, } from '../types/socket-types.js'; -import { ServerError, ExtendableError } from '../utils/errors.js'; +import { ServerError, SocketOffline, SocketTimeout } from '../utils/errors.js'; import sleep from '../utils/sleep.js'; type ValidResponseMessageMap = { a: ClientStateSyncServerSocketMessage, b: ClientRequestsServerSocketMessage, c: ActivityUpdateResponseServerSocketMessage, d: PongServerSocketMessage, e: APIResponseServerSocketMessage, }; type BaseInflightRequest = { expectedResponseType: $PropertyType, resolve: (response: Response) => void, reject: (error: Error) => void, messageID: number, }; type InflightRequestMap = $ObjMap< ValidResponseMessageMap, (T) => BaseInflightRequest<$Exact>, >; type ValidResponseMessage = $Values; type InflightRequest = $Values; const remainingTimeAfterVisualTimeout = clientRequestSocketTimeout - clientRequestVisualTimeout; -class SocketOffline extends ExtendableError {} - -class SocketTimeout extends ExtendableError { - expectedResponseType: ServerSocketMessageType; - constructor(expectedType: ServerSocketMessageType) { - super(`socket timed out waiting for response type ${expectedType}`); - this.expectedResponseType = expectedType; - } -} type Callbacks = { timeout: () => void, setLateResponse: (messageID: number, isLate: boolean) => void, }; class InflightRequests { data: InflightRequest[] = []; timeoutCallback: () => void; setLateResponse: (messageID: number, isLate: boolean) => void; constructor(callbacks: Callbacks) { this.timeoutCallback = callbacks.timeout; this.setLateResponse = callbacks.setLateResponse; } async fetchResponse( messageID: number, expectedType: $PropertyType, ): Promise { let inflightRequest: ?InflightRequest; const responsePromise = new Promise((resolve, reject) => { // Flow makes us do these unnecessary runtime checks... if (expectedType === serverSocketMessageTypes.STATE_SYNC) { inflightRequest = { expectedResponseType: serverSocketMessageTypes.STATE_SYNC, resolve, reject, messageID, }; } else if (expectedType === serverSocketMessageTypes.REQUESTS) { inflightRequest = { expectedResponseType: serverSocketMessageTypes.REQUESTS, resolve, reject, messageID, }; } else if ( expectedType === serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE ) { inflightRequest = { expectedResponseType: serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, resolve, reject, messageID, }; } else if (expectedType === serverSocketMessageTypes.PONG) { inflightRequest = { expectedResponseType: serverSocketMessageTypes.PONG, resolve, reject, messageID, }; } else if (expectedType === serverSocketMessageTypes.API_RESPONSE) { inflightRequest = { expectedResponseType: serverSocketMessageTypes.API_RESPONSE, resolve, reject, messageID, }; } }); invariant( inflightRequest, `${expectedType} is an invalid server response type`, ); this.data.push(inflightRequest); // We create this object so we can pass it by reference to the timeout // function below. That function will avoid setting this request as late if // the response has already arrived. const requestResult = { concluded: false, lateResponse: false }; try { const response = await Promise.race([ responsePromise, this.timeout(messageID, expectedType, requestResult), ]); requestResult.concluded = true; if (requestResult.lateResponse) { this.setLateResponse(messageID, false); } this.clearRequest(inflightRequest); // Flow is unable to narrow the return type based on the expectedType return (response: any); } catch (e) { requestResult.concluded = true; this.clearRequest(inflightRequest); if (e instanceof SocketTimeout) { this.rejectAll(new Error('socket closed due to timeout')); this.timeoutCallback(); } else if (requestResult.lateResponse) { this.setLateResponse(messageID, false); } throw e; } } async timeout( messageID: number, expectedType: ServerSocketMessageType, requestResult: { concluded: boolean, lateResponse: boolean }, ) { await sleep(clientRequestVisualTimeout); if (requestResult.concluded) { // We're just doing this to bail out. If requestResult.concluded we can // conclude that responsePromise already won the race. Returning here // gives Flow errors since Flow is worried response will be undefined. throw new Error(); } requestResult.lateResponse = true; this.setLateResponse(messageID, true); await sleep(remainingTimeAfterVisualTimeout); throw new SocketTimeout(expectedType); } clearRequest(requestToClear: InflightRequest) { this.data = this.data.filter(request => request !== requestToClear); } resolveRequestsForMessage(message: ClientServerSocketMessage) { for (const inflightRequest of this.data) { if ( message.responseTo === null || message.responseTo === undefined || inflightRequest.messageID !== message.responseTo ) { continue; } if (message.type === serverSocketMessageTypes.ERROR) { const error = message.payload ? new ServerError(message.message, message.payload) : new ServerError(message.message); inflightRequest.reject(error); } else if (message.type === serverSocketMessageTypes.AUTH_ERROR) { inflightRequest.reject(new SocketOffline('auth_error')); } else if ( message.type === serverSocketMessageTypes.STATE_SYNC && inflightRequest.expectedResponseType === serverSocketMessageTypes.STATE_SYNC ) { inflightRequest.resolve(message); } else if ( message.type === serverSocketMessageTypes.REQUESTS && inflightRequest.expectedResponseType === serverSocketMessageTypes.REQUESTS ) { inflightRequest.resolve(message); } else if ( message.type === serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE && inflightRequest.expectedResponseType === serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE ) { inflightRequest.resolve(message); } else if ( message.type === serverSocketMessageTypes.PONG && inflightRequest.expectedResponseType === serverSocketMessageTypes.PONG ) { inflightRequest.resolve(message); } else if ( message.type === serverSocketMessageTypes.API_RESPONSE && inflightRequest.expectedResponseType === serverSocketMessageTypes.API_RESPONSE ) { inflightRequest.resolve(message); } } } rejectAll(error: Error) { const { data } = this; // Though the promise rejections below should call clearRequest when they're // caught in fetchResponse, that doesn't happen synchronously. Socket won't // close unless all requests are resolved, so we clear this.data immediately this.data = []; for (const inflightRequest of data) { const { reject } = inflightRequest; reject(error); } } allRequestsResolvedExcept(excludeMessageID: ?number): boolean { for (const inflightRequest of this.data) { const { expectedResponseType } = inflightRequest; if ( expectedResponseType !== serverSocketMessageTypes.PONG && (excludeMessageID === null || excludeMessageID === undefined || excludeMessageID !== inflightRequest.messageID) ) { return false; } } return true; } } -export { SocketOffline, SocketTimeout, InflightRequests }; +export { InflightRequests }; diff --git a/lib/socket/request-response-handler.react.js b/lib/socket/request-response-handler.react.js index 759b80dc8..86be52d37 100644 --- a/lib/socket/request-response-handler.react.js +++ b/lib/socket/request-response-handler.react.js @@ -1,150 +1,150 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import { useDispatch } from 'react-redux'; -import { InflightRequests, SocketTimeout } from './inflight-requests.js'; +import { InflightRequests } from './inflight-requests.js'; import type { CalendarQuery } from '../types/entry-types.js'; import type { Dispatch } from '../types/redux-types.js'; import { processServerRequestsActionType, type ClientClientResponse, type ClientServerRequest, } from '../types/request-types.js'; import { type ClientRequestsServerSocketMessage, type ClientServerSocketMessage, clientSocketMessageTypes, serverSocketMessageTypes, type ClientSocketMessageWithoutID, type SocketListener, type ConnectionInfo, } from '../types/socket-types.js'; -import { ServerError } from '../utils/errors.js'; +import { ServerError, SocketTimeout } from '../utils/errors.js'; import { useSelector } from '../utils/redux-utils.js'; type BaseProps = { +inflightRequests: ?InflightRequests, +sendMessage: (message: ClientSocketMessageWithoutID) => number, +addListener: (listener: SocketListener) => void, +removeListener: (listener: SocketListener) => void, +getClientResponses: ( activeServerRequests: $ReadOnlyArray, ) => Promise<$ReadOnlyArray>, +currentCalendarQuery: () => CalendarQuery, }; type Props = { ...BaseProps, +connection: ConnectionInfo, +dispatch: Dispatch, }; class RequestResponseHandler extends React.PureComponent { componentDidMount() { this.props.addListener(this.onMessage); } componentWillUnmount() { this.props.removeListener(this.onMessage); } render() { return null; } onMessage = (message: ClientServerSocketMessage) => { if (message.type !== serverSocketMessageTypes.REQUESTS) { return; } const { serverRequests } = message.payload; if (serverRequests.length === 0) { return; } const calendarQuery = this.props.currentCalendarQuery(); this.props.dispatch({ type: processServerRequestsActionType, payload: { serverRequests, calendarQuery, }, }); if (this.props.inflightRequests) { const clientResponsesPromise = this.props.getClientResponses(serverRequests); this.sendAndHandleClientResponsesToServerRequests(clientResponsesPromise); } }; sendClientResponses( clientResponses: $ReadOnlyArray, ): Promise { const { inflightRequests } = this.props; invariant( inflightRequests, 'inflightRequests falsey inside sendClientResponses', ); const messageID = this.props.sendMessage({ type: clientSocketMessageTypes.RESPONSES, payload: { clientResponses }, }); return inflightRequests.fetchResponse( messageID, serverSocketMessageTypes.REQUESTS, ); } async sendAndHandleClientResponsesToServerRequests( clientResponsesPromise: Promise<$ReadOnlyArray>, ) { const clientResponses = await clientResponsesPromise; if (clientResponses.length === 0) { return; } const promise = this.sendClientResponses(clientResponses); this.handleClientResponsesToServerRequests(promise, clientResponses); } async handleClientResponsesToServerRequests( promise: Promise, clientResponses: $ReadOnlyArray, retriesLeft: number = 1, ): Promise { try { await promise; } catch (e) { console.log(e); if ( !(e instanceof SocketTimeout) && (!(e instanceof ServerError) || e.message === 'unknown_error') && retriesLeft > 0 && this.props.connection.status === 'connected' && this.props.inflightRequests ) { // We'll only retry if the connection is healthy and the error is either // an unknown_error ServerError or something is neither a ServerError // nor a SocketTimeout. const newPromise = this.sendClientResponses(clientResponses); await this.handleClientResponsesToServerRequests( newPromise, clientResponses, retriesLeft - 1, ); } } } } const ConnectedRequestResponseHandler: React.ComponentType = React.memo(function ConnectedRequestResponseHandler(props) { const connection = useSelector(state => state.connection); const dispatch = useDispatch(); return ( ); }); export default ConnectedRequestResponseHandler; diff --git a/lib/socket/socket.react.js b/lib/socket/socket.react.js index 99f0066f0..58ff02099 100644 --- a/lib/socket/socket.react.js +++ b/lib/socket/socket.react.js @@ -1,778 +1,774 @@ // @flow import invariant from 'invariant'; import _throttle from 'lodash/throttle.js'; import * as React from 'react'; import ActivityHandler from './activity-handler.react.js'; import APIRequestHandler from './api-request-handler.react.js'; import CalendarQueryHandler from './calendar-query-handler.react.js'; -import { - InflightRequests, - SocketTimeout, - SocketOffline, -} from './inflight-requests.js'; +import { InflightRequests } from './inflight-requests.js'; import MessageHandler from './message-handler.react.js'; import ReportHandler from './report-handler.react.js'; import RequestResponseHandler from './request-response-handler.react.js'; import UpdateHandler from './update-handler.react.js'; import { updateActivityActionTypes } from '../actions/activity-actions.js'; import { logOutActionTypes } from '../actions/user-actions.js'; import { unsupervisedBackgroundActionType } from '../reducers/lifecycle-state-reducer.js'; import { pingFrequency, serverRequestSocketTimeout, clientRequestVisualTimeout, clientRequestSocketTimeout, } from '../shared/timeouts.js'; import { logInActionSources, type LogOutResult, } from '../types/account-types.js'; import { isWebPlatform } from '../types/device-types.js'; import type { CalendarQuery } from '../types/entry-types.js'; import { forcePolicyAcknowledgmentActionType } from '../types/policy-types.js'; import type { Dispatch } from '../types/redux-types.js'; import { serverRequestTypes, type ClientClientResponse, type ClientServerRequest, } from '../types/request-types.js'; import { type SessionState, type SessionIdentification, type PreRequestUserState, } from '../types/session-types.js'; import { clientSocketMessageTypes, type ClientClientSocketMessage, serverSocketMessageTypes, type ClientServerSocketMessage, stateSyncPayloadTypes, fullStateSyncActionType, incrementalStateSyncActionType, updateConnectionStatusActionType, type ConnectionInfo, type ClientInitialClientSocketMessage, type ClientResponsesClientSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, type ClientSocketMessageWithoutID, type SocketListener, type ConnectionStatus, setLateResponseActionType, type CommTransportLayer, } from '../types/socket-types.js'; import { actionLogger } from '../utils/action-logger.js'; import type { DispatchActionPromise } from '../utils/action-utils.js'; import { setNewSessionActionType, fetchNewCookieFromNativeCredentials, } from '../utils/action-utils.js'; import { getConfig } from '../utils/config.js'; -import { ServerError } from '../utils/errors.js'; +import { ServerError, SocketTimeout, SocketOffline } from '../utils/errors.js'; import { promiseAll } from '../utils/promises.js'; import sleep from '../utils/sleep.js'; const remainingTimeAfterVisualTimeout = clientRequestSocketTimeout - clientRequestVisualTimeout; export type BaseSocketProps = { +detectUnsupervisedBackgroundRef?: ( detectUnsupervisedBackground: (alreadyClosed: boolean) => boolean, ) => void, }; type Props = { ...BaseSocketProps, // Redux state +active: boolean, +openSocket: () => CommTransportLayer, +getClientResponses: ( activeServerRequests: $ReadOnlyArray, ) => Promise<$ReadOnlyArray>, +activeThread: ?string, +sessionStateFunc: () => SessionState, +sessionIdentification: SessionIdentification, +cookie: ?string, +urlPrefix: string, +connection: ConnectionInfo, +currentCalendarQuery: () => CalendarQuery, +canSendReports: boolean, +frozen: boolean, +preRequestUserState: PreRequestUserState, +noDataAfterPolicyAcknowledgment?: boolean, // Redux dispatch functions +dispatch: Dispatch, +dispatchActionPromise: DispatchActionPromise, // async functions that hit server APIs +logOut: (preRequestUserState: PreRequestUserState) => Promise, +socketCrashLoopRecovery?: () => Promise, }; type State = { +inflightRequests: ?InflightRequests, }; class Socket extends React.PureComponent { state: State = { inflightRequests: null, }; socket: ?CommTransportLayer; nextClientMessageID: number = 0; listeners: Set = new Set(); pingTimeoutID: ?TimeoutID; messageLastReceived: ?number; initialPlatformDetailsSent: boolean = isWebPlatform( getConfig().platformDetails.platform, ); reopenConnectionAfterClosing: boolean = false; invalidationRecoveryInProgress: boolean = false; initializedWithUserState: ?PreRequestUserState; failuresAfterPolicyAcknowledgment: number = 0; openSocket(newStatus: ConnectionStatus) { if ( this.props.frozen || (!isWebPlatform(getConfig().platformDetails.platform) && (!this.props.cookie || !this.props.cookie.startsWith('user='))) ) { return; } if (this.socket) { const { status } = this.props.connection; if (status === 'forcedDisconnecting') { this.reopenConnectionAfterClosing = true; return; } else if (status === 'disconnecting' && this.socket.readyState === 1) { this.markSocketInitialized(); return; } else if ( status === 'connected' || status === 'connecting' || status === 'reconnecting' ) { return; } if (this.socket.readyState < 2) { this.socket.close(); console.log(`this.socket seems open, but Redux thinks it's ${status}`); } } this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: newStatus }, }); const socket = this.props.openSocket(); const openObject = {}; socket.onopen = () => { if (this.socket === socket) { this.initializeSocket(); openObject.initializeMessageSent = true; } }; socket.onmessage = this.receiveMessage; socket.onclose = () => { if (this.socket === socket) { this.onClose(); } }; this.socket = socket; (async () => { await sleep(clientRequestVisualTimeout); if (this.socket !== socket || openObject.initializeMessageSent) { return; } this.setLateResponse(-1, true); await sleep(remainingTimeAfterVisualTimeout); if (this.socket !== socket || openObject.initializeMessageSent) { return; } this.finishClosingSocket(); })(); this.setState({ inflightRequests: new InflightRequests({ timeout: () => { if (this.socket === socket) { this.finishClosingSocket(); } }, setLateResponse: (messageID: number, isLate: boolean) => { if (this.socket === socket) { this.setLateResponse(messageID, isLate); } }, }), }); } markSocketInitialized() { this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'connected' }, }); this.resetPing(); } closeSocket( // This param is a hack. When closing a socket there is a race between this // function and the one to propagate the activity update. We make sure that // the activity update wins the race by passing in this param. activityUpdatePending: boolean, ) { const { status } = this.props.connection; if (status === 'disconnected') { return; } else if (status === 'disconnecting' || status === 'forcedDisconnecting') { this.reopenConnectionAfterClosing = false; return; } this.stopPing(); this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'disconnecting' }, }); if (!activityUpdatePending) { this.finishClosingSocket(); } } forceCloseSocket() { this.stopPing(); const { status } = this.props.connection; if (status !== 'forcedDisconnecting' && status !== 'disconnected') { this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'forcedDisconnecting' }, }); } this.finishClosingSocket(); } finishClosingSocket(receivedResponseTo?: ?number) { const { inflightRequests } = this.state; if ( inflightRequests && !inflightRequests.allRequestsResolvedExcept(receivedResponseTo) ) { return; } if (this.socket && this.socket.readyState < 2) { // If it's not closing already, close it this.socket.close(); } this.socket = null; this.stopPing(); this.setState({ inflightRequests: null }); if (this.props.connection.status !== 'disconnected') { this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'disconnected' }, }); } if (this.reopenConnectionAfterClosing) { this.reopenConnectionAfterClosing = false; if (this.props.active) { this.openSocket('connecting'); } } } reconnect: $Call void, number> = _throttle( () => this.openSocket('reconnecting'), 2000, ); componentDidMount() { if (this.props.detectUnsupervisedBackgroundRef) { this.props.detectUnsupervisedBackgroundRef( this.detectUnsupervisedBackground, ); } if (this.props.active) { this.openSocket('connecting'); } } componentWillUnmount() { this.closeSocket(false); this.reconnect.cancel(); } componentDidUpdate(prevProps: Props) { if (this.props.active && !prevProps.active) { this.openSocket('connecting'); } else if (!this.props.active && prevProps.active) { this.closeSocket(!!prevProps.activeThread); } else if ( this.props.active && prevProps.openSocket !== this.props.openSocket ) { // This case happens when the baseURL/urlPrefix is changed this.reopenConnectionAfterClosing = true; this.forceCloseSocket(); } else if ( this.props.active && this.props.connection.status === 'disconnected' && prevProps.connection.status !== 'disconnected' && !this.invalidationRecoveryInProgress ) { this.reconnect(); } } render(): React.Node { // It's important that APIRequestHandler get rendered first here. This is so // that it is registered with Redux first, so that its componentDidUpdate // processes before the other Handlers. This allows APIRequestHandler to // register itself with action-utils before other Handlers call // dispatchActionPromise in response to the componentDidUpdate triggered by // the same Redux change (state.connection.status). return ( ); } sendMessageWithoutID: (message: ClientSocketMessageWithoutID) => number = message => { const id = this.nextClientMessageID++; // These conditions all do the same thing and the runtime checks are only // necessary for Flow if (message.type === clientSocketMessageTypes.INITIAL) { this.sendMessage( ({ ...message, id }: ClientInitialClientSocketMessage), ); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.sendMessage( ({ ...message, id }: ClientResponsesClientSocketMessage), ); } else if (message.type === clientSocketMessageTypes.PING) { this.sendMessage(({ ...message, id }: PingClientSocketMessage)); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.sendMessage(({ ...message, id }: AckUpdatesClientSocketMessage)); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.sendMessage(({ ...message, id }: APIRequestClientSocketMessage)); } return id; }; sendMessage(message: ClientClientSocketMessage) { const socket = this.socket; invariant(socket, 'should be set'); socket.send(JSON.stringify(message)); } static messageFromEvent(event: MessageEvent): ?ClientServerSocketMessage { if (typeof event.data !== 'string') { console.log('socket received a non-string message'); return null; } try { return JSON.parse(event.data); } catch (e) { console.log(e); return null; } } receiveMessage: (event: MessageEvent) => Promise = async event => { const message = Socket.messageFromEvent(event); if (!message) { return; } this.failuresAfterPolicyAcknowledgment = 0; const { inflightRequests } = this.state; if (!inflightRequests) { // inflightRequests can be falsey here if we receive a message after we've // begun shutting down the socket. It's possible for a React Native // WebSocket to deliver a message even after close() is called on it. In // this case the message is probably a PONG, which we can safely ignore. // If it's not a PONG, it has to be something server-initiated (like // UPDATES or MESSAGES), since InflightRequests.allRequestsResolvedExcept // will wait for all responses to client-initiated requests to be // delivered before closing a socket. UPDATES and MESSAGES are both // checkpointed on the client, so should be okay to just ignore here and // redownload them later, probably in an incremental STATE_SYNC. return; } // If we receive any message, that indicates that our connection is healthy, // so we can reset the ping timeout. this.resetPing(); inflightRequests.resolveRequestsForMessage(message); const { status } = this.props.connection; if (status === 'disconnecting' || status === 'forcedDisconnecting') { this.finishClosingSocket( // We do this for Flow message.responseTo !== undefined ? message.responseTo : null, ); } for (const listener of this.listeners) { listener(message); } if (message.type === serverSocketMessageTypes.ERROR) { const { message: errorMessage, payload } = message; if (payload) { console.log(`socket sent error ${errorMessage} with payload`, payload); } else { console.log(`socket sent error ${errorMessage}`); } if (errorMessage === 'policies_not_accepted') { this.props.dispatch({ type: forcePolicyAcknowledgmentActionType, payload, }); } } else if (message.type === serverSocketMessageTypes.AUTH_ERROR) { const { sessionChange } = message; const cookie = sessionChange ? sessionChange.cookie : this.props.cookie; this.invalidationRecoveryInProgress = true; const recoverySessionChange = await fetchNewCookieFromNativeCredentials( this.props.dispatch, cookie, this.props.urlPrefix, logInActionSources.socketAuthErrorResolutionAttempt, ); if (!recoverySessionChange && sessionChange) { // This should only happen in the cookieSources.BODY (native) case when // the resolution attempt failed const { cookie: newerCookie, currentUserInfo } = sessionChange; this.props.dispatch({ type: setNewSessionActionType, payload: { sessionChange: { cookieInvalidated: true, currentUserInfo, cookie: newerCookie, }, preRequestUserState: this.initializedWithUserState, error: null, logInActionSource: logInActionSources.socketAuthErrorResolutionAttempt, }, }); } else if (!recoverySessionChange) { this.props.dispatchActionPromise( logOutActionTypes, this.props.logOut(this.props.preRequestUserState), ); } this.invalidationRecoveryInProgress = false; } }; addListener: (listener: SocketListener) => void = listener => { this.listeners.add(listener); }; removeListener: (listener: SocketListener) => void = listener => { this.listeners.delete(listener); }; onClose: () => void = () => { const { status } = this.props.connection; this.socket = null; this.stopPing(); if (this.state.inflightRequests) { this.state.inflightRequests.rejectAll(new Error('socket closed')); this.setState({ inflightRequests: null }); } const handled = this.detectUnsupervisedBackground(true); if (!handled && status !== 'disconnected') { this.props.dispatch({ type: updateConnectionStatusActionType, payload: { status: 'disconnected' }, }); } }; async sendInitialMessage() { const { inflightRequests } = this.state; invariant( inflightRequests, 'inflightRequests falsey inside sendInitialMessage', ); const messageID = this.nextClientMessageID++; const promises = {}; const clientResponses = []; if (!this.initialPlatformDetailsSent) { this.initialPlatformDetailsSent = true; clientResponses.push({ type: serverRequestTypes.PLATFORM_DETAILS, platformDetails: getConfig().platformDetails, }); } const { queuedActivityUpdates } = this.props.connection; if (queuedActivityUpdates.length > 0) { clientResponses.push({ type: serverRequestTypes.INITIAL_ACTIVITY_UPDATES, activityUpdates: queuedActivityUpdates, }); promises.activityUpdateMessage = inflightRequests.fetchResponse( messageID, serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, ); } const sessionState = this.props.sessionStateFunc(); const { sessionIdentification } = this.props; const initialMessage = { type: clientSocketMessageTypes.INITIAL, id: messageID, payload: { clientResponses, sessionState, sessionIdentification, }, }; this.initializedWithUserState = this.props.preRequestUserState; this.sendMessage(initialMessage); promises.stateSyncMessage = inflightRequests.fetchResponse( messageID, serverSocketMessageTypes.STATE_SYNC, ); const { stateSyncMessage, activityUpdateMessage } = await promiseAll( promises, ); if (activityUpdateMessage) { this.props.dispatch({ type: updateActivityActionTypes.success, payload: { activityUpdates: queuedActivityUpdates, result: activityUpdateMessage.payload, }, }); } if (stateSyncMessage.payload.type === stateSyncPayloadTypes.FULL) { const { sessionID, type, ...actionPayload } = stateSyncMessage.payload; this.props.dispatch({ type: fullStateSyncActionType, payload: { ...actionPayload, calendarQuery: sessionState.calendarQuery, }, }); if (sessionID !== null && sessionID !== undefined) { invariant( this.initializedWithUserState, 'initializedWithUserState should be set when state sync received', ); this.props.dispatch({ type: setNewSessionActionType, payload: { sessionChange: { cookieInvalidated: false, sessionID }, preRequestUserState: this.initializedWithUserState, error: null, logInActionSource: undefined, }, }); } } else { const { type, ...actionPayload } = stateSyncMessage.payload; this.props.dispatch({ type: incrementalStateSyncActionType, payload: { ...actionPayload, calendarQuery: sessionState.calendarQuery, }, }); } const currentAsOf = stateSyncMessage.payload.type === stateSyncPayloadTypes.FULL ? stateSyncMessage.payload.updatesCurrentAsOf : stateSyncMessage.payload.updatesResult.currentAsOf; this.sendMessageWithoutID({ type: clientSocketMessageTypes.ACK_UPDATES, payload: { currentAsOf }, }); this.markSocketInitialized(); } initializeSocket: (retriesLeft?: number) => Promise = async ( retriesLeft = 1, ) => { try { await this.sendInitialMessage(); } catch (e) { if (this.props.noDataAfterPolicyAcknowledgment) { this.failuresAfterPolicyAcknowledgment++; } else { this.failuresAfterPolicyAcknowledgment = 0; } if ( this.failuresAfterPolicyAcknowledgment >= 2 && this.props.socketCrashLoopRecovery ) { this.failuresAfterPolicyAcknowledgment = 0; try { await this.props.socketCrashLoopRecovery(); } catch (error) { console.log(error); this.props.dispatchActionPromise( logOutActionTypes, this.props.logOut(this.props.preRequestUserState), ); } return; } console.log(e); const { status } = this.props.connection; if ( e instanceof SocketTimeout || e instanceof SocketOffline || (status !== 'connecting' && status !== 'reconnecting') ) { // This indicates that the socket will be closed. Do nothing, since the // connection status update will trigger a reconnect. } else if ( retriesLeft === 0 || (e instanceof ServerError && e.message !== 'unknown_error') ) { if (e.message === 'not_logged_in') { this.props.dispatchActionPromise( logOutActionTypes, this.props.logOut(this.props.preRequestUserState), ); } else if (this.socket) { this.socket.close(); } } else { await this.initializeSocket(retriesLeft - 1); } } }; stopPing() { if (this.pingTimeoutID) { clearTimeout(this.pingTimeoutID); this.pingTimeoutID = null; } } resetPing() { this.stopPing(); const socket = this.socket; this.messageLastReceived = Date.now(); this.pingTimeoutID = setTimeout(() => { if (this.socket === socket) { this.sendPing(); } }, pingFrequency); } async sendPing() { if (this.props.connection.status !== 'connected') { // This generally shouldn't happen because anything that changes the // connection status should call stopPing(), but it's good to make sure return; } const messageID = this.sendMessageWithoutID({ type: clientSocketMessageTypes.PING, }); try { invariant( this.state.inflightRequests, 'inflightRequests falsey inside sendPing', ); await this.state.inflightRequests.fetchResponse( messageID, serverSocketMessageTypes.PONG, ); } catch (e) {} } setLateResponse: (messageID: number, isLate: boolean) => void = ( messageID, isLate, ) => { this.props.dispatch({ type: setLateResponseActionType, payload: { messageID, isLate }, }); }; cleanUpServerTerminatedSocket() { if (this.socket && this.socket.readyState < 2) { this.socket.close(); } else { this.onClose(); } } detectUnsupervisedBackground: (alreadyClosed: boolean) => boolean = alreadyClosed => { // On native, sometimes the app is backgrounded without the proper // callbacks getting triggered. This leaves us in an incorrect state for // two reasons: // (1) The connection is still considered to be active, causing API // requests to be processed via socket and failing. // (2) We rely on flipping foreground state in Redux to detect activity // changes, and thus won't think we need to update activity. if ( this.props.connection.status !== 'connected' || !this.messageLastReceived || this.messageLastReceived + serverRequestSocketTimeout >= Date.now() || (actionLogger.mostRecentActionTime && actionLogger.mostRecentActionTime + 3000 < Date.now()) ) { return false; } if (!alreadyClosed) { this.cleanUpServerTerminatedSocket(); } this.props.dispatch({ type: unsupervisedBackgroundActionType, payload: null, }); return true; }; } export default Socket; diff --git a/lib/utils/call-server-endpoint.js b/lib/utils/call-server-endpoint.js index 2096a3bb8..425726b22 100644 --- a/lib/utils/call-server-endpoint.js +++ b/lib/utils/call-server-endpoint.js @@ -1,215 +1,219 @@ // @flow import { getConfig } from './config.js'; -import { ServerError, FetchTimeout } from './errors.js'; +import { + ServerError, + FetchTimeout, + SocketOffline, + SocketTimeout, +} from './errors.js'; import sleep from './sleep.js'; import { uploadBlob, type UploadBlob } from './upload-blob.js'; import { callServerEndpointTimeout } from '../shared/timeouts.js'; -import { SocketOffline, SocketTimeout } from '../socket/inflight-requests.js'; import type { Shape } from '../types/core.js'; import { type Endpoint, type SocketAPIHandler, endpointIsSocketPreferred, endpointIsSocketOnly, } from '../types/endpoints.js'; import { forcePolicyAcknowledgmentActionType } from '../types/policy-types.js'; import type { Dispatch } from '../types/redux-types.js'; import type { ServerSessionChange, ClientSessionChange, } from '../types/session-types.js'; import type { ConnectionStatus } from '../types/socket-types'; import type { CurrentUserInfo } from '../types/user-types.js'; export type CallServerEndpointOptions = Shape<{ // null timeout means no timeout, which is the default for uploadBlob +timeout: ?number, // in milliseconds // getResultInfo will be called right before callServerEndpoint successfully // resolves and includes additional information about the request +getResultInfo: (resultInfo: CallServerEndpointResultInfo) => mixed, +blobUpload: boolean | UploadBlob, // the rest (onProgress, abortHandler) only work with blobUpload +onProgress: (percent: number) => void, // abortHandler will receive an abort function once the upload starts +abortHandler: (abort: () => void) => void, }>; export type CallServerEndpointResultInfoInterface = 'socket' | 'REST'; export type CallServerEndpointResultInfo = { +interface: CallServerEndpointResultInfoInterface, }; export type CallServerEndpointResponse = Shape<{ +cookieChange: ServerSessionChange, +currentUserInfo: CurrentUserInfo, +error: string, +payload: Object, }>; // You'll notice that this is not the type of the callServerEndpoint // function below. This is because the first several parameters to that // function get bound in by the helpers in lib/utils/action-utils.js. // This type represents the form of the callServerEndpoint function that // gets passed to the action function in lib/actions. export type CallServerEndpoint = ( endpoint: Endpoint, input: Object, options?: ?CallServerEndpointOptions, ) => Promise; type RequestData = { input: { [key: string]: mixed }, cookie?: ?string, sessionID?: ?string, }; // If cookie is undefined, then we will defer to the underlying environment to // handle cookies, and we won't worry about them. We do this on the web since // our cookies are httponly to protect against XSS attacks. On the other hand, // on native we want to keep track of the cookies since we don't trust the // underlying implementations and prefer for things to be explicit, and XSS // isn't a thing on native. Note that for native, cookie might be null // (indicating we don't have one), and we will then set an empty Cookie header. async function callServerEndpoint( cookie: ?string, setNewSession: (sessionChange: ClientSessionChange, error: ?string) => void, waitIfCookieInvalidated: () => Promise, cookieInvalidationRecovery: ( sessionChange: ClientSessionChange, ) => Promise, urlPrefix: string, sessionID: ?string, connectionStatus: ConnectionStatus, socketAPIHandler: ?SocketAPIHandler, endpoint: Endpoint, input: { [key: string]: mixed }, dispatch: Dispatch, options?: ?CallServerEndpointOptions, ): Promise { const possibleReplacement = await waitIfCookieInvalidated(); if (possibleReplacement) { return await possibleReplacement(endpoint, input, options); } if ( endpointIsSocketPreferred(endpoint) && connectionStatus === 'connected' && socketAPIHandler ) { try { const result = await socketAPIHandler({ endpoint, input }); options?.getResultInfo?.({ interface: 'socket' }); return result; } catch (e) { if (endpointIsSocketOnly(endpoint)) { throw e; } else if (e instanceof SocketOffline) { // nothing } else if (e instanceof SocketTimeout) { // nothing } else { throw e; } } } if (endpointIsSocketOnly(endpoint)) { throw new SocketOffline('socket_offline'); } const url = urlPrefix ? `${urlPrefix}/${endpoint}` : endpoint; let json; if (options && options.blobUpload) { const uploadBlobCallback = typeof options.blobUpload === 'function' ? options.blobUpload : uploadBlob; json = await uploadBlobCallback(url, cookie, sessionID, input, options); } else { const mergedData: RequestData = { input }; if (getConfig().setCookieOnRequest) { // We make sure that if setCookieOnRequest is true, we never set cookie to // undefined. null has a special meaning here: we don't currently have a // cookie, and we want the server to specify the new cookie it will // generate in the response body rather than the response header. See // session-types.js for more details on why we specify cookies in the body mergedData.cookie = cookie ? cookie : null; } if (getConfig().setSessionIDOnRequest) { // We make sure that if setSessionIDOnRequest is true, we never set // sessionID to undefined. null has a special meaning here: we cannot // consider the cookieID to be a unique session identifier, but we do not // have a sessionID to use either. This should only happen when the user // is not logged in on web. mergedData.sessionID = sessionID ? sessionID : null; } const callEndpointPromise = (async (): Promise => { const response = await fetch(url, { method: 'POST', // This is necessary to allow cookie headers to get passed down to us credentials: 'same-origin', body: JSON.stringify(mergedData), headers: { 'Accept': 'application/json', 'Content-Type': 'application/json', }, }); const text = await response.text(); try { return JSON.parse(text); } catch (e) { console.log(text); throw e; } })(); const timeout = options && options.timeout ? options.timeout : callServerEndpointTimeout; if (!timeout) { json = await callEndpointPromise; } else { const rejectPromise = (async () => { await sleep(timeout); throw new FetchTimeout( `callServerEndpoint timed out call to ${endpoint}`, endpoint, ); })(); json = await Promise.race([callEndpointPromise, rejectPromise]); } } const { cookieChange, error, payload, currentUserInfo } = json; const sessionChange: ?ServerSessionChange = cookieChange; if (sessionChange) { const { threadInfos, userInfos, ...rest } = sessionChange; const clientSessionChange = rest.cookieInvalidated ? rest : { cookieInvalidated: false, currentUserInfo, ...rest }; if (clientSessionChange.cookieInvalidated) { const maybeReplacement = await cookieInvalidationRecovery( clientSessionChange, ); if (maybeReplacement) { return await maybeReplacement(endpoint, input, options); } } setNewSession(clientSessionChange, error); } if (error === 'policies_not_accepted') { dispatch({ type: forcePolicyAcknowledgmentActionType, payload, }); } if (error) { throw new ServerError(error, payload); } options?.getResultInfo?.({ interface: 'REST' }); return json; } export default callServerEndpoint; diff --git a/lib/utils/errors.js b/lib/utils/errors.js index ba804c50c..ab3b95d6e 100644 --- a/lib/utils/errors.js +++ b/lib/utils/errors.js @@ -1,68 +1,81 @@ // @flow import copyError from 'utils-copy-error'; import type { PlatformDetails } from '../types/device-types.js'; +import type { ServerSocketMessageType } from '../types/socket-types.js'; class ExtendableError extends Error { constructor(message: string) { super(message); this.name = this.constructor.name; this.message = message; if (typeof Error.captureStackTrace === 'function') { Error.captureStackTrace(this, this.constructor); } else { this.stack = new Error(message).stack; } } } class ServerError extends ExtendableError { // When specified on server side, will get passed down to client // Only used in updateEntry and deleteEntry currently payload: ?Object; // Used for client_version_unsupported on server-side only platformDetails: ?PlatformDetails; // Used for input validators on server-side only sanitizedInput: mixed; constructor(error: string, payload?: ?Object) { super(error); this.payload = payload; } } class FetchTimeout extends ExtendableError { url: string; constructor(error: string, url: string) { super(error); this.url = url; } } +class SocketOffline extends ExtendableError {} + +class SocketTimeout extends ExtendableError { + expectedResponseType: ServerSocketMessageType; + constructor(expectedType: ServerSocketMessageType) { + super(`socket timed out waiting for response type ${expectedType}`); + this.expectedResponseType = expectedType; + } +} + function getMessageForException(e: mixed): ?string { if (typeof e === 'string') { return e; } else if ( e && typeof e === 'object' && e.message && typeof e.message === 'string' ) { return e.message; } return undefined; } function cloneError(e: E): E { return copyError(e); } export { ExtendableError, ServerError, FetchTimeout, getMessageForException, cloneError, + SocketOffline, + SocketTimeout, };