no1b4me's picture
Upload 5037 files
95f4e64 verified
import EventEmitter from 'events'
import fs from 'fs'
import net from 'net' // browser exclude
import os from 'os' // browser exclude
import path from 'path'
import addrToIPPort from 'addr-to-ip-port'
import BitField from 'bitfield'
import CacheChunkStore from 'cache-chunk-store'
import { chunkStoreWrite } from 'chunk-store-iterator'
import cpus from 'cpus'
import debugFactory from 'debug'
import Discovery from 'torrent-discovery'
import FSChunkStore from 'fs-chunk-store' // browser: `hybrid-chunk-store`
import fetch from 'cross-fetch-ponyfill'
import ImmediateChunkStore from 'immediate-chunk-store'
import ltDontHave from 'lt_donthave'
import MemoryChunkStore from 'memory-chunk-store'
import HybridChunkStore from 'hybrid-chunk-store'
import joinIterator from 'join-async-iterator'
import parallel from 'run-parallel'
import parallelLimit from 'run-parallel-limit'
import parseTorrent, { toMagnetURI, toTorrentFile, remote } from 'parse-torrent'
import Piece from 'torrent-piece'
import queueMicrotask from 'queue-microtask'
import randomIterate from 'random-iterate'
import { hash, arr2hex } from 'uint8-util'
import throughput from 'throughput'
import utMetadata from 'ut_metadata'
import utPex from 'ut_pex' // browser exclude
import File from './file.js'
import Peer from './peer.js'
import RarityMap from './rarity-map.js'
import utp from './utp.cjs' // browser exclude
import WebConn from './webconn.js'
import { Selections } from './selections.js'
import VERSION from '../version.cjs'
// Namespaced debug logger for this module
const debug = debugFactory('webtorrent:torrent')
// Largest block length a peer may request; wires asking for more are destroyed
const MAX_BLOCK_LENGTH = 128 * 1024
// Timeout handed to wire.setTimeout() for non-webseed wires (presumably milliseconds)
const PIECE_TIMEOUT = 30_000
// Delay (ms) between checks of a wire that keeps us choked
const CHOKE_TIMEOUT = 5_000
// The next three constants are consumed outside this section of the file
const SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH
const PIPELINE_MIN_DURATION = 0.5
const PIPELINE_MAX_DURATION = 1
const RECHOKE_INTERVAL = 10_000 // 10 seconds
// NOTE(review): the value is 2, not 30 seconds. Its consumer is outside this section;
// possibly a count of rechoke intervals rather than a duration — confirm.
const RECHOKE_OPTIMISTIC_DURATION = 2
const DEFAULT_NO_PEERS_INTERVAL = 30_000 // 30 seconds
// IndexedDB chunk stores used in the browser benefit from high concurrency
const FILESYSTEM_CONCURRENCY = process.browser ? cpus().length : 2
// Consumed outside this section of the file
const RECONNECT_WAIT = [1_000, 5_000, 15_000]
// Sent as the user-agent for discovery and for `xs` metadata requests
const USER_AGENT = `WebTorrent/${VERSION} (https://webtorrent.io)`
// Default download directory: `/tmp/webtorrent` when `/tmp` can be stat'ed, otherwise
// a `webtorrent` folder under os.tmpdir() (or `/` when os.tmpdir is unavailable).
let TMP
try {
// statSync throws when `/tmp` does not exist, which routes to the fallback below
TMP = path.join(fs.statSync('/tmp') && '/tmp', 'webtorrent')
} catch (err) {
TMP = path.join(typeof os.tmpdir === 'function' ? os.tmpdir() : '/', 'webtorrent')
}
// window.requestIdleCallback when available, otherwise `false`
const IDLE_CALLBACK = typeof window !== 'undefined' && typeof window.requestIdleCallback === 'function' && window.requestIdleCallback
export default class Torrent extends EventEmitter {
constructor (torrentId, client, opts) {
super()
this._debugId = 'unknown infohash'
this.client = client
this.announce = opts.announce
this.urlList = opts.urlList
this.path = opts.path || TMP
this.addUID = opts.addUID || false
this.rootDir = opts.rootDir || null
this.skipVerify = !!opts.skipVerify
this._store = opts.store || FSChunkStore
this._preloadedStore = opts.preloadedStore || null
this._storeCacheSlots = opts.storeCacheSlots !== undefined ? opts.storeCacheSlots : 20
this._destroyStoreOnDestroy = opts.destroyStoreOnDestroy || false
this.store = null
this.storeOpts = opts.storeOpts
this.alwaysChokeSeeders = opts.alwaysChokeSeeders ?? true
this._getAnnounceOpts = opts.getAnnounceOpts
// if defined, `opts.private` overrides default privacy of torrent
if (typeof opts.private === 'boolean') this.private = opts.private
this.strategy = opts.strategy || 'sequential'
this.maxWebConns = opts.maxWebConns || 4
this._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0)
? 0
: (+opts.uploads || 10)
this._rechokeOptimisticWire = null
this._rechokeOptimisticTime = 0
this._rechokeIntervalId = null
this._noPeersIntervalId = null
this._noPeersIntervalTime = opts.noPeersIntervalTime ? opts.noPeersIntervalTime * 1000 : DEFAULT_NO_PEERS_INTERVAL
this._startAsDeselected = opts.deselect || false
this.ready = false
this.destroyed = false
this.paused = opts.paused || false
this.done = false
this.metadata = null
this.files = []
// Pieces that need to be downloaded, indexed by piece index
this.pieces = []
this._amInterested = false
this._selections = new Selections()
this._critical = []
this.wires = [] // open wires (added *after* handshake)
this._queue = [] // queue of outgoing tcp peers to connect to
this._peers = {} // connected peers (addr/peerId -> Peer)
this._peersLength = 0 // number of elements in `this._peers` (cache, for perf)
// stats
this.received = 0
this.uploaded = 0
this._downloadSpeed = throughput()
this._uploadSpeed = throughput()
// for cleanup
this._servers = []
this._xsRequests = []
// TODO: remove this and expose a hook instead
// optimization: don't recheck every file if it hasn't changed
this._fileModtimes = opts.fileModtimes
if (torrentId !== null) this._onTorrentId(torrentId)
this._debug('new torrent')
}
get timeRemaining () {
if (this.done) return 0
if (this.downloadSpeed === 0) return Infinity
return ((this.length - this.downloaded) / this.downloadSpeed) * 1000
}
get downloaded () {
if (!this.bitfield) return 0
let downloaded = 0
for (let index = 0, len = this.pieces.length; index < len; ++index) {
if (this.bitfield.get(index)) { // verified data
downloaded += (index === len - 1) ? this.lastPieceLength : this.pieceLength
} else { // "in progress" data
const piece = this.pieces[index]
downloaded += (piece.length - piece.missing)
}
}
return downloaded
}
// TODO: re-enable this. The number of missing pieces. Used to implement 'end game' mode.
// Object.defineProperty(Storage.prototype, 'numMissing', {
// get: function () {
// var self = this
// var numMissing = self.pieces.length
// for (var index = 0, len = self.pieces.length; index < len; index++) {
// numMissing -= self.bitfield.get(index)
// }
// return numMissing
// }
// })
get downloadSpeed () { return this._downloadSpeed() }
get uploadSpeed () { return this._uploadSpeed() }
get progress () { return this.length ? this.downloaded / this.length : 0 }
get ratio () { return this.uploaded / (this.received || this.length) }
get numPeers () { return this.wires.length }
get torrentFileBlob () {
if (!this.torrentFile) return null
return new Blob([this.torrentFile], { type: 'application/x-bittorrent' })
}
get _numQueued () {
return this._queue.length + (this._peersLength - this._numConns)
}
get _numConns () {
let numConns = 0
for (const id in this._peers) {
if (this._peers[id].connected) numConns += 1
}
return numConns
}
async _onTorrentId (torrentId) {
if (this.destroyed) return
let parsedTorrent
try { parsedTorrent = await parseTorrent(torrentId) } catch (err) {}
if (parsedTorrent) {
// Attempt to set infoHash property synchronously
this.infoHash = parsedTorrent.infoHash
this._debugId = arr2hex(parsedTorrent.infoHash).substring(0, 7)
queueMicrotask(() => {
if (this.destroyed) return
this._onParsedTorrent(parsedTorrent)
})
} else {
// If torrentId failed to parse, it could be in a form that requires an async
// operation, i.e. http/https link, filesystem path, or Blob.
remote(torrentId, (err, parsedTorrent) => {
if (this.destroyed) return
if (err) return this._destroy(err)
this._onParsedTorrent(parsedTorrent)
})
}
}
_onParsedTorrent (parsedTorrent) {
if (this.destroyed) return
this._processParsedTorrent(parsedTorrent)
if (!this.infoHash) {
return this._destroy(new Error('Malformed torrent data: No info hash'))
}
this._rechokeIntervalId = setInterval(() => {
this._rechoke()
}, RECHOKE_INTERVAL)
if (this._rechokeIntervalId.unref) this._rechokeIntervalId.unref()
// Private 'infoHash' event allows client.add to check for duplicate torrents and
// destroy them before the normal 'infoHash' event is emitted. Prevents user
// applications from needing to deal with duplicate 'infoHash' events.
this.emit('_infoHash', this.infoHash)
if (this.destroyed) return
this.emit('infoHash', this.infoHash)
if (this.destroyed) return // user might destroy torrent in event handler
if (this.client.listening) {
this._onListening()
} else {
this.client.once('listening', () => {
this._onListening()
})
}
}
_processParsedTorrent (parsedTorrent) {
this._debugId = arr2hex(parsedTorrent.infoHash).substring(0, 7)
if (typeof this.private !== 'undefined') {
// `private` option overrides default, only if it's defined
parsedTorrent.private = this.private
}
if (Array.isArray(this.announce)) {
// Allow specifying trackers via `opts` parameter
parsedTorrent.announce = parsedTorrent.announce.concat(this.announce)
}
if (this.client.tracker && Array.isArray(this.client.tracker.announce) && !parsedTorrent.private) {
// If the client has a default tracker, add it to the announce list if torrent is not private
parsedTorrent.announce = parsedTorrent.announce.concat(this.client.tracker.announce)
}
if (this.client.tracker && global.WEBTORRENT_ANNOUNCE && !parsedTorrent.private) {
// So `webtorrent-hybrid` can force specific trackers to be used
parsedTorrent.announce = parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE)
}
if (this.urlList) {
// Allow specifying web seeds via `opts` parameter
parsedTorrent.urlList = parsedTorrent.urlList.concat(this.urlList)
}
// remove duplicates by converting to Set and back
parsedTorrent.announce = Array.from(new Set(parsedTorrent.announce))
parsedTorrent.urlList = Array.from(new Set(parsedTorrent.urlList))
Object.assign(this, parsedTorrent)
this.magnetURI = toMagnetURI(parsedTorrent)
this.torrentFile = toTorrentFile(parsedTorrent)
}
_onListening () {
if (this.destroyed) return
if (this.info) {
// if full metadata was included in initial torrent id, use it immediately. Otherwise,
// wait for torrent-discovery to find peers and ut_metadata to get the metadata.
this._onMetadata(this)
} else {
if (this.xs) this._getMetadataFromServer()
this._startDiscovery()
}
}
_startDiscovery () {
if (this.discovery || this.destroyed) return
let trackerOpts = this.client.tracker
if (trackerOpts) {
trackerOpts = Object.assign({}, this.client.tracker, {
getAnnounceOpts: () => {
if (this.destroyed) return
const opts = {
uploaded: this.uploaded,
downloaded: this.downloaded,
left: Math.max(this.length - this.downloaded, 0)
}
if (this.client.tracker.getAnnounceOpts) {
Object.assign(opts, this.client.tracker.getAnnounceOpts())
}
if (this._getAnnounceOpts) {
// TODO: consider deprecating this, as it's redundant with the former case
Object.assign(opts, this._getAnnounceOpts())
}
return opts
}
})
}
// add BEP09 peer-address
if (this.peerAddresses) {
this.peerAddresses.forEach(peer => this.addPeer(peer, Peer.SOURCE_MANUAL))
}
// begin discovering peers via DHT and trackers
this.discovery = new Discovery({
infoHash: this.infoHash,
announce: this.announce,
peerId: this.client.peerId,
dht: !this.private && this.client.dht,
tracker: trackerOpts,
port: this.client.torrentPort,
userAgent: USER_AGENT,
lsd: this.client.lsd
})
this.discovery.on('error', (err) => {
this._destroy(err)
})
this.discovery.on('peer', (peer, source) => {
this._debug('peer %s discovered via %s', peer, source)
// Don't create new outgoing connections when torrent is done and seedOutgoingConnections is false.
if (!this.client.seedOutgoingConnections && this.done) {
this._debug('ignoring peer %s: torrent is done and seedOutgoingConnections is false', peer)
return
}
this.addPeer(peer, source)
})
this.discovery.on('trackerAnnounce', () => {
this.emit('trackerAnnounce')
})
this.discovery.on('dhtAnnounce', () => {
this.emit('dhtAnnounce')
})
this.discovery.on('warning', (err) => {
this.emit('warning', err)
})
this._noPeersIntervalId = setInterval(() => {
if (this.destroyed) return
const counters = {
[Peer.SOURCE_TRACKER]: {
enabled: !!this.client.tracker,
numPeers: 0
},
[Peer.SOURCE_DHT]: {
enabled: !!this.client.dht,
numPeers: 0
},
[Peer.SOURCE_LSD]: {
enabled: !!this.client.lsd,
numPeers: 0
},
[Peer.SOURCE_UT_PEX]: {
enabled: (this.client.utPex && typeof utPex === 'function'),
numPeers: 0
}
}
for (const peer of Object.values(this._peers)) {
const counter = counters[peer.source]
if (typeof counter !== 'undefined') counter.numPeers++
}
for (const source of Object.keys(counters)) {
const counter = counters[source]
if (counter.enabled && counter.numPeers === 0) this.emit('noPeers', source)
}
}, this._noPeersIntervalTime)
if (this._noPeersIntervalId.unref) this._noPeersIntervalId.unref()
}
_getMetadataFromServer () {
// to allow function hoisting
const self = this
const urls = Array.isArray(this.xs) ? this.xs : [this.xs]
self._xsRequestsController = new AbortController()
const signal = self._xsRequestsController.signal
const tasks = urls.map(url => cb => {
getMetadataFromURL(url, cb)
})
parallel(tasks)
async function getMetadataFromURL (url, cb) {
if (url.indexOf('http://') !== 0 && url.indexOf('https://') !== 0) {
self.emit('warning', new Error(`skipping non-http xs param: ${url}`))
return cb(null)
}
const opts = {
method: 'GET',
headers: {
'user-agent': USER_AGENT
},
signal
}
let res
try {
res = await fetch(url, opts)
} catch (err) {
self.emit('warning', new Error(`http error from xs param: ${url}`))
return cb(null)
}
if (self.destroyed) return cb(null)
if (self.metadata) return cb(null)
if (res.status !== 200) {
self.emit('warning', new Error(`non-200 status code ${res.status} from xs param: ${url}`))
return cb(null)
}
let torrent
try {
torrent = new Uint8Array(await res.arrayBuffer())
} catch (e) {
self.emit('warning', e)
return cb(null)
}
let parsedTorrent
try {
parsedTorrent = await parseTorrent(torrent)
} catch (err) {}
if (!parsedTorrent) {
self.emit('warning', new Error(`got invalid torrent file from xs param: ${url}`))
return cb(null)
}
if (parsedTorrent.infoHash !== self.infoHash) {
self.emit('warning', new Error(`got torrent file with incorrect info hash from xs param: ${url}`))
return cb(null)
}
self._onMetadata(parsedTorrent)
cb(null)
}
}
/**
 * Called when the full torrent metadata is received.
 *
 * Accepts either an already-parsed torrent (an object with `infoHash`) or raw
 * metadata that is run through `parseTorrent`. Sets up web seeds, files, the
 * chunk store, the initial selection, piece bookkeeping and the bitfield,
 * emits 'metadata', then verifies existing data (or skips verification)
 * before handing over to `_onStore`. Ignored when metadata is already set or
 * the torrent is destroyed.
 */
