Changeset View
Changeset View
Standalone View
Standalone View
keyserver/src/socket/socket.js
// @flow | // @flow | ||||
import type { $Request } from 'express'; | import type { $Request } from 'express'; | ||||
import invariant from 'invariant'; | import invariant from 'invariant'; | ||||
import _debounce from 'lodash/debounce.js'; | import _debounce from 'lodash/debounce.js'; | ||||
import t from 'tcomb'; | import t from 'tcomb'; | ||||
import type { TUnion } from 'tcomb'; | |||||
import WebSocket from 'ws'; | import WebSocket from 'ws'; | ||||
import { baseLegalPolicies } from 'lib/facts/policies.js'; | import { baseLegalPolicies } from 'lib/facts/policies.js'; | ||||
import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; | import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; | ||||
import { | import { | ||||
serverRequestSocketTimeout, | serverRequestSocketTimeout, | ||||
serverResponseTimeout, | serverResponseTimeout, | ||||
} from 'lib/shared/timeouts.js'; | } from 'lib/shared/timeouts.js'; | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | |||||
import { assertSecureRequest } from '../utils/security-utils.js'; | import { assertSecureRequest } from '../utils/security-utils.js'; | ||||
import { | import { | ||||
checkInputValidator, | checkInputValidator, | ||||
checkClientSupported, | checkClientSupported, | ||||
policiesValidator, | policiesValidator, | ||||
validateOutput, | validateOutput, | ||||
} from '../utils/validation-utils.js'; | } from '../utils/validation-utils.js'; | ||||
const clientSocketMessageInputValidator = t.union([ | const clientSocketMessageInputValidator: TUnion<ClientSocketMessage> = t.union([ | ||||
tShape({ | tShape({ | ||||
type: t.irreducible( | type: t.irreducible( | ||||
'clientSocketMessageTypes.INITIAL', | 'clientSocketMessageTypes.INITIAL', | ||||
x => x === clientSocketMessageTypes.INITIAL, | x => x === clientSocketMessageTypes.INITIAL, | ||||
), | ), | ||||
id: t.Number, | id: t.Number, | ||||
payload: tShape({ | payload: tShape({ | ||||
sessionIdentification: tShape({ | sessionIdentification: tShape({ | ||||
▲ Show 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | class Socket { | ||||
onMessage = async ( | onMessage = async ( | ||||
messageString: string | Buffer | ArrayBuffer | Array<Buffer>, | messageString: string | Buffer | ArrayBuffer | Array<Buffer>, | ||||
) => { | ) => { | ||||
invariant(typeof messageString === 'string', 'message should be string'); | invariant(typeof messageString === 'string', 'message should be string'); | ||||
let clientSocketMessage: ?ClientSocketMessage; | let clientSocketMessage: ?ClientSocketMessage; | ||||
try { | try { | ||||
this.resetTimeout(); | this.resetTimeout(); | ||||
clientSocketMessage = JSON.parse(messageString); | const messageObject = JSON.parse(messageString); | ||||
checkInputValidator( | clientSocketMessage = checkInputValidator( | ||||
clientSocketMessageInputValidator, | clientSocketMessageInputValidator, | ||||
clientSocketMessage, | messageObject, | ||||
); | ); | ||||
if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { | if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { | ||||
if (this.viewer) { | if (this.viewer) { | ||||
// This indicates that the user sent multiple INITIAL messages. | // This indicates that the user sent multiple INITIAL messages. | ||||
throw new ServerError('socket_already_initialized'); | throw new ServerError('socket_already_initialized'); | ||||
} | } | ||||
this.viewer = await fetchViewerForSocket( | this.viewer = await fetchViewerForSocket( | ||||
this.httpRequest, | this.httpRequest, | ||||
▲ Show 20 Lines • Show All 639 Lines • Show Last 20 Lines |