#!/usr/bin/env python3
#
# git-restore-mtime - Change mtime of files based on commit date of last change
#
#    Copyright (C) 2012 Rodrigo Silva (MestreLion) <[email protected]>
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program. See <http://www.gnu.org/licenses/gpl.html>
#
# Source: https://github.com/MestreLion/git-tools
# Version: July 13, 2023 (commit hash 5f832e72453e035fccae9d63a5056918d64476a2)
"""
Change the modification time (mtime) of files in work tree, based on the
date of the most recent commit that modified the file, including renames.
Ignores untracked files and uncommitted deletions, additions and renames, and
by default modifications too.
---
Useful prior to generating release tarballs, so each file is archived with a
date that is similar to the date when the file was actually last modified,
assuming the actual modification date and its commit date are close.
"""
# TODO:
# - Add -z on git whatchanged/ls-files, so we don't deal with filename decoding
# - When Python is bumped to 3.7, use text instead of universal_newlines on subprocess
# - Update "Statistics for some large projects" with modern hardware and repositories.
# - Create a README.md for git-restore-mtime alone. It deserves extensive documentation
#   - Move Statistics there
#   - See git-extras as a good example on project structure and documentation
# FIXME:
# - When current dir is outside the worktree, e.g. using --work-tree, `git ls-files`
#   assumes any relative pathspecs are relative to the worktree root, not the
#   current dir. As such, relative pathspecs may not work.
# - Renames are tricky:
#   - R100 should not change mtime, but original name is not on filelist. Should
#     track renames until a valid (A, M) mtime found and then set on current name.
#   - Should set mtime for both current and original directories.
#   - Check mode changes with unchanged blobs?
# - Check file (A, D) for the directory mtime is not sufficient:
#   - Renames also change dir mtime, unless rename was on a parent dir
#   - If most recent change of all files in a dir was a Modification (M),
#     dir might not be touched at all.
#   - Dirs containing only subdirectories but no direct files will also
#     not be touched. They're files' [grand]parent dir, but never their dirname().
#   - Some solutions:
#     - After files done, perform some dir processing for missing dirs, finding latest
#       file (A, D, R)
#     - Simple approach: dir mtime is the most recent child (dir or file) mtime
#     - Use a virtual concept of "created at most at" to fill missing info, bubble up
#       to parents and grandparents
#   - When handling [grand]parent dirs, stay inside <pathspec>
# - Better handling of merge commits. `-m` is plain *wrong*. `-c/--cc` is perfect, but
#   painfully slow. First pass without merge commits is not accurate. Maybe add a new
#   `--accurate` mode for `--cc`?
# This file is a standalone command-line script: it parses sys.argv and runs
# main() at module level, so importing it is refused outright.
if __name__ != "__main__":
    raise ImportError("{} should not be used as a module.".format(__name__))
import argparse | |
import datetime | |
import logging | |
import os.path | |
import shlex | |
import signal | |
import subprocess | |
import sys | |
import time | |
__version__ = "2022.12+dev"

# Update symlinks only if the platform supports not following them
UPDATE_SYMLINKS = bool(os.utime in getattr(os, 'supports_follow_symlinks', []))

# Call os.path.normpath() only if not in a POSIX platform (Windows)
NORMALIZE_PATHS = (os.path.sep != '/')

# How many files to process in each batch when re-trying merge commits
STEPMISSING = 100

# (Extra) keywords for the os.utime() call performed by touch():
# when supported, act on the symlink itself instead of on its target
UTIME_KWS = {'follow_symlinks': False} if UPDATE_SYMLINKS else {}
# Command-line interface ######################################################