async _onMetadata (metadata) {
if (this.metadata || this.destroyed) return
this._debug('got metadata')
// Metadata is here: cancel any `xs` downloads still in flight
this._xsRequestsController?.abort()
this._xsRequestsController = null
let parsedTorrent
if (metadata && metadata.infoHash) {
// `metadata` is a parsed torrent (from parse-torrent module)
parsedTorrent = metadata
} else {
try {
parsedTorrent = await parseTorrent(metadata)
} catch (err) {
return this._destroy(err)
}
}
this._processParsedTorrent(parsedTorrent)
this.metadata = this.torrentFile
// add web seed urls (BEP19)
if (this.client.enableWebSeeds) {
this.urlList.forEach(url => {
this.addWebSeed(url)
})
}
this._rarityMap = new RarityMap(this)
// Replace the parsed file descriptors with File instances
this.files = this.files.map(file => new File(this, file))
// Use the preloaded store when one was supplied, otherwise build one from `opts.store`
let rawStore = this._preloadedStore
if (!rawStore) {
rawStore = new this._store(this.pieceLength, {
...this.storeOpts,
torrent: this,
path: this.path,
files: this.files,
length: this.length,
name: this.name + ' - ' + this.infoHash.slice(0, 8),
addUID: this.addUID,
rootDir: this.rootDir,
max: this._storeCacheSlots
})
}
// don't use the cache if the store is already in memory
if (this._storeCacheSlots > 0 && !(rawStore instanceof MemoryChunkStore || rawStore instanceof HybridChunkStore)) {
rawStore = new CacheChunkStore(rawStore, {
max: this._storeCacheSlots
})
}
this.store = new ImmediateChunkStore(
rawStore
)
// Select only specified files (BEP53) http://www.bittorrent.org/beps/bep_0053.html
if (this.so && !this._startAsDeselected) {
this.files.forEach((v, i) => {
if (this.so.includes(i)) {
this.files[i].select()
}
})
} else {
// start off selecting the entire torrent with low priority
if (this.pieces.length !== 0 && !this._startAsDeselected) {
this.select(0, this.pieces.length - 1, 0)
}
}
// Keep the piece hashes, then swap `this.pieces` for Piece objects of the right length
this._hashes = this.pieces
this.pieces = this.pieces.map((hash, i) => {
const pieceLength = (i === this.pieces.length - 1)
? this.lastPieceLength
: this.pieceLength
return new Piece(pieceLength)
})
// One reservation list per piece
this._reservations = this.pieces.map(() => [])
this.bitfield = new BitField(this.pieces.length)
// Emit 'metadata' before 'ready' and 'done'
this.emit('metadata')
// User might destroy torrent in response to 'metadata' event
if (this.destroyed) return
if (this.skipVerify) {
// Skip verifying existing data and just assume it's correct
this._markAllVerified()
this._onStore()
} else {
const onPiecesVerified = (err) => {
if (err) return this._destroy(err)
this._debug('done verifying')
this._onStore()
}
this._debug('verifying existing torrent data')
// The modtime shortcut only applies when the store class is FSChunkStore
if (this._fileModtimes && this._store === FSChunkStore) {
// don't verify if the files haven't been modified since we last checked
this.getFileModtimes((err, fileModtimes) => {
if (err) return this._destroy(err)
const unchanged = this.files.map((_, index) => fileModtimes[index] === this._fileModtimes[index]).every(x => x)
if (unchanged) {
this._markAllVerified()
this._onStore()
} else {
this._verifyPieces(onPiecesVerified)
}
})
} else {
this._verifyPieces(onPiecesVerified)
}
}
}
/*
* TODO: remove this
* Gets the last modified time of every file on disk for this torrent.
* Only valid in Node, not in the browser.
*/
getFileModtimes (cb) {
const ret = []
parallelLimit(this.files.map((file, index) => cb => {
const filePath = this.addUID ? path.join(this.name + ' - ' + this.infoHash.slice(0, 8)) : path.join(this.path, file.path)
fs.stat(filePath, (err, stat) => {
if (err && err.code !== 'ENOENT') return cb(err)
ret[index] = stat && stat.mtime.getTime()
cb(null)
})
}), FILESYSTEM_CONCURRENCY, err => {
this._debug('done getting file modtimes')
cb(err, ret)
})
}
_verifyPieces (cb) {
parallelLimit(this.pieces.map((piece, index) => cb => {
if (this.destroyed) return cb(new Error('torrent is destroyed'))
const getOpts = {}
// Specify length for the last piece in case it is zero-padded
if (index === this.pieces.length - 1) {
getOpts.length = this.lastPieceLength
}
this.store.get(index, getOpts, async (err, buf) => {
if (this.destroyed) return cb(new Error('torrent is destroyed'))
if (err) return queueMicrotask(() => cb(null)) // ignore error
const hex = await hash(buf, 'hex')
if (this.destroyed) return cb(new Error('torrent is destroyed'))
if (hex === this._hashes[index]) {
this._debug('piece verified %s', index)
this._markVerified(index)
} else {
this._markUnverified(index)
this._debug('piece invalid %s', index)
}
cb(null)
})
}), FILESYSTEM_CONCURRENCY, cb)
}
rescanFiles (cb) {
if (this.destroyed) throw new Error('torrent is destroyed')
if (!cb) cb = noop
this._verifyPieces((err) => {
if (err) {
this._destroy(err)
return cb(err)
}
this._checkDone()
cb(null)
})
}
_markAllVerified () {
for (let index = 0; index < this.pieces.length; index++) {
this._markVerified(index)
}
}
_markVerified (index) {
this.pieces[index] = null
this._reservations[index] = null
this.bitfield.set(index, true)
this.emit('verified', index)
}
_markUnverified (index) {
const len = (index === this.pieces.length - 1)
? this.lastPieceLength
: this.pieceLength
this.pieces[index] = new Piece(len)
this.bitfield.set(index, false)
if (!this._startAsDeselected) this.select(index, index, 1)
this.files.forEach(file => {
if (file.done && file.includes(index)) file.done = false
})
}
_hasAllPieces () {
for (let index = 0; index < this.pieces.length; index++) {
if (!this.bitfield.get(index)) return false
}
return true
}
_hasNoPieces () {
return !this._hasMorePieces(0)
}
_hasMorePieces (threshold) {
let count = 0
for (let index = 0; index < this.pieces.length; index++) {
if (this.bitfield.get(index)) {
count += 1
if (count > threshold) return true
}
}
return false
}
/**
* Called when the metadata, listening server, and underlying chunk store is initialized.
*/
_onStore () {
if (this.destroyed) return
this._debug('on store')
// Start discovery before emitting 'ready'
this._startDiscovery()
this.ready = true
this.emit('ready')
// Files may start out done if the file was already in the store
this._checkDone()
// In case any selections were made before torrent was ready
this._updateSelections()
// Start requesting pieces after we have initially verified them
this.wires.forEach(wire => {
// If we didn't have the metadata at the time ut_metadata was initialized for this
// wire, we still want to make it available to the peer in case they request it.
if (wire.ut_metadata) wire.ut_metadata.setMetadata(this.metadata)
this._onWireWithMetadata(wire)
})
}
destroy (opts, cb) {
if (typeof opts === 'function') return this.destroy(null, opts)
this._destroy(null, opts, cb)
}
/**
 * Tear the torrent down: detach it from the client, stop timers and pending
 * `xs` requests, destroy peers, files, servers, discovery and the store, emit
 * 'error' (if `err` is given) and 'close', then drop internal references.
 * Callable as `_destroy(err, cb)` or `_destroy(err, opts, cb)`.
 * A second call returns immediately and does not invoke `cb`.
 */
