Spaces:
Paused
Paused
| import EventEmitter from 'events' | |
| import { Transform, pipeline } from 'streamx' | |
| import arrayRemove from 'unordered-array-remove' | |
| import debugFactory from 'debug' | |
| import Wire from 'bittorrent-protocol' | |
| const CONNECT_TIMEOUT_TCP = 5_000 | |
| const CONNECT_TIMEOUT_UTP = 5_000 | |
| const CONNECT_TIMEOUT_WEBRTC = 25_000 | |
| const HANDSHAKE_TIMEOUT = 25_000 | |
| // Types of peers | |
| const TYPE_TCP_INCOMING = 'tcpIncoming' | |
| const TYPE_TCP_OUTGOING = 'tcpOutgoing' | |
| const TYPE_UTP_INCOMING = 'utpIncoming' | |
| const TYPE_UTP_OUTGOING = 'utpOutgoing' | |
| const TYPE_WEBRTC = 'webrtc' | |
| const TYPE_WEBSEED = 'webSeed' | |
| // Source used to obtain the peer | |
| const SOURCE_MANUAL = 'manual' | |
| const SOURCE_TRACKER = 'tracker' | |
| const SOURCE_DHT = 'dht' | |
| const SOURCE_LSD = 'lsd' | |
| const SOURCE_UT_PEX = 'ut_pex' | |
| const debug = debugFactory('webtorrent:peer') | |
| let secure = false | |
| export const enableSecure = () => { | |
| secure = true | |
| } | |
| /** | |
| * Peer. Represents a peer in the torrent swarm. | |
| * | |
| * @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds) | |
| * @param {string} type the type of the peer | |
| */ | |
| export default class Peer extends EventEmitter { | |
| constructor (id, type) { | |
| super() | |
| this.id = id | |
| this.type = type | |
| debug('new %s Peer %s', type, id) | |
| this.addr = null | |
| this.conn = null | |
| this.swarm = null | |
| this.wire = null | |
| this.source = null | |
| this.connected = false | |
| this.destroyed = false | |
| this.timeout = null // handshake timeout | |
| this.retries = 0 // outgoing TCP connection retry count | |
| this.sentPe1 = false | |
| this.sentPe2 = false | |
| this.sentPe3 = false | |
| this.sentPe4 = false | |
| this.sentHandshake = false | |
| } | |
| /** | |
| * Called once the peer is connected (i.e. fired 'connect' event) | |
| * @param {Socket} conn | |
| */ | |
| onConnect () { | |
| if (this.destroyed) return | |
| this.connected = true | |
| debug('Peer %s connected', this.id) | |
| clearTimeout(this.connectTimeout) | |
| const conn = this.conn | |
| conn.once('end', () => { | |
| this.destroy() | |
| }) | |
| conn.once('close', () => { | |
| this.destroy() | |
| }) | |
| conn.once('finish', () => { | |
| this.destroy() | |
| }) | |
| conn.once('error', err => { | |
| this.destroy(err) | |
| }) | |
| const wire = this.wire = new Wire(this.type, this.retries, secure) | |
| wire.once('end', () => { | |
| this.destroy() | |
| }) | |
| wire.once('close', () => { | |
| this.destroy() | |
| }) | |
| wire.once('finish', () => { | |
| this.destroy() | |
| }) | |
| wire.once('error', err => { | |
| this.destroy(err) | |
| }) | |
| wire.once('pe1', () => { | |
| this.onPe1() | |
| }) | |
| wire.once('pe2', () => { | |
| this.onPe2() | |
| }) | |
| wire.once('pe3', () => { | |
| this.onPe3() | |
| }) | |
| wire.once('pe4', () => { | |
| this.onPe4() | |
| }) | |
| wire.once('handshake', (infoHash, peerId) => { | |
| this.onHandshake(infoHash, peerId) | |
| }) | |
| this.startHandshakeTimeout() | |
| this.setThrottlePipes() | |
| if (this.swarm) { | |
| if (this.type === 'tcpOutgoing') { | |
| if (secure && this.retries === 0 && !this.sentPe1) this.sendPe1() | |
| else if (!this.sentHandshake) this.handshake() | |
| } else if (this.type !== 'tcpIncoming' && !this.sentHandshake) this.handshake() | |
| } | |
| } | |
| sendPe1 () { | |
| this.wire.sendPe1() | |
| this.sentPe1 = true | |
| } | |
| onPe1 () { | |
| this.sendPe2() | |
| } | |
| sendPe2 () { | |
| this.wire.sendPe2() | |
| this.sentPe2 = true | |
| } | |
| onPe2 () { | |
| this.sendPe3() | |
| } | |
| sendPe3 () { | |
| this.wire.sendPe3(this.swarm.infoHash) | |
| this.sentPe3 = true | |
| } | |
| onPe3 (infoHashHash) { | |
| if (this.swarm) { | |
| if (this.swarm.infoHashHash !== infoHashHash) { | |
| this.destroy(new Error('unexpected crypto handshake info hash for this swarm')) | |
| } | |
| this.sendPe4() | |
| } | |
| } | |
| sendPe4 () { | |
| this.wire.sendPe4(this.swarm.infoHash) | |
| this.sentPe4 = true | |
| } | |
| onPe4 () { | |
| if (!this.sentHandshake) this.handshake() | |
| } | |
| clearPipes () { | |
| this.conn.unpipe() | |
| this.wire.unpipe() | |
| } | |
| setThrottlePipes () { | |
| const self = this | |
| pipeline( | |
| this.conn, | |
| this.throttleGroups.down.throttle(), | |
| new Transform({ | |
| transform (chunk, callback) { | |
| self.emit('download', chunk.length) | |
| if (self.destroyed) return | |
| callback(null, chunk) | |
| } | |
| }), | |
| this.wire, | |
| this.throttleGroups.up.throttle(), | |
| new Transform({ | |
| transform (chunk, callback) { | |
| self.emit('upload', chunk.length) | |
| if (self.destroyed) return | |
| callback(null, chunk) | |
| } | |
| }), | |
| this.conn | |
| ) | |
| } | |
| /** | |
| * Called when handshake is received from remote peer. | |
| * @param {string} infoHash | |
| * @param {string} peerId | |
| */ | |
| onHandshake (infoHash, peerId) { | |
| if (!this.swarm) return // `this.swarm` not set yet, so do nothing | |
| if (this.destroyed) return | |
| if (this.swarm.destroyed) { | |
| return this.destroy(new Error('swarm already destroyed')) | |
| } | |
| if (infoHash !== this.swarm.infoHash) { | |
| return this.destroy(new Error('unexpected handshake info hash for this swarm')) | |
| } | |
| if (peerId === this.swarm.peerId) { | |
| return this.destroy(new Error('refusing to connect to ourselves')) | |
| } | |
| debug('Peer %s got handshake %s', this.id, infoHash) | |
| clearTimeout(this.handshakeTimeout) | |
| this.retries = 0 | |
| let addr = this.addr | |
| if (!addr && this.conn.remoteAddress && this.conn.remotePort) { | |
| addr = `${this.conn.remoteAddress}:${this.conn.remotePort}` | |
| } | |
| this.swarm._onWire(this.wire, addr) | |
| // swarm could be destroyed in user's 'wire' event handler | |
| if (!this.swarm || this.swarm.destroyed) return | |
| if (!this.sentHandshake) this.handshake() | |
| } | |
| handshake () { | |
| const opts = { | |
| dht: this.swarm.private ? false : !!this.swarm.client.dht, | |
| fast: true | |
| } | |
| this.wire.handshake(this.swarm.infoHash, this.swarm.client.peerId, opts) | |
| this.sentHandshake = true | |
| } | |
| startConnectTimeout () { | |
| clearTimeout(this.connectTimeout) | |
| const connectTimeoutValues = { | |
| webrtc: CONNECT_TIMEOUT_WEBRTC, | |
| tcpOutgoing: CONNECT_TIMEOUT_TCP, | |
| utpOutgoing: CONNECT_TIMEOUT_UTP | |
| } | |
| this.connectTimeout = setTimeout(() => { | |
| this.destroy(new Error('connect timeout')) | |
| }, connectTimeoutValues[this.type]) | |
| if (this.connectTimeout.unref) this.connectTimeout.unref() | |
| } | |
| startHandshakeTimeout () { | |
| clearTimeout(this.handshakeTimeout) | |
| this.handshakeTimeout = setTimeout(() => { | |
| this.destroy(new Error('handshake timeout')) | |
| }, HANDSHAKE_TIMEOUT) | |
| if (this.handshakeTimeout.unref) this.handshakeTimeout.unref() | |
| } | |
| destroy (err) { | |
| if (this.destroyed) return | |
| this.destroyed = true | |
| this.connected = false | |
| debug('destroy %s %s (error: %s)', this.type, this.id, err && (err.message || err)) | |
| clearTimeout(this.connectTimeout) | |
| clearTimeout(this.handshakeTimeout) | |
| const swarm = this.swarm | |
| const conn = this.conn | |
| const wire = this.wire | |
| this.swarm = null | |
| this.conn = null | |
| this.wire = null | |
| if (swarm && wire) { | |
| arrayRemove(swarm.wires, swarm.wires.indexOf(wire)) | |
| } | |
| if (conn) { | |
| conn.on('error', () => {}) | |
| conn.destroy() | |
| } | |
| if (wire) wire.destroy() | |
| if (swarm) swarm.removePeer(this.id) | |
| } | |
| } | |
| Peer.TYPE_TCP_INCOMING = TYPE_TCP_INCOMING | |
| Peer.TYPE_TCP_OUTGOING = TYPE_TCP_OUTGOING | |
| Peer.TYPE_UTP_INCOMING = TYPE_UTP_INCOMING | |
| Peer.TYPE_UTP_OUTGOING = TYPE_UTP_OUTGOING | |
| Peer.TYPE_WEBRTC = TYPE_WEBRTC | |
| Peer.TYPE_WEBSEED = TYPE_WEBSEED | |
| Peer.SOURCE_MANUAL = SOURCE_MANUAL | |
| Peer.SOURCE_TRACKER = SOURCE_TRACKER | |
| Peer.SOURCE_DHT = SOURCE_DHT | |
| Peer.SOURCE_LSD = SOURCE_LSD | |
| Peer.SOURCE_UT_PEX = SOURCE_UT_PEX | |
| /** | |
| * WebRTC peer connections start out connected, because WebRTC peers require an | |
| * "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address | |
| * that lets you refer to a WebRTC endpoint. | |
| */ | |
| Peer.createWebRTCPeer = (conn, swarm, throttleGroups) => { | |
| const peer = new Peer(conn.id, 'webrtc') | |
| peer.conn = conn | |
| peer.swarm = swarm | |
| peer.throttleGroups = throttleGroups | |
| if (peer.conn.connected) { | |
| peer.onConnect() | |
| } else { | |
| const cleanup = () => { | |
| peer.conn.removeListener('connect', onConnect) | |
| peer.conn.removeListener('error', onError) | |
| } | |
| const onConnect = () => { | |
| cleanup() | |
| peer.onConnect() | |
| } | |
| const onError = err => { | |
| cleanup() | |
| peer.destroy(err) | |
| } | |
| peer.conn.once('connect', onConnect) | |
| peer.conn.once('error', onError) | |
| peer.startConnectTimeout() | |
| } | |
| return peer | |
| } | |
| /** | |
| * Incoming TCP peers start out connected, because the remote peer connected to the | |
| * listening port of the TCP server. Until the remote peer sends a handshake, we don't | |
| * know what swarm the connection is intended for. | |
| */ | |
| Peer.createTCPIncomingPeer = (conn, throttleGroups) => { | |
| return Peer._createIncomingPeer(conn, TYPE_TCP_INCOMING, throttleGroups) | |
| } | |
| /** | |
| * Incoming uTP peers start out connected, because the remote peer connected to the | |
| * listening port of the uTP server. Until the remote peer sends a handshake, we don't | |
| * know what swarm the connection is intended for. | |
| */ | |
| Peer.createUTPIncomingPeer = (conn, throttleGroups) => { | |
| return Peer._createIncomingPeer(conn, TYPE_UTP_INCOMING, throttleGroups) | |
| } | |
| /** | |
| * Outgoing TCP peers start out with just an IP address. At some point (when there is an | |
| * available connection), the client can attempt to connect to the address. | |
| */ | |
| Peer.createTCPOutgoingPeer = (addr, swarm, throttleGroups) => { | |
| return Peer._createOutgoingPeer(addr, swarm, TYPE_TCP_OUTGOING, throttleGroups) | |
| } | |
| /** | |
| * Outgoing uTP peers start out with just an IP address. At some point (when there is an | |
| * available connection), the client can attempt to connect to the address. | |
| */ | |
| Peer.createUTPOutgoingPeer = (addr, swarm, throttleGroups) => { | |
| return Peer._createOutgoingPeer(addr, swarm, TYPE_UTP_OUTGOING, throttleGroups) | |
| } | |
| Peer._createIncomingPeer = (conn, type, throttleGroups) => { | |
| const addr = `${conn.remoteAddress}:${conn.remotePort}` | |
| const peer = new Peer(addr, type) | |
| peer.conn = conn | |
| peer.addr = addr | |
| peer.throttleGroups = throttleGroups | |
| peer.onConnect() | |
| return peer | |
| } | |
| Peer._createOutgoingPeer = (addr, swarm, type, throttleGroups) => { | |
| const peer = new Peer(addr, type) | |
| peer.addr = addr | |
| peer.swarm = swarm | |
| peer.throttleGroups = throttleGroups | |
| return peer | |
| } | |
| /** | |
| * Peer that represents a Web Seed (BEP17 / BEP19). | |
| */ | |
| Peer.createWebSeedPeer = (conn, id, swarm, throttleGroups) => { | |
| const peer = new Peer(id, TYPE_WEBSEED) | |
| peer.swarm = swarm | |
| peer.conn = conn | |
| peer.throttleGroups = throttleGroups | |
| peer.onConnect() | |
| return peer | |
| } | |