def parse_args():
    """Build the command-line parser, parse sys.argv and return the namespace.

    Besides the declared options, the returned namespace carries two derived
    attributes:

    - ``loglevel``: logging.INFO by default, logging.WARNING with --quiet,
      logging.DEBUG with a single --verbose and logging.TRACE with two or more.
    - ``debug``: True when ``loglevel`` is DEBUG or lower.

    setup_logging() must have been called beforehand, as ``logging.TRACE`` is
    read when --verbose is given. get_version() is evaluated while the
    --version option is being declared, i.e. on every invocation.
    """
    parser = argparse.ArgumentParser(
        # Only the text before the '---' marker of the module docstring is
        # shown. __doc__ is None when docstrings are stripped (python -OO).
        description=(__doc__ or '').split('\n---')[0])

    group = parser.add_mutually_exclusive_group()
    group.add_argument('--quiet', '-q', dest='loglevel',
        action="store_const", const=logging.WARNING, default=logging.INFO,
        help="Suppress informative messages and summary statistics.")
    group.add_argument('--verbose', '-v', action="count", help="""
        Print additional information for each processed file.
        Specify twice to further increase verbosity.
        """)

    parser.add_argument('--cwd', '-C', metavar="DIRECTORY", help="""
        Run as if %(prog)s was started in directory %(metavar)s.
        This affects how --work-tree, --git-dir and PATHSPEC arguments are handled.
        See 'man 1 git' or 'git --help' for more information.
        """)

    parser.add_argument('--git-dir', dest='gitdir', metavar="GITDIR", help="""
        Path to the git repository, by default auto-discovered by searching
        the current directory and its parents for a .git/ subdirectory.
        """)

    parser.add_argument('--work-tree', dest='workdir', metavar="WORKTREE", help="""
        Path to the work tree root, by default the parent of GITDIR if it's
        automatically discovered, or the current directory if GITDIR is set.
        """)

    parser.add_argument('--force', '-f', default=False, action="store_true", help="""
        Force updating files with uncommitted modifications.
        Untracked files and uncommitted deletions, renames and additions are
        always ignored.
        """)

    parser.add_argument('--merge', '-m', default=False, action="store_true", help="""
        Include merge commits.
        Leads to more recent times and more files per commit, thus with the same
        time, which may or may not be what you want.
        Including merge commits may lead to fewer commits being evaluated as files
        are found sooner, which can improve performance, sometimes substantially.
        But as merge commits are usually huge, processing them may also take longer.
        By default, merge commits are only used for files missing from regular commits.
        """)

    parser.add_argument('--first-parent', default=False, action="store_true", help="""
        Consider only the first parent, the "main branch", when evaluating merge commits.
        Only effective when merge commits are processed, either when --merge is
        used or when finding missing files after the first regular log search.
        See --skip-missing.
        """)

    parser.add_argument('--skip-missing', '-s', dest="missing", default=True,
        action="store_false", help="""
        Do not try to find missing files.
        If merge commits were not evaluated with --merge and some files were
        not found in regular commits, by default %(prog)s searches for these
        files again in the merge commits.
        This option disables this retry, so files found only in merge commits
        will not have their timestamp updated.
        """)

    parser.add_argument('--no-directories', '-D', dest='dirs', default=True,
        action="store_false", help="""
        Do not update directory timestamps.
        By default, use the time of its most recently created, renamed or deleted file.
        Note that just modifying a file will NOT update its directory time.
        """)

    parser.add_argument('--test', '-t', default=False, action="store_true",
        help="Test run: do not actually update any file timestamp.")

    parser.add_argument('--commit-time', '-c', dest='commit_time', default=False,
        action='store_true', help="Use commit time instead of author time.")

    parser.add_argument('--oldest-time', '-o', dest='reverse_order', default=False,
        action='store_true', help="""
        Update times based on the oldest, instead of the most recent commit of a file.
        This reverses the order in which the git log is processed to emulate a
        file "creation" date. Note this will be inaccurate for files deleted and
        re-created at later dates.
        """)

    parser.add_argument('--skip-older-than', metavar='SECONDS', type=int, help="""
        Ignore files that are currently older than %(metavar)s.
        Useful in workflows that assume such files already have a correct timestamp,
        as it may improve performance by processing fewer files.
        """)

    parser.add_argument('--skip-older-than-commit', '-N', default=False,
        action='store_true', help="""
        Ignore files older than the timestamp it would be updated to.
        Such files may be considered "original", likely in the author's repository.
        """)

    parser.add_argument('--unique-times', default=False, action="store_true", help="""
        Set the microseconds to a unique value per commit.
        Allows telling apart changes that would otherwise have identical timestamps,
        as git's time accuracy is in seconds.
        """)

    parser.add_argument('pathspec', nargs='*', metavar='PATHSPEC', help="""
        Only modify paths matching %(metavar)s, relative to current directory.
        By default, update all but untracked files and submodules.
        """)

    parser.add_argument('--version', '-V', action='version',
        version='%(prog)s version {version}'.format(version=get_version()))

    args_ = parser.parse_args()
    if args_.verbose:
        # -v => DEBUG (10); -vv and beyond => clamped to TRACE (DEBUG // 2)
        args_.loglevel = max(logging.TRACE, logging.DEBUG // args_.verbose)
    args_.debug = args_.loglevel <= logging.DEBUG
    return args_
def get_version(version=__version__):
    """Return the version string shown by --version.

    A version not ending in '+dev' is returned unchanged. For development
    versions, `git describe --tags` is run in the directory containing this
    script and its output, minus any leading 'v', is returned. If git fails
    (not a repository, no tags) or cannot be executed at all, the result is
    '<version>-unknown'.
    """
    if not version.endswith('+dev'):
        return version
    try:
        cwd = os.path.dirname(os.path.realpath(__file__))
        return Git(cwd=cwd, errors=False).describe().lstrip('v')
    except (Git.Error, OSError):
        # OSError: the git executable itself could not be launched. This
        # function runs while the parser is built, so it must not crash.
        return '-'.join((version, "unknown"))
# Helper functions ############################################################

def setup_logging():
    """Add TRACE logging level and corresponding method, return the root logger.

    TRACE sits halfway between NOTSET and DEBUG (``logging.DEBUG // 2``). It is
    published as ``logging.TRACE``, and every Logger gains a ``trace()`` method
    with the same signature as ``debug()``.
    """
    logging.TRACE = TRACE = logging.DEBUG // 2
    # Give the level a readable name for formats that include %(levelname)s
    logging.addLevelName(TRACE, 'TRACE')

    def trace(self, msg, *args, **kwargs):
        """Log 'msg % args' with severity TRACE."""
        return self.log(TRACE, msg, *args, **kwargs)

    logging.Logger.trace = trace
    return logging.getLogger()
def normalize(path):
    r"""Normalize paths from git, handling non-ASCII characters.

    Git stores paths as UTF-8 normalization form C.
    If path contains non-ASCII or non-printable characters, git outputs the UTF-8
    in octal-escaped notation, escaping double-quotes and backslashes, and then
    double-quoting the whole path.
    https://git-scm.com/docs/git-config#Documentation/git-config.txt-corequotePath

    This function reverts this encoding, so:
    normalize(r'"Back\\slash_double\"quote_a\303\247a\303\255"') =>
        r'Back\slash_double"quote_açaí')

    Quoted paths that carry raw (unescaped) non-ASCII characters, as git emits
    when core.quotePath is false, are handled too: the text is converted to its
    UTF-8 bytes before the escapes are decoded, so such characters survive the
    round trip unchanged.

    Paths with invalid UTF-8 encoding, such as single 0x80-0xFF bytes (e.g, from
    Latin1/Windows-1251 encoding) are decoded using surrogate escape, the same
    method used by Python for filesystem paths. So 0xE6 ("æ" in Latin1, r'\\346'
    from Git) is decoded as "\udce6". See https://peps.python.org/pep-0383/ and
    https://vstinner.github.io/painful-history-python-filesystem-encoding.html

    Also see notes on `windows/non-ascii-paths.txt` about path encodings on
    non-UTF-8 platforms and filesystems.
    """
    if path and path[0] == '"':
        # Python 2: path = path[1:-1].decode("string-escape")
        # Python 3: https://stackoverflow.com/a/46650050/624066
        path = (path[1:-1]                # Remove enclosing double quotes
                # To UTF-8 bytes: plain ASCII is unchanged, raw non-ASCII
                # characters become the very bytes git would have escaped
                .encode('utf8', 'surrogateescape')
                .decode('unicode-escape')  # Decode escapes; other bytes map 1:1
                .encode('latin1')          # 1:1 mapping to bytes, UTF-8 encoded
                .decode('utf8', 'surrogateescape'))  # Decode from UTF-8
    if NORMALIZE_PATHS:
        # Make sure the slash matches the OS; for Windows we need a backslash
        path = os.path.normpath(path)
    return path
def dummy(*_args, **_kwargs):
    """No-op function used in dry-run tests.

    Accepts and ignores any arguments, so it can stand in for touch() when
    --test is given.
    """
    return None
def touch(path, mtime):
    """The actual mtime update.

    Sets both the access and the modification time of `path` to `mtime`,
    given in seconds since the epoch. UTIME_KWS supplies the extra os.utime()
    keywords (symlink handling).
    """
    os.utime(path, (mtime, mtime), **UTIME_KWS)
def touch_ns(path, mtime_ns):
    """The actual mtime update, using nanoseconds for unique timestamps.

    Same as touch(), but `mtime_ns` is an integer number of nanoseconds since
    the epoch, applied to both the access and the modification time.
    """
    os.utime(path, None, ns=(mtime_ns, mtime_ns), **UTIME_KWS)
def isodate(secs: int) -> str:
    """Format `secs` (seconds since the epoch) as local 'YYYY-MM-DD HH:MM:SS'."""
    # time.localtime() accepts floats, but discards fractional part
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(secs))
def isodate_ns(ns: int) -> str:
    """Format `ns` (nanoseconds since the epoch) as a local ISO date and time.

    Date and time are separated by a space. The fractional part, when not
    zero, is shown with microsecond resolution.
    """
    # for integers fromtimestamp() is equivalent and ~16% slower than isodate()
    return datetime.datetime.fromtimestamp(ns / 1000000000).isoformat(sep=' ')