_destroy (err, opts, cb) {
if (typeof opts === 'function') return this._destroy(err, null, opts)
if (this.destroyed) return
this.destroyed = true
this._debug('destroy')
this.client._remove(this)
this._selections.clear()
clearInterval(this._rechokeIntervalId)
clearInterval(this._noPeersIntervalId)
// Cancel metadata downloads from `xs` servers that are still running
this._xsRequestsController?.abort()
if (this._rarityMap) {
this._rarityMap.destroy()
}
// `destroyed` is already set, so removePeer() only destroys each peer here
for (const id in this._peers) {
this.removePeer(id)
}
// Before metadata arrives `files` may hold plain descriptors instead of File instances
this.files.forEach(file => {
if (file instanceof File) file._destroy()
})
// Asynchronous teardown work, run in parallel; `cb` fires when all of it is finished
const tasks = this._servers.map(server => cb => {
server.destroy(cb)
})
if (this.discovery) {
tasks.push(cb => {
this.discovery.destroy(cb)
})
}
if (this.store) {
// `opts.destroyStore`, when given, overrides the constructor-level default
let destroyStore = this._destroyStoreOnDestroy
if (opts && opts.destroyStore !== undefined) {
destroyStore = opts.destroyStore
}
tasks.push(cb => {
if (destroyStore) {
this.store.destroy(cb)
} else {
this.store.close(cb)
}
})
}
parallel(tasks, cb)
if (err) {
// Torrent errors are emitted at `torrent.on('error')`. If there are no 'error'
// event handlers on the torrent instance, then the error will be emitted at
// `client.on('error')`. This prevents throwing an uncaught exception
// (unhandled 'error' event), but it makes it impossible to distinguish client
// errors versus torrent errors. Torrent errors are not fatal, and the client
// is still usable afterwards. Therefore, always listen for errors in both
// places (`client.on('error')` and `torrent.on('error')`).
if (this.listenerCount('error') === 0) {
this.client.emit('error', err)
} else {
this.emit('error', err)
}
}
this.emit('close')
// Drop references held by this instance
this.client = null
this.files = []
this.discovery = null
this.store = null
this._rarityMap = null
this._peers = null
this._servers = null
this._xsRequests = null
}
addPeer (peer, source) {
if (this.destroyed) throw new Error('torrent is destroyed')
if (!this.infoHash) throw new Error('addPeer() must not be called before the `infoHash` event')
let host
if (this.client.blocked) {
if (typeof peer === 'string') {
let parts
try {
parts = addrToIPPort(peer)
} catch (e) {
this._debug('ignoring peer: invalid %s', peer)
this.emit('invalidPeer', peer)
return false
}
host = parts[0]
} else if (typeof peer.remoteAddress === 'string') {
host = peer.remoteAddress
}
if (host && this.client.blocked.contains(host)) {
this._debug('ignoring peer: blocked %s', peer)
if (typeof peer !== 'string') peer.destroy()
this.emit('blockedPeer', peer)
return false
}
}
// if the utp connection fails to connect, then it is replaced with a tcp connection to the same ip:port
const type = (this.client.utp && this._isIPv4(host)) ? 'utp' : 'tcp'
const wasAdded = !!this._addPeer(peer, type, source)
if (wasAdded) {
this.emit('peer', peer)
} else {
this.emit('invalidPeer', peer)
}
return wasAdded
}
_addPeer (peer, type, source) {
if (this.destroyed) {
if (typeof peer !== 'string') peer.destroy()
return null
}
if (typeof peer === 'string' && !this._validAddr(peer)) {
this._debug('ignoring peer: invalid %s', peer)
return null
}
const id = (peer && peer.id) || peer
if (this._peers[id]) {
this._debug('ignoring peer: duplicate (%s)', id)
if (typeof peer !== 'string') peer.destroy()
return null
}
if (this.paused) {
this._debug('ignoring peer: torrent is paused')
if (typeof peer !== 'string') peer.destroy()
return null
}
this._debug('add peer %s', id)
let newPeer
if (typeof peer === 'string') {
// `peer` is an addr ("ip:port" string)
newPeer = type === 'utp'
? Peer.createUTPOutgoingPeer(peer, this, this.client.throttleGroups)
: Peer.createTCPOutgoingPeer(peer, this, this.client.throttleGroups)
} else {
// `peer` is a WebRTC connection (simple-peer)
newPeer = Peer.createWebRTCPeer(peer, this, this.client.throttleGroups)
}
this._registerPeer(newPeer)
if (typeof peer === 'string') {
// `peer` is an addr ("ip:port" string)
this._queue.push(newPeer)
this._drain()
}
return newPeer
}
addWebSeed (urlOrConn) {
if (this.destroyed) throw new Error('torrent is destroyed')
let id
let conn
if (typeof urlOrConn === 'string') {
id = urlOrConn
if (!/^https?:\/\/.+/.test(id)) {
this.emit('warning', new Error(`ignoring invalid web seed: ${id}`))
this.emit('invalidPeer', id)
return
}
if (this._peers[id]) {
this.emit('warning', new Error(`ignoring duplicate web seed: ${id}`))
this.emit('invalidPeer', id)
return
}
conn = new WebConn(id, this)
} else if (urlOrConn && typeof urlOrConn.connId === 'string') {
conn = urlOrConn
id = conn.connId
if (this._peers[id]) {
this.emit('warning', new Error(`ignoring duplicate web seed: ${id}`))
this.emit('invalidPeer', id)
return
}
} else {
this.emit('warning', new Error('addWebSeed must be passed a string or connection object with id property'))
return
}
this._debug('add web seed %s', id)
const newPeer = Peer.createWebSeedPeer(conn, id, this, this.client.throttleGroups)
this._registerPeer(newPeer)
this.emit('peer', id)
}
/**
* Called whenever a new incoming TCP peer connects to this torrent swarm. Called with a
* peer that has already sent a handshake.
*/
_addIncomingPeer (peer) {
if (this.destroyed) return peer.destroy(new Error('torrent is destroyed'))
if (this.paused) return peer.destroy(new Error('torrent is paused'))
this._debug('add incoming peer %s', peer.id)
this._registerPeer(peer)
}
_registerPeer (newPeer) {
newPeer.on('download', downloaded => {
if (this.destroyed) return
this.received += downloaded
this._downloadSpeed(downloaded)
this.client._downloadSpeed(downloaded)
this.emit('download', downloaded)
if (this.destroyed) return
this.client.emit('download', downloaded)
})
newPeer.on('upload', uploaded => {
if (this.destroyed) return
this.uploaded += uploaded
this._uploadSpeed(uploaded)
this.client._uploadSpeed(uploaded)
this.emit('upload', uploaded)
if (this.destroyed) return
this.client.emit('upload', uploaded)
})
this._peers[newPeer.id] = newPeer
this._peersLength += 1
}
removePeer (peer) {
const id = peer?.id || peer
if (peer && !peer.id) peer = this._peers?.[id]
if (!peer) return
peer.destroy()
if (this.destroyed) return
this._debug('removePeer %s', id)
delete this._peers[id]
this._peersLength -= 1
// If torrent swarm was at capacity before, try to open a new connection now
this._drain()
}
_select (start, end, priority, notify, isStreamSelection = false) {
if (this.destroyed) throw new Error('torrent is destroyed')
if (start < 0 || end < start || this.pieces.length <= end) {
throw new Error(`invalid selection ${start} : ${end}`)
}
priority = Number(priority) || 0
this._debug('select %s-%s (priority %s)', start, end, priority)
this._selections.insert({
from: start,
to: end,
offset: 0,
priority,
notify,
isStreamSelection
})
this._selections.sort((a, b) => b.priority - a.priority)
this._updateSelections()
}
select (start, end, priority, notify) {
this._select(start, end, priority, notify, false)
}
_deselect (from, to, isStreamSelection = false) {
if (this.destroyed) throw new Error('torrent is destroyed')
this._debug('deselect %s-%s', from, to)
this._selections.remove({ from, to, isStreamSelection })
this._updateSelections()
}
deselect (start, end) {
this._deselect(start, end, false)
}
critical (start, end) {
if (this.destroyed) throw new Error('torrent is destroyed')
this._debug('critical %s-%s', start, end)
for (let i = start; i <= end; ++i) {
this._critical[i] = true
}
this._updateSelections()
}
_onWire (wire, addr) {
this._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown')
this.wires.push(wire)
if (addr) {
// Sometimes RTCPeerConnection.getStats() doesn't return an ip:port for peers
const parts = addrToIPPort(addr)
wire.remoteAddress = parts[0]
wire.remotePort = parts[1]
}
// When peer sends PORT message, add that DHT node to routing table
if (this.client.dht && this.client.dht.listening) {
wire.on('port', port => {
if (this.destroyed || this.client.dht.destroyed) {
return
}
if (!wire.remoteAddress) {
return this._debug('ignoring PORT from peer with no address')
}
if (port === 0 || port > 65536) {
return this._debug('ignoring invalid PORT from peer')
}
this._debug('port: %s (from %s)', port, addr)
this.client.dht.addNode({ host: wire.remoteAddress, port })
})
}
wire.on('timeout', () => {
this._debug('wire timeout (%s)', addr)
// TODO: this might be destroying wires too eagerly
wire.destroy()
})
// Timeout for piece requests to this peer
if (wire.type !== 'webSeed') { // webseeds always send 'unhave' on http timeout
wire.setTimeout(PIECE_TIMEOUT, true)
}
// Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire
wire.setKeepAlive(true)
// use ut_metadata extension
wire.use(utMetadata(this.metadata))
wire.ut_metadata.on('warning', err => {
this._debug('ut_metadata warning: %s', err.message)
})
if (!this.metadata) {
wire.ut_metadata.on('metadata', metadata => {
this._debug('got metadata via ut_metadata')
this._onMetadata(metadata)
})
wire.ut_metadata.fetch()
}
// use ut_pex extension if the torrent is not flagged as private
if (this.client.utPex && typeof utPex === 'function' && !this.private) {
wire.use(utPex())
wire.ut_pex.on('peer', peer => {
// Only add potential new peers when we're not seeding
if (this.done) return
this._debug('ut_pex: got peer: %s (from %s)', peer, addr)
this.addPeer(peer, Peer.SOURCE_UT_PEX)
})
wire.ut_pex.on('dropped', peer => {
// the remote peer believes a given peer has been dropped from the torrent swarm.
// if we're not currently connected to it, then remove it from the queue.
const peerObj = this._peers[peer]
if (peerObj && !peerObj.connected) {
this._debug('ut_pex: dropped peer: %s (from %s)', peer, addr)
this.removePeer(peer)
}
})
wire.once('close', () => {
// Stop sending updates to remote peer
wire.ut_pex.reset()
})
}
wire.use(ltDontHave())
// Hook to allow user-defined `bittorrent-protocol` extensions
// More info: https://github.com/webtorrent/bittorrent-protocol#extension-api
this.emit('wire', wire, addr)
if (this.ready) {
queueMicrotask(() => {
// This allows wire.handshake() to be called (by Peer.onHandshake) before any
// messages get sent on the wire
this._onWireWithMetadata(wire)
})
}
}
/**
 * Attach the piece-exchange logic to a wire once the torrent has metadata:
 * seeder detection, interest updates on availability messages, choke
 * handling, serving of incoming block requests, and the initial
 * bitfield / have-all / have-none message.
 */
