Pendrokar's picture
relocate folders
ed18ebf
raw
history blame
11.4 kB
import os
import re
import json
import math
from ._errors import FFmpegNormalizeError
from ._cmd_utils import NUL, CommandRunner, dict_to_filter_opts
from ._logger import setup_custom_logger
logger = setup_custom_logger("ffmpeg_normalize")
class MediaStream(object):
def __init__(self, ffmpeg_normalize, media_file, stream_type, stream_id):
"""
Arguments:
media_file {MediaFile} -- parent media file
stream_type {str} -- stream type
stream_id {int} -- Audio stream id
"""
self.ffmpeg_normalize = ffmpeg_normalize
self.media_file = media_file
self.stream_type = stream_type
self.stream_id = stream_id
def __repr__(self):
return "<{}, {} stream {}>".format(
os.path.basename(self.media_file.input_file),
self.stream_type,
self.stream_id,
)
class VideoStream(MediaStream):
def __init__(self, ffmpeg_normalize, media_file, stream_id):
super(VideoStream, self).__init__(
media_file, ffmpeg_normalize, "video", stream_id
)
class SubtitleStream(MediaStream):
def __init__(self, ffmpeg_normalize, media_file, stream_id):
super(SubtitleStream, self).__init__(
media_file, ffmpeg_normalize, "subtitle", stream_id
)
class AudioStream(MediaStream):
def __init__(
self,
ffmpeg_normalize,
media_file,
stream_id,
sample_rate=None,
bit_depth=None,
duration=None,
):
"""
Arguments:
sample_rate {int} -- in Hz
bit_depth {int}
duration {int} -- duration in seconds
"""
super(AudioStream, self).__init__(
media_file, ffmpeg_normalize, "audio", stream_id
)
self.loudness_statistics = {"ebu": None, "mean": None, "max": None}
self.sample_rate = sample_rate
self.bit_depth = bit_depth
self.duration = duration
if (
self.ffmpeg_normalize.normalization_type == "ebu"
and self.duration
and self.duration <= 3
):
logger.warn(
"Audio stream has a duration of less than 3 seconds. "
"Normalization may not work. "
"See https://github.com/slhck/ffmpeg-normalize/issues/87 for more info."
)
def __repr__(self):
return "<{}, audio stream {}>".format(
os.path.basename(self.media_file.input_file), self.stream_id
)
def get_stats(self):
"""
Return statistics
"""
stats = {
"input_file": self.media_file.input_file,
"output_file": self.media_file.output_file,
"stream_id": self.stream_id,
}
stats.update(self.loudness_statistics)
return stats
def get_pcm_codec(self):
if not self.bit_depth:
return "pcm_s16le"
elif self.bit_depth <= 8:
return "pcm_s8"
elif self.bit_depth in [16, 24, 32, 64]:
return f"pcm_s{self.bit_depth}le"
else:
logger.warning(
f"Unsupported bit depth {self.bit_depth}, falling back to pcm_s16le"
)
return "pcm_s16le"
def _get_filter_str_with_pre_filter(self, current_filter):
"""
Get a filter stringΒ for current_filter, with the pre-filter
added before. Applies the input label before.
"""
input_label = f"[0:{self.stream_id}]"
filter_chain = []
if self.media_file.ffmpeg_normalize.pre_filter:
filter_chain.append(self.media_file.ffmpeg_normalize.pre_filter)
filter_chain.append(current_filter)
filter_str = input_label + ",".join(filter_chain)
return filter_str
def parse_volumedetect_stats(self):
"""
Use ffmpeg with volumedetect filter to get the mean volume of the input file.
"""
logger.info(
f"Running first pass volumedetect filter for stream {self.stream_id}"
)
filter_str = self._get_filter_str_with_pre_filter("volumedetect")
cmd = [
self.media_file.ffmpeg_normalize.ffmpeg_exe,
"-nostdin",
"-y",
"-i",
self.media_file.input_file,
"-filter_complex",
filter_str,
"-vn",
"-sn",
"-f",
"null",
NUL,
]
cmd_runner = CommandRunner(cmd)
for progress in cmd_runner.run_ffmpeg_command():
yield progress
output = cmd_runner.get_output()
logger.debug("Volumedetect command output:")
logger.debug(output)
mean_volume_matches = re.findall(r"mean_volume: ([\-\d\.]+) dB", output)
if mean_volume_matches:
self.loudness_statistics["mean"] = float(mean_volume_matches[0])
else:
raise FFmpegNormalizeError(
f"Could not get mean volume for {self.media_file.input_file}"
)
max_volume_matches = re.findall(r"max_volume: ([\-\d\.]+) dB", output)
if max_volume_matches:
self.loudness_statistics["max"] = float(max_volume_matches[0])
else:
raise FFmpegNormalizeError(
f"Could not get max volume for {self.media_file.input_file}"
)
def parse_loudnorm_stats(self):
"""
Run a first pass loudnorm filter to get measured data.
"""
logger.info(f"Running first pass loudnorm filter for stream {self.stream_id}")
opts = {
"i": self.media_file.ffmpeg_normalize.target_level,
"lra": self.media_file.ffmpeg_normalize.loudness_range_target,
"tp": self.media_file.ffmpeg_normalize.true_peak,
"offset": self.media_file.ffmpeg_normalize.offset,
"print_format": "json",
}
if self.media_file.ffmpeg_normalize.dual_mono:
opts["dual_mono"] = "true"
filter_str = self._get_filter_str_with_pre_filter(
"loudnorm=" + dict_to_filter_opts(opts)
)
cmd = [
self.media_file.ffmpeg_normalize.ffmpeg_exe,
"-nostdin",
"-y",
"-i",
self.media_file.input_file,
"-filter_complex",
filter_str,
"-vn",
"-sn",
"-f",
"null",
NUL,
]
cmd_runner = CommandRunner(cmd)
for progress in cmd_runner.run_ffmpeg_command():
yield progress
output = cmd_runner.get_output()
logger.debug("Loudnorm first pass command output:")
logger.debug(output)
output_lines = [line.strip() for line in output.split("\n")]
self.loudness_statistics["ebu"] = AudioStream._parse_loudnorm_output(
output_lines
)
@staticmethod
def _parse_loudnorm_output(output_lines):
loudnorm_start = False
loudnorm_end = False
for index, line in enumerate(output_lines):
if line.startswith("[Parsed_loudnorm"):
loudnorm_start = index + 1
continue
if loudnorm_start and line.startswith("}"):
loudnorm_end = index + 1
break
if not (loudnorm_start and loudnorm_end):
raise FFmpegNormalizeError(
"Could not parse loudnorm stats; no loudnorm-related output found"
)
try:
loudnorm_stats = json.loads(
"\n".join(output_lines[loudnorm_start:loudnorm_end])
)
logger.debug(f"Loudnorm stats parsed: {json.dumps(loudnorm_stats)}")
for key in [
"input_i",
"input_tp",
"input_lra",
"input_thresh",
"output_i",
"output_tp",
"output_lra",
"output_thresh",
"target_offset",
]:
# handle infinite values
if float(loudnorm_stats[key]) == -float("inf"):
loudnorm_stats[key] = -99
elif float(loudnorm_stats[key]) == float("inf"):
loudnorm_stats[key] = 0
else:
# convert to floats
loudnorm_stats[key] = float(loudnorm_stats[key])
return loudnorm_stats
except Exception as e:
raise FFmpegNormalizeError(
f"Could not parse loudnorm stats; wrong JSON format in string: {e}"
)
def get_second_pass_opts_ebu(self):
"""
Return second pass loudnorm filter options string for ffmpeg
"""
if not self.loudness_statistics["ebu"]:
raise FFmpegNormalizeError(
"First pass not run, you must call parse_loudnorm_stats first"
)
input_i = float(self.loudness_statistics["ebu"]["input_i"])
if input_i > 0:
logger.warn(
"Input file had measured input loudness greater than zero ({}), capping at 0".format(
"input_i"
)
)
self.loudness_statistics["ebu"]["input_i"] = 0
opts = {
"i": self.media_file.ffmpeg_normalize.target_level,
"lra": self.media_file.ffmpeg_normalize.loudness_range_target,
"tp": self.media_file.ffmpeg_normalize.true_peak,
"offset": float(self.loudness_statistics["ebu"]["target_offset"]),
"measured_i": float(self.loudness_statistics["ebu"]["input_i"]),
"measured_lra": float(self.loudness_statistics["ebu"]["input_lra"]),
"measured_tp": float(self.loudness_statistics["ebu"]["input_tp"]),
"measured_thresh": float(self.loudness_statistics["ebu"]["input_thresh"]),
"linear": "true",
"print_format": "json",
}
if self.media_file.ffmpeg_normalize.dual_mono:
opts["dual_mono"] = "true"
return "loudnorm=" + dict_to_filter_opts(opts)
def get_second_pass_opts_peakrms(self):
"""
Set the adjustment gain based on chosen option and mean/max volume,
return the matching ffmpeg volume filter.
"""
normalization_type = self.media_file.ffmpeg_normalize.normalization_type
target_level = self.media_file.ffmpeg_normalize.target_level
if normalization_type == "peak":
adjustment = 0 + target_level - self.loudness_statistics["max"]
elif normalization_type == "rms":
adjustment = target_level - self.loudness_statistics["mean"]
else:
raise FFmpegNormalizeError(
"Can only set adjustment for peak and RMS normalization"
)
logger.info(
"Adjusting stream {} by {} dB to reach {}".format(
self.stream_id, adjustment, target_level
)
)
if self.loudness_statistics["max"] + adjustment > 0:
logger.warning(
"Adjusting will lead to clipping of {} dB".format(
self.loudness_statistics["max"] + adjustment
)
)
return f"volume={adjustment}dB"