def get_mtime_ns(secs: int, idx: int) -> int:
    """Return a timestamp in nanoseconds made of `secs` plus `idx` microseconds.

    `idx` is taken modulo one million, so it only ever fills the microsecond
    part and never spills over into the seconds.
    """
    # Time resolution for filesystems and functions:
    # ext-4 and other POSIX filesystems: 1 nanosecond
    # NTFS (Windows default): 100 nanoseconds
    # datetime.datetime() (due to 64-bit float epoch): 1 microsecond
    us = idx % 1000000  # 10**6
    return 1000 * (1000000 * secs + us)
def get_mtime_path(path):
    """Return the modification time of `path`, in seconds since the epoch.

    Symbolic links are not followed: the time reported is that of the link
    itself, the same object touch() updates when os.utime() is called with
    follow_symlinks=False. This also makes broken symlinks readable instead
    of raising FileNotFoundError.

    Raises OSError if `path` does not exist or is inaccessible.
    """
    return os.lstat(path).st_mtime
# Git class and parse_log(), the heart of the script ##########################

class Git:
    """Minimal wrapper around the `git` executable.

    Every command is run as `git [--work-tree W] [--git-dir G] [-C CWD] ...`.
    On construction the repository is located with `git rev-parse`, which sets
    the `workdir` (work tree root) and `gitdir` attributes and raises
    Git.Error when git exits with a non-zero status.

    Streaming commands (ls_files, ls_dirty, log) return generators reading
    from a child process. Only the most recently started process is tracked,
    and terminate() acts on that one.
    """

    def __init__(self, workdir=None, gitdir=None, cwd=None, errors=True):
        """Set up the base command line and discover the repository paths.

        With errors=False, git's stderr is discarded instead of being shown.
        """
        self.gitcmd = ['git']
        self.errors = errors
        self._proc = None
        if workdir: self.gitcmd.extend(('--work-tree', workdir))
        if gitdir:  self.gitcmd.extend(('--git-dir', gitdir))
        if cwd:     self.gitcmd.extend(('-C', cwd))
        self.workdir, self.gitdir = self._get_repo_dirs()

    def ls_files(self, paths: list = None):
        """Yield tracked files matching `paths`, relative to the work tree root."""
        return (normalize(_) for _ in self._run('ls-files --full-name', paths))

    def ls_dirty(self, force=False):
        """Yield paths with uncommitted changes, from `git status --porcelain`.

        Untracked entries ('??') are never included. By default every other
        entry is. With force=True only entries that are renamed or added in
        the index, or deleted in the work tree, are yielded. For renames
        ('old -> new') the new name is used.
        """
        return (normalize(_[3:].split(' -> ', 1)[-1])
                for _ in self._run('status --porcelain')
                if _[:2] != '??' and (not force or (_[0] in ('R', 'A')
                                                    or _[1] == 'D')))

    def log(self, merge=False, first_parent=False, commit_time=False,
            reverse_order=False, paths: list = None):
        """Yield the lines of `git whatchanged` for `paths`.

        Each commit is introduced by its timestamp (committer time if
        `commit_time`, author time otherwise). The remaining flags add -m,
        --first-parent and --reverse to the command.
        """
        # NOTE(review): recent Git releases deprecate `whatchanged` in favor of
        # `git log --raw` - confirm before relying on this with newer Git.
        cmd = 'whatchanged --pretty={}'.format('%ct' if commit_time else '%at')
        if merge:         cmd += ' -m'
        if first_parent:  cmd += ' --first-parent'
        if reverse_order: cmd += ' --reverse'
        return self._run(cmd, paths)

    def describe(self):
        """Return the first output line of `git describe --tags`.

        Raises Git.Error if git exits with a non-zero status.
        """
        return self._run('describe --tags', check=True)[0]

    def terminate(self):
        """Stop the most recently started streaming process, if any.

        The process is sent a termination request, its output pipe is closed
        and it is waited for, so no zombie process or open pipe is left
        behind. Calling it again, or with no process started, is a no-op.
        """
        proc, self._proc = getattr(self, '_proc', None), None
        if proc is None:
            return
        try:
            proc.terminate()
        except OSError:
            # Avoid errors on OpenBSD
            pass
        if proc.stdout is not None:
            proc.stdout.close()
        proc.wait()

    def _get_repo_dirs(self):
        """Return the normalized (work tree root, absolute git dir) paths."""
        return (os.path.normpath(_) for _ in
                self._run('rev-parse --show-toplevel --absolute-git-dir', check=True))

    def _run(self, cmdstr: str, paths: list = None, output=True, check=False):
        """Run a git command and return its result.

        `cmdstr` is split shell-style and appended to the base command.
        `paths`, if any, are appended after a '--' separator. What is returned
        depends on the flags:

        - output=False: the exit status, with output not captured.
        - check=True: the list of output lines. Git.Error is raised on a
          non-zero exit status.
        - otherwise: a generator over the output lines of a process that is
          still running, which becomes the one terminate() acts on.
        """
        cmdlist = self.gitcmd + shlex.split(cmdstr)
        if paths:
            cmdlist.append('--')
            cmdlist.extend(paths)
        popen_args = dict(universal_newlines=True, encoding='utf8')
        if not self.errors:
            popen_args['stderr'] = subprocess.DEVNULL
        log.trace("Executing: %s", ' '.join(cmdlist))
        if not output:
            return subprocess.call(cmdlist, **popen_args)
        if check:
            try:
                stdout: str = subprocess.check_output(cmdlist, **popen_args)
                return stdout.splitlines()
            except subprocess.CalledProcessError as e:
                raise self.Error(e.returncode, e.cmd, e.output, e.stderr)
        self._proc = subprocess.Popen(cmdlist, stdout=subprocess.PIPE, **popen_args)
        # Strip only the line terminator: trailing blanks may be part of a path
        return (_.rstrip('\n') for _ in self._proc.stdout)

    def __del__(self):
        self.terminate()

    class Error(subprocess.CalledProcessError):
        """Error from git executable"""