_onWireWithMetadata (wire) {
let timeoutId = null
// Runs CHOKE_TIMEOUT ms after being (re)armed: drops the wire when
// `_numQueued` exceeds twice (`_numConns` - `numPeers`) and we are interested
// in it; otherwise re-arms itself.
const onChokeTimeout = () => {
if (this.destroyed || wire.destroyed) return
if (this._numQueued > 2 * (this._numConns - this.numPeers) &&
wire.amInterested) {
wire.destroy()
} else {
timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
if (timeoutId.unref) timeoutId.unref()
}
}
let i
// Flags the wire as a seeder when its bitfield buffer is as long as ours and it
// has every piece; never resets the flag on its own.
const updateSeedStatus = () => {
if (wire.peerPieces.buffer.length !== this.bitfield.buffer.length) return
for (i = 0; i < this.pieces.length; ++i) {
if (!wire.peerPieces.get(i)) return
}
wire.isSeeder = true
if (this.alwaysChokeSeeders) wire.choke() // always choke seeders
}
// Every message that changes what the peer has triggers the same refresh
wire.on('bitfield', () => {
updateSeedStatus()
this._update()
this._updateWireInterest(wire)
})
wire.on('have', () => {
updateSeedStatus()
this._update()
this._updateWireInterest(wire)
})
wire.lt_donthave.on('donthave', () => {
updateSeedStatus()
this._update()
this._updateWireInterest(wire)
})
// fast extension (BEP6)
wire.on('have-all', () => {
wire.isSeeder = true
if (this.alwaysChokeSeeders) wire.choke() // always choke seeders
this._update()
this._updateWireInterest(wire)
})
// fast extension (BEP6)
wire.on('have-none', () => {
wire.isSeeder = false
this._update()
this._updateWireInterest(wire)
})
// fast extension (BEP6)
wire.on('allowed-fast', (index) => {
this._update()
})
// Unchoke on the first 'interested' message only
wire.once('interested', () => {
wire.unchoke()
})
wire.once('close', () => {
clearTimeout(timeoutId)
})
// Being choked (re)starts the choke timer; being unchoked cancels it
wire.on('choke', () => {
clearTimeout(timeoutId)
timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
if (timeoutId.unref) timeoutId.unref()
})
wire.on('unchoke', () => {
clearTimeout(timeoutId)
this._update()
})
wire.on('request', (index, offset, length, cb) => {
if (length > MAX_BLOCK_LENGTH) {
// Per spec, disconnect from peers that request >128KB
return wire.destroy()
}
// A non-null entry in `pieces` means the piece is not verified yet, so it
// cannot be served; note that `cb` is not invoked in that case.
if (this.pieces[index]) return
this.store.get(index, { offset, length }, cb)
})
// always send bitfield or equivalent fast extension message (required)
if (wire.hasFast && this._hasAllPieces()) wire.haveAll()
else if (wire.hasFast && this._hasNoPieces()) wire.haveNone()
else wire.bitfield(this.bitfield)
// initialize interest in case bitfield message was already received before above handler was registered
this._updateWireInterest(wire)
// Send PORT message to peers that support DHT
if (wire.peerExtensions.dht && this.client.dht && this.client.dht.listening) {
wire.port(this.client.dht.address().port)
}
if (wire.type !== 'webSeed') { // do not choke on webseeds
timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
if (timeoutId.unref) timeoutId.unref()
}
// Start from "not a seeder" and let the current peer bitfield decide
wire.isSeeder = false
updateSeedStatus()
}
/**
* Called on selection changes.
*/
_updateSelections () {
if (!this.ready || this.destroyed) return
queueMicrotask(() => {
this._gcSelections()
})
this._updateInterest()
this._update()
}
/**
* Garbage collect selections with respect to the store's current state.
*/
_gcSelections () {
for (const s of this._selections) {
const oldOffset = s.offset
// check for newly downloaded pieces in selection
while (this.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) {
s.offset += 1
}
if (oldOffset !== s.offset) s.notify?.()
if (s.to !== s.from + s.offset) continue
if (!this.bitfield.get(s.from + s.offset)) continue
s.remove() // remove fully downloaded selection
s.notify?.()
this._updateInterest()
}
if (!this._selections.length) this.emit('idle')
}
/**
* Update interested status for all peers.
*/
_updateInterest () {
const prev = this._amInterested
this._amInterested = !!this._selections.length
this.wires.forEach(wire => this._updateWireInterest(wire))
if (prev === this._amInterested) return
if (this._amInterested) this.emit('interested')
else this.emit('uninterested')
}
_updateWireInterest (wire) {
let interested = false
for (let index = 0; index < this.pieces.length; ++index) {
if (this.pieces[index] && wire.peerPieces.get(index)) {
interested = true
break
}
}
if (interested) wire.interested()
else wire.uninterested()
}
/**
* Heartbeat to update all peers and their requests.
*/
_update () {
if (IDLE_CALLBACK) {
IDLE_CALLBACK(() => this._updateWireWrapper(), { timeout: 250 })
} else {
this._updateWireWrapper()
}
}
_updateWireWrapper () {
if (this.destroyed) return
// update wires in random order for better request distribution
const ite = randomIterate(this.wires)
let wire
while ((wire = ite())) {
this._updateWire(wire)
}
}
/**
 * Attempts to update a peer's requests: tops up the wire's request pipeline with
 * blocks taken from the current selections.
 *
 * @param {Object} wire peer connection to issue requests on
 * @return {boolean|undefined} `false` when the wire is already destroyed; no value otherwise
 */
_updateWire (wire) {
if (wire.destroyed) return false
// to allow function hoisting
const self = this
// nothing to do while the wire already has the low-water mark of requests in flight
const minOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MIN_DURATION)
if (wire.requests.length >= minOutstandingRequests) return
// upper bound that the helpers below fill the pipeline up to
const maxOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MAX_DURATION)
if (wire.peerChoking) {
// while choked, the only requests issued are for pieces in the peer's allowed-fast set
// NOTE(review): _hasMorePieces is defined outside this section — confirm what this guard checks
if (wire.hasFast && wire.peerAllowedFastSet.length > 0 &&
!this._hasMorePieces(wire.peerAllowedFastSet.length - 1)) {
requestAllowedFastSet()
}
return
}
// `wire.downloaded` is falsy: only probe the wire with a single request
if (!wire.downloaded) return validateWire()
// first try without hotswapping; if the pipeline could not be filled, retry with it
trySelectWire(false) || trySelectWire(true)
// Requests blocks of allowed-fast pieces the peer has and we lack.
// Returns true once the pipeline is full, false otherwise.
function requestAllowedFastSet () {
if (wire.requests.length >= maxOutstandingRequests) return false
for (const piece of wire.peerAllowedFastSet) {
if (wire.peerPieces.get(piece) && !self.bitfield.get(piece)) {
while (self._request(wire, piece, false) &&
wire.requests.length < maxOutstandingRequests) {
// body intentionally empty
// request all non-reserved blocks in this piece
}
}
if (wire.requests.length < maxOutstandingRequests) continue
return true
}
return false
}
// Builds the predicate handed to the rarity map: the piece must lie in
// [start, end], not have been tried yet, be available from this peer and,
// when a `rank` function is given, be accepted by it.
function genPieceFilterFunc (start, end, tried, rank) {
return i => i >= start && i <= end && !(i in tried) && wire.peerPieces.get(i) && (!rank || rank(i))
}
// TODO: Do we need both validateWire and trySelectWire?
// Issues at most one request on a wire that has no requests in flight, walking
// the selections from last to first.
function validateWire () {
if (wire.requests.length) return
let i = self._selections.length
while (i--) {
const next = self._selections.get(i)
let piece
if (self.strategy === 'rarest') {
const start = next.from + next.offset
const end = next.to
const len = end - start + 1
const tried = {}
let tries = 0
const filter = genPieceFilterFunc(start, end, tried)
// try at most one candidate per piece in the remaining range
while (tries < len) {
piece = self._rarityMap.getRarestPiece(filter)
if (piece < 0) break
if (self._request(wire, piece, false)) return
tried[piece] = true
tries += 1
}
} else {
// non-rarest strategy: scan the selection from its end back to its current offset
for (piece = next.to; piece >= next.from + next.offset; --piece) {
if (!wire.peerPieces.get(piece)) continue
if (self._request(wire, piece, false)) return
}
}
}
// TODO: wire failed to validate as useful; should we close it?
// probably not, since 'have' and 'bitfield' messages might be coming
}
// Builds a predicate telling whether this wire should take a given piece.
// Wires faster than SPEED_THRESHOLD accept everything. Otherwise a piece is
// rejected when faster wires that have it could cover its missing bytes within
// `secs`; at most 10 pieces are rejected per ranker.
function speedRanker () {
const speed = wire.downloadSpeed() || 1
if (speed > SPEED_THRESHOLD) return () => true
// time this wire needs for its outstanding requests (at least one block) at its current speed
const secs = Math.max(1, wire.requests.length) * Piece.BLOCK_LENGTH / speed
let tries = 10
// NOTE(review): `ptr` lives outside the returned closure, so each call resumes
// scanning `self.wires` where the previous call stopped rather than from 0
let ptr = 0
return index => {
if (!tries || self.bitfield.get(index)) return true
let missing = self.pieces[index].missing
for (; ptr < self.wires.length; ptr++) {
const otherWire = self.wires[ptr]
const otherSpeed = otherWire.downloadSpeed()
if (otherSpeed < SPEED_THRESHOLD) continue
if (otherSpeed <= speed) continue
if (!otherWire.peerPieces.get(index)) continue
// subtract what this faster wire could fetch in `secs`; keep scanning while bytes remain
if ((missing -= otherSpeed * secs) > 0) continue
tries--
return false
}
return true
}
}
// Swaps selection `i` with the last selection of the contiguous run, starting
// at `i`, whose `priority` is truthy.
function shufflePriority (i) {
let last = i
for (let j = i; j < self._selections.length && self._selections.get(j).priority; j++) {
last = j
}
self._selections.swap(i, last)
}
// Fills the pipeline from the selections, in order. `hotswap` allows taking over
// blocks reserved by other wires (critical pieces always allow it).
// Returns true once the pipeline is full, false when the selections ran out first.
function trySelectWire (hotswap) {
if (wire.requests.length >= maxOutstandingRequests) return true
const rank = speedRanker()
for (let i = 0; i < self._selections.length; i++) {
const next = self._selections.get(i)
let piece
if (self.strategy === 'rarest') {
const start = next.from + next.offset
const end = next.to
const len = end - start + 1
const tried = {}
let tries = 0
const filter = genPieceFilterFunc(start, end, tried, rank)
while (tries < len) {
piece = self._rarityMap.getRarestPiece(filter)
if (piece < 0) break
while (self._request(wire, piece, self._critical[piece] || hotswap) &&
wire.requests.length < maxOutstandingRequests) {
// body intentionally empty
// request all non-reserved blocks in this piece
}
// pipeline still has room: mark this piece as tried and pick the next one
if (wire.requests.length < maxOutstandingRequests) {
tried[piece] = true
tries++
continue
}
if (next.priority) shufflePriority(i)
return true
}
} else {
// non-rarest strategy: scan the selection front to back
for (piece = next.from + next.offset; piece <= next.to; piece++) {
if (!wire.peerPieces.get(piece) || !rank(piece)) continue
while (self._request(wire, piece, self._critical[piece] || hotswap) &&
wire.requests.length < maxOutstandingRequests) {
// body intentionally empty
// request all non-reserved blocks in piece
}
if (wire.requests.length < maxOutstandingRequests) continue
if (next.priority) shufflePriority(i)
return true
}
}
}
return false
}
}
/**
 * Called periodically to update the choked status of all peers, handling optimistic
 * unchoking as described in BEP3.
 *
 * Does nothing until the torrent is ready. Up to `_rechokeNumSlots - 1` interested
 * peers are unchoked by quality, one more slot goes to a randomly chosen interested
 * peer (the optimistic unchoke), and every wire left on the stack is choked.
 */
_rechoke () {
if (!this.ready) return
// wires in increasing order of quality (pop() gives next best peer)
const wireStack =
this.wires
.map(wire => ({ wire, random: Math.random() })) // insert a random seed for randomizing the sort
.sort((objA, objB) => {
const wireA = objA.wire
const wireB = objB.wire
// prefer peers that send us data faster
if (wireA.downloadSpeed() !== wireB.downloadSpeed()) {
return wireA.downloadSpeed() - wireB.downloadSpeed()
}
// then prefer peers that can download data from us faster
if (wireA.uploadSpeed() !== wireB.uploadSpeed()) {
return wireA.uploadSpeed() - wireB.uploadSpeed()
}
// then prefer already unchoked peers (to minimize fibrillation)
if (wireA.amChoking !== wireB.amChoking) {
return wireA.amChoking ? -1 : 1 // choking < unchoked
}
// otherwise random order
return objA.random - objB.random
})
.map(obj => obj.wire) // return array of wires (remove random seed)
// `_rechokeOptimisticTime` counts down once per call; when it has run out the
// optimistic slot is released so a new peer can be picked below
if (this._rechokeOptimisticTime <= 0) {
// clear old optimistic peer, so it can be rechoked normally and then replaced
this._rechokeOptimisticWire = null
} else {
this._rechokeOptimisticTime -= 1
}
let numInterestedUnchoked = 0
// leave one rechoke slot open for optimistic unchoking
while (wireStack.length > 0 && numInterestedUnchoked < this._rechokeNumSlots - 1) {
const wire = wireStack.pop() // next best quality peer
// seeders and the current optimistic wire are popped but left untouched: they are
// not unchoked here and, being off the stack, not choked by the final pass either
if (wire.isSeeder || wire === this._rechokeOptimisticWire) {
continue
}
wire.unchoke()
// only stop unchoking once we fill the slots with interested peers that will actually download
if (wire.peerInterested) {
numInterestedUnchoked++
}
}
// fill optimistic unchoke slot if empty
if (this._rechokeOptimisticWire === null && this._rechokeNumSlots > 0) {
// don't optimistically unchoke uninterested peers
const remaining = wireStack.filter(wire => wire.peerInterested)
if (remaining.length > 0) {
// select random remaining (not yet unchoked) peer
const newOptimisticPeer = remaining[randomInt(remaining.length)]
newOptimisticPeer.unchoke()
this._rechokeOptimisticWire = newOptimisticPeer
// number of further _rechoke calls the optimistic peer keeps its slot
this._rechokeOptimisticTime = RECHOKE_OPTIMISTIC_DURATION
}
}
// choke the rest
wireStack
.filter(wire => wire !== this._rechokeOptimisticWire) // except the optimistically unchoked peer
.forEach(wire => wire.choke())
}
/**
* Attempts to cancel a slow block request from another wire such that the
* given wire may effectively swap out the request for one of its own.
*/
_hotswap (wire, index) {
const speed = wire.downloadSpeed()
if (speed < Piece.BLOCK_LENGTH) return false
if (!this._reservations[index]) return false
const r = this._reservations[index]
if (!r) {
return false
}
let minSpeed = Infinity
let minWire
let i
for (i = 0; i < r.length; i++) {
const otherWire = r[i]
if (!otherWire || otherWire === wire) continue
const otherSpeed = otherWire.downloadSpeed()
if (otherSpeed >= SPEED_THRESHOLD) continue
if (2 * otherSpeed > speed || otherSpeed > minSpeed) continue
minWire = otherWire
minSpeed = otherSpeed
}
if (!minWire) return false
for (i = 0; i < r.length; i++) {
if (r[i] === minWire) r[i] = null
}
for (i = 0; i < minWire.requests.length; i++) {
const req = minWire.requests[i]
if (req.piece !== index) continue
this.pieces[index].cancel((req.offset / Piece.BLOCK_LENGTH) | 0)
}
this.emit('hotswap', minWire, wire, index)
return true
}
/**
 * Attempts to request a block from the given wire.
 *
 * Reserves a chunk of piece `index`, records the wire in `_reservations[index]`,
 * and sends the request. When the chunk arrives it is added to the piece; a
 * completed piece is hashed, and on a match written to the store and announced
 * to all wires, otherwise it is reset and a 'warning' is emitted.
 *
 * @param {Object} wire peer connection to request from
 * @param {number} index piece index
 * @param {boolean} hotswap whether a slower wire's reservation may be taken over
 * @return {boolean} `true` when a request was sent, `false` otherwise
 */