def parse_log(filelist, dirlist, stats, git, merge=False, filterlist=None):
    """Walk the git log and set the time of each file the first time it is seen.

    filelist:   set of pending files, relative to the work tree root. A file
                is removed as soon as a log entry for it is found.
    dirlist:    set of pending directories, handled likewise. A directory is
                only updated by an entry that Adds or Deletes one of its files.
    stats:      dict of counters, updated in place.
    git:        Git instance used to read the log and locate the work tree.
    merge:      whether merge commits are included in the log.
    filterlist: optional pathspecs limiting the log.

    Reading stops, and the git process is terminated, as soon as no file is
    left pending. Other settings come from the global `args`.
    """
    workdir = git.workdir

    def do_file(file, mtime, commit_secs, datestr):
        """Update one file, honoring --skip-older-than-commit."""
        # Paths from git are relative to the work tree root, which is not
        # necessarily the current directory
        fullpath = os.path.join(workdir, file)
        if args.skip_older_than_commit:
            try:
                current = get_mtime_path(fullpath)
            except OSError as e:
                log.error("ERROR: %s: %s", e, file)
                stats['errors'] += 1
                return
            # Compare in seconds: with --unique-times `mtime` is in nanoseconds
            if current <= commit_secs:
                stats['skip'] += 1
                return
        if args.debug:
            log.debug("%d\t%d\t%d\t%s\t%s",
                      stats['loglines'], stats['commits'], stats['files'],
                      datestr, file)
        try:
            touch(fullpath, mtime)
            stats['touches'] += 1
        except Exception as e:
            log.error("ERROR: %s: %s", e, file)
            stats['errors'] += 1

    def do_dir(dirname, mtime, datestr):
        """Update one directory."""
        if args.debug:
            log.debug("%d\t%d\t-\t%s\t%s",
                      stats['loglines'], stats['commits'],
                      datestr, "{}/".format(dirname or '.'))
        try:
            touch(os.path.join(workdir, dirname), mtime)
            stats['dirtouches'] += 1
        except Exception as e:
            log.error("ERROR: %s: %s", e, dirname)
            stats['direrrors'] += 1

    mtime = 0        # Value given to touch(): seconds, or ns with --unique-times
    commit_secs = 0  # Timestamp of the current commit, always in seconds
    datestr = isodate(0)
    for line in git.log(
        merge,
        args.first_parent,
        args.commit_time,
        args.reverse_order,
        filterlist
    ):
        stats['loglines'] += 1

        # Blank line between Date and list of files
        if not line:
            continue

        # Date line
        if line[0] != ':':  # Faster than `not line.startswith(':')`
            stats['commits'] += 1
            commit_secs = mtime = int(line)
            if args.unique_times:
                mtime = get_mtime_ns(commit_secs, stats['commits'])
            if args.debug:
                datestr = isodate(mtime)
            continue

        # File line: three tokens if it describes a renaming, otherwise two
        tokens = line.split('\t')

        # Possible statuses:
        # M: Modified (content changed)
        # A: Added (created)
        # D: Deleted
        # T: Type changed: to/from regular file, symlinks, submodules
        # R099: Renamed (moved), with % of unchanged content. 100 = pure rename
        # Not possible in log: C=Copied, U=Unmerged, X=Unknown, B=pairing Broken
        status = tokens[0].split(' ')[-1]

        # Handles non-ASCII chars and OS path separator
        file = normalize(tokens[-1])

        if file in filelist:
            stats['files'] -= 1
            filelist.remove(file)
            do_file(file, mtime, commit_secs, datestr)

        if args.dirs and status in ('A', 'D'):
            dirname = os.path.dirname(file)
            if dirname in dirlist:
                dirlist.remove(dirname)
                do_dir(dirname, mtime, datestr)

        # All files done?
        if not stats['files']:
            git.terminate()
            return