_request (wire, index, hotswap) {
const self = this
const numRequests = wire.requests.length
const isWebSeed = wire.type === 'webSeed'
// we already have this piece
if (self.bitfield.get(index)) return false
// web seeds are limited by a piece-based pipeline capped at `maxWebConns`;
// other wires use the block-based pipeline
const maxOutstandingRequests = isWebSeed
? Math.min(
getPiecePipelineLength(wire, PIPELINE_MAX_DURATION, self.pieceLength),
self.maxWebConns
)
: getBlockPipelineLength(wire, PIPELINE_MAX_DURATION)
if (numRequests >= maxOutstandingRequests) return false
// var endGame = (wire.requests.length === 0 && self.store.numMissing < 30)
const piece = self.pieces[index]
// -1 means nothing could be reserved
let reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve()
// nothing free: optionally steal a reservation from a slower wire, then retry
if (reservation === -1 && hotswap && self._hotswap(wire, index)) {
reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve()
}
if (reservation === -1) return false
// remember which wire holds the reservation, reusing a freed (null) slot if any
let r = self._reservations[index]
if (!r) r = self._reservations[index] = []
let i = r.indexOf(null)
if (i === -1) i = r.length
r[i] = wire
const chunkOffset = piece.chunkOffset(reservation)
const chunkLength = isWebSeed ? piece.chunkLengthRemaining(reservation) : piece.chunkLength(reservation)
wire.request(index, chunkOffset, chunkLength, async function onChunk (err, chunk) {
if (self.destroyed) return
// TODO: what is this for?
if (!self.ready) return self.once('ready', () => { onChunk(err, chunk) })
// release our reservation slot (unless it was already given to another wire)
if (r[i] === wire) r[i] = null
// the piece object was replaced or cleared while the request was in flight
if (piece !== self.pieces[index]) return onUpdateTick()
if (err) {
self._debug(
'error getting piece %s (offset: %s length: %s) from %s: %s',
index, chunkOffset, chunkLength, `${wire.remoteAddress}:${wire.remotePort}`,
err.message
)
// give the chunk back so it can be requested again
isWebSeed ? piece.cancelRemaining(reservation) : piece.cancel(reservation)
onUpdateTick()
return
}
self._debug(
'got piece %s (offset: %s length: %s) from %s',
index, chunkOffset, chunkLength, `${wire.remoteAddress}:${wire.remotePort}`
)
// presumably piece.set() returns a falsy value while the piece is still incomplete — verify against torrent-piece
if (!piece.set(reservation, chunk, wire)) return onUpdateTick()
const buf = piece.flush()
// TODO: might need to set self.pieces[index] = null here since sha1 is async
const hex = await hash(buf, 'hex')
// the torrent may have been destroyed while hashing
if (self.destroyed) return
if (hex === self._hashes[index]) {
self._debug('piece verified %s', index)
self.store.put(index, buf, err => {
if (err) {
// a failed store write destroys the torrent
self._destroy(err)
return
} else {
// piece is stored: drop the in-memory piece, mark it verified, tell every peer
self.pieces[index] = null
self._markVerified(index)
self.wires.forEach(wire => {
wire.have(index)
})
}
// We also check `self.destroyed` since `torrent.destroy()` could have been
// called in the `torrent.on('done')` handler, triggered by `_checkDone()`.
if (self._checkDone() && !self.destroyed) self.discovery.complete()
onUpdateTick()
})
} else {
// hash mismatch: start the piece over with a fresh Piece and report it
self.pieces[index] = new Piece(piece.length)
self.emit('warning', new Error(`Piece ${index} failed verification`))
onUpdateTick()
}
})
// schedule a request refresh on the microtask queue
function onUpdateTick () {
queueMicrotask(() => { self._update() })
}
return true
}
_checkDone () {
if (this.destroyed) return
// are any new files done?
this.files.forEach(file => {
if (file.done) return
for (let i = file._startPiece; i <= file._endPiece; ++i) {
if (!this.bitfield.get(i)) return
}
file.done = true
file.emit('done')
this._debug(`file done: ${file.name}`)
})
// is the torrent done? (if all current selections are satisfied, or there are
// no selections, then torrent is done)
let done = true
for (const selection of this._selections) {
for (let piece = selection.from; piece <= selection.to; piece++) {
if (!this.bitfield.get(piece)) {
done = false
break
}
}
if (!done) break
}
if (!this.done && done) {
this.done = true
this._debug(`torrent done: ${this.infoHash}`)
this.emit('done')
} else {
this.done = false
}
this._gcSelections()
return done
}
async load (streams, cb) {
if (this.destroyed) throw new Error('torrent is destroyed')
if (!this.ready) return this.once('ready', () => { this.load(streams, cb) })
if (!Array.isArray(streams)) streams = [streams]
if (!cb) cb = noop
try {
await chunkStoreWrite(this.store, joinIterator(streams), { chunkLength: this.pieceLength })
this._markAllVerified()
this._checkDone()
cb(null)
} catch (err) {
cb(err)
return err
}
}
pause () {
if (this.destroyed) return
this._debug('pause')
this.paused = true
}
resume () {
if (this.destroyed) return
this._debug('resume')
this.paused = false
this._drain()
}
_debug () {
const args = [].slice.call(arguments)
args[0] = `[${this.client ? this.client._debugId : 'No Client'}] [${this._debugId}] ${args[0]}`
debug(...args)
}
/**
 * Pop a peer off the FIFO queue and connect to it. When _drain() gets called,
 * the queue will usually have only one peer in it, except when there are too
 * many peers (over `this.client.maxConns`) in which case they will just sit in the
 * queue until another connection closes.
 *
 * Does nothing when `net.connect` is unavailable, or when the torrent is
 * destroyed, paused, or already at the connection limit.
 */