# Main Logic ##################################################################

def main():
    """Run the whole process; the return value is meant for sys.exit().

    Steps: change directory if --cwd was given, locate the repository, build
    the set of files to process (tracked files minus skipped and dirty ones),
    read the regular log, retry files still missing against merge commits,
    and finally report statistics.

    Returns None on success, the errno of a failed --cwd change, or git's
    exit status when the repository cannot be found.
    """
    start = time.time()  # yes, Wall time. CPU time is not realistic for users.
    stats = {_: 0 for _ in ('loglines', 'commits', 'touches', 'skip', 'errors',
                            'dirtouches', 'direrrors')}

    logging.basicConfig(level=args.loglevel, format='%(message)s')
    log.trace("Arguments: %s", args)

    # First things first: Where and Who are we?
    if args.cwd:
        log.debug("Changing directory: %s", args.cwd)
        try:
            os.chdir(args.cwd)
        except OSError as e:
            log.critical(e)
            return e.errno
    # Using both os.chdir() and `git -C` is redundant, but might prevent side effects
    # `git -C` alone could be enough if we make sure that:
    # - all paths, including args.pathspec, are processed by git: ls-files, rev-parse
    # - touch() / os.utime() path argument is always prepended with git.workdir
    try:
        git = Git(workdir=args.workdir, gitdir=args.gitdir, cwd=args.cwd)
    except Git.Error as e:
        # Not in a git repository, and git already informed user on stderr. So we just...
        return e.returncode

    # Get the files managed by git and build file list to be processed
    if UPDATE_SYMLINKS and not args.skip_older_than:
        filelist = set(git.ls_files(args.pathspec))
    else:
        filelist = set()
        for path in git.ls_files(args.pathspec):
            fullpath = os.path.join(git.workdir, path)

            # Symlink (to file, to dir or broken - git handles the same way)
            if not UPDATE_SYMLINKS and os.path.islink(fullpath):
                log.warning("WARNING: Skipping symlink, no OS support for updates: %s",
                            path)
                continue

            # skip files which are older than given threshold
            if args.skip_older_than:
                try:
                    too_old = start - get_mtime_path(fullpath) > args.skip_older_than
                except OSError:
                    # Tracked but unreadable, e.g. an uncommitted deletion.
                    # Keep it: dirty files are taken out of the list below.
                    too_old = False
                if too_old:
                    continue

            # Always add files relative to worktree root
            filelist.add(path)

    # If --force, silently ignore uncommitted deletions (not in the filesystem)
    # and renames / additions (will not be found in log anyway)
    if args.force:
        filelist -= set(git.ls_dirty(force=True))
    # Otherwise, ignore any dirty files
    else:
        dirty = set(git.ls_dirty())
        if dirty:
            log.warning("WARNING: Modified files in the working directory were ignored."
                        "\nTo include such files, commit your changes or use --force.")
            filelist -= dirty

    # Build dir list to be processed
    dirlist = set(os.path.dirname(_) for _ in filelist) if args.dirs else set()

    stats['totalfiles'] = stats['files'] = len(filelist)
    log.info("{0:,} files to be processed in work dir".format(stats['totalfiles']))

    if not filelist:
        # Nothing to do. Exit silently and without errors, just like git does
        return

    # Process the log until all files are 'touched'
    log.debug("Line #\tLog #\tF.Left\tModification Time\tFile Name")
    parse_log(filelist, dirlist, stats, git, args.merge, args.pathspec)

    # Missing files
    if filelist:
        # Try to find them in merge logs, if not done already
        # (usually HUGE, thus MUCH slower!)
        if args.missing and not args.merge:
            filterlist = list(filelist)
            missing = len(filterlist)
            log.info("{0:,} files not found in log, trying merge commits".format(missing))
            # Query in batches of STEPMISSING paths per git invocation
            for i in range(0, missing, STEPMISSING):
                parse_log(filelist, dirlist, stats, git,
                          merge=True, filterlist=filterlist[i:i + STEPMISSING])

        # Still missing some?
        for file in filelist:
            log.warning("WARNING: not found in the log: %s", file)

    # Final statistics
    # Suggestion: use git-log --before=mtime to brag about skipped log entries
    def log_info(msg, *a, width=13):
        """Log `msg` at INFO level, rendering %d and %f with thousand separators."""
        ifmt = '{:%d,}' % (width,)  # not using 'n' for consistency with ffmt
        ffmt = '{:%d,.2f}' % (width,)
        # %-formatting lacks a thousand separator, must pre-render with .format()
        log.info(msg.replace('%d', ifmt).replace('%f', ffmt).format(*a))

    log_info(
        "Statistics:\n"
        "%f seconds\n"
        "%d log lines processed\n"
        "%d commits evaluated",
        time.time() - start, stats['loglines'], stats['commits'])

    if args.dirs:
        if stats['direrrors']: log_info("%d directory update errors", stats['direrrors'])
        log_info("%d directories updated", stats['dirtouches'])

    if stats['touches'] != stats['totalfiles']:
        log_info("%d files", stats['totalfiles'])
    if stats['skip']:   log_info("%d files skipped", stats['skip'])
    if stats['files']:  log_info("%d files missing", stats['files'])
    if stats['errors']: log_info("%d file update errors", stats['errors'])

    log_info("%d files updated", stats['touches'])

    if args.test:
        log.info("TEST RUN - No files modified!")
# Keep only essential, global assignments here. Any other logic must be in main()
log = setup_logging()
args = parse_args()

# Set the actual touch() and other functions based on command-line arguments
if args.unique_times:
    touch = touch_ns
    isodate = isodate_ns

# Make sure this is always set last to ensure --test behaves as intended
if args.test:
    touch = dummy

# UI done, it's showtime!
try:
    sys.exit(main())
except KeyboardInterrupt:
    log.info("\nAborting")
    # Restore the default handler and re-send the signal to ourselves, so the
    # process is reported as terminated by SIGINT
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    os.kill(os.getpid(), signal.SIGINT)