_drain () {
this._debug('_drain numConns %s maxConns %s', this._numConns, this.client.maxConns)
if (typeof net.connect !== 'function' || this.destroyed || this.paused ||
this._numConns >= this.client.maxConns) {
return
}
this._debug('drain (%s queued, %s/%s peers)', this._numQueued, this.numPeers, this.client.maxConns)
const peer = this._queue.shift()
if (!peer) return // queue could be empty
this._debug('%s connect attempt to %s', peer.type, peer.addr)
const parts = addrToIPPort(peer.addr)
const opts = {
host: parts[0],
port: parts[1]
}
// outgoing uTP peers connect through utp when the client has it enabled; everything else uses TCP
if (this.client.utp && peer.type === Peer.TYPE_UTP_OUTGOING) {
peer.conn = utp.connect(opts.port, opts.host)
} else {
peer.conn = net.connect(opts)
}
const conn = peer.conn
conn.once('connect', () => { if (!this.destroyed) peer.onConnect() })
conn.once('error', err => { peer.destroy(err) })
peer.startConnectTimeout()
// When connection closes, attempt reconnect after timeout (with exponential backoff)
conn.on('close', () => {
if (this.destroyed) return
// every wait listed in RECONNECT_WAIT has been used up
if (peer.retries >= RECONNECT_WAIT.length) {
if (this.client.utp) {
// with uTP enabled, re-add the peer as TCP with a fresh retry count
const newPeer = this._addPeer(peer.addr, 'tcp', peer.source)
if (newPeer) newPeer.retries = 0
} else {
this._debug(
'conn %s closed: will not re-add (max %s attempts)',
peer.addr, RECONNECT_WAIT.length
)
}
return
}
// wait for this attempt, indexed by the number of retries so far
const ms = RECONNECT_WAIT[peer.retries]
this._debug(
'conn %s closed: will re-add to queue in %sms (attempt %s)',
peer.addr, ms, peer.retries + 1
)
const reconnectTimeout = setTimeout(() => {
if (this.destroyed) return
const host = addrToIPPort(peer.addr)[0]
// retry over uTP only when it is enabled and the host is an IPv4 address
const type = (this.client.utp && this._isIPv4(host)) ? 'utp' : 'tcp'
const newPeer = this._addPeer(peer.addr, type, peer.source)
if (newPeer) newPeer.retries = peer.retries + 1
}, ms)
// unref (where available) so a pending reconnect does not keep the process alive
if (reconnectTimeout.unref) reconnectTimeout.unref()
})
}
/**
* Returns `true` if string is valid IPv4/6 address.
* @param {string} addr
* @return {boolean}
*/
_validAddr (addr) {
let parts
try {
parts = addrToIPPort(addr)
} catch (e) {
return false
}
const host = parts[0]
const port = parts[1]
return port > 0 && port < 65535 &&
!(host === '127.0.0.1' && port === this.client.torrentPort)
}
/**
* Return `true` if string is a valid IPv4 address.
* @param {string} addr
* @return {boolean}
*/
_isIPv4 (addr) {
const IPv4Pattern = /^((?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])[.]){3}(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])$/
return IPv4Pattern.test(addr)
}
}
/**
 * Computes how many block requests to keep in flight on a wire: two, plus the
 * number of blocks the wire downloads in `duration` seconds at its current
 * speed, capped by the peer's `reqq` when the peer advertised a positive one.
 *
 * @param {Object} wire wire exposing `downloadSpeed()` and optionally `peerExtendedHandshake`
 * @param {number} duration time window in seconds
 * @return {number} target number of outstanding block requests
 */
function getBlockPipelineLength (wire, duration) {
  const blocksInWindow = Math.ceil(duration * wire.downloadSpeed() / Piece.BLOCK_LENGTH)
  const wanted = 2 + blocksInWindow

  // Honor reqq (maximum number of outstanding request messages) if specified by peer
  const reqq = wire.peerExtendedHandshake?.reqq
  const hasUsableReqq = typeof reqq === 'number' && reqq > 0

  return hasUsableReqq ? Math.min(wanted, reqq) : wanted
}
/**
 * Computes how many whole-piece requests to keep in flight on a wire: one, plus
 * the number of pieces the wire downloads in `duration` seconds at its current
 * speed.
 *
 * @param {Object} wire wire exposing `downloadSpeed()`
 * @param {number} duration time window in seconds
 * @param {number} pieceLength piece size in bytes
 * @return {number} target number of outstanding piece requests
 */
function getPiecePipelineLength (wire, duration, pieceLength) {
  const bytesInWindow = duration * wire.downloadSpeed()
  const piecesInWindow = Math.ceil(bytesInWindow / pieceLength)
  return 1 + piecesInWindow
}
/**
 * Picks a pseudo-random integer from the half-open range [0, high).
 */
function randomInt (high) {
  const scaled = Math.random() * high
  return scaled | 0
}
// Shared do-nothing function; `load()` uses it as the callback when none is supplied.
function noop () {}