Source code for gitpandas.repository

"""
.. module:: repository
   :platform: Unix, Windows
   :synopsis: A module for examining a single git repository

.. moduleauthor:: Will McGinnis <will@pedalwrencher.com>


"""

import fnmatch
import json
import logging
import os
import shutil
import tempfile
import time

import git  # Import the full git module
import numpy as np
import pandas as pd
from git import BadName, BadObject, GitCommandError, Repo
from pandas import DataFrame, to_datetime

from gitpandas.cache import multicache
from gitpandas.logging import logger

try:
    from joblib import Parallel, delayed

    _has_joblib = True
except ImportError:
    _has_joblib = False

__author__ = "willmcginnis"


def _parallel_cumulative_blame_func(self_, x, committer, ignore_globs, include_globs):
    blm = self_.blame(
        rev=x["rev"],
        committer=committer,
        ignore_globs=ignore_globs,
        include_globs=include_globs,
    )
    blame_data = json.loads(blm.to_json())
    if "loc" in blame_data:
        x.update(blame_data["loc"])
    else:
        # If no blame data, ensure we have at least one committer with 0 lines
        x["Test User"] = 0

    return x


[docs] class Repository: """A class for analyzing a single git repository. This class provides functionality to analyze a git repository, whether it is a local repository or a remote repository that needs to be cloned. It offers methods for analyzing commit history, blame information, file changes, and other git metrics. Args: working_dir (Optional[str]): Path to the git repository: - If None: Uses current working directory - If local path: Path must contain a .git directory - If git URL: Repository will be cloned to a temporary directory verbose (bool, optional): Whether to print verbose output. Defaults to False. tmp_dir (Optional[str]): Directory to clone remote repositories into. Created if not provided. cache_backend (Optional[object]): Cache backend instance from gitpandas.cache labels_to_add (Optional[List[str]]): Extra labels to add to output DataFrames default_branch (Optional[str]): Name of the default branch to use. If None, will try to detect 'main' or 'master', and if neither exists, will raise ValueError. Attributes: verbose (bool): Whether verbose output is enabled git_dir (str): Path to the git repository repo (git.Repo): GitPython Repo instance cache_backend (Optional[object]): Cache backend being used _labels_to_add (List[str]): Labels to add to DataFrames _git_repo_name (Optional[str]): Repository name for remote repos default_branch (str): Name of the default branch Raises: ValueError: If default_branch is None and neither 'main' nor 'master' branch exists Examples: >>> # Create from local repository >>> repo = Repository('/path/to/repo') >>> # Create from remote repository >>> repo = Repository('git://github.com/user/repo.git') Note: When using remote repositories, they will be cloned to temporary directories. This can be slow for large repositories. """
[docs] def __init__( self, working_dir=None, verbose=False, tmp_dir=None, cache_backend=None, labels_to_add=None, default_branch=None, ): """Initialize a Repository instance. Args: working_dir (Optional[str]): Path to the git repository: - If None: Uses current working directory - If local path: Path must contain a .git directory - If git URL: Repository will be cloned to a temporary directory verbose (bool, optional): Whether to print verbose output. Defaults to False. tmp_dir (Optional[str]): Directory to clone remote repositories into. Created if not provided. cache_backend (Optional[object]): Cache backend instance from gitpandas.cache labels_to_add (Optional[List[str]]): Extra labels to add to output DataFrames default_branch (Optional[str]): Name of the default branch to use. If None, will try to detect 'main' or 'master', and if neither exists, will raise ValueError. Raises: ValueError: If default_branch is None and neither 'main' nor 'master' branch exists """ self.verbose = verbose self.__delete_hook = False self._git_repo_name = None self.cache_backend = cache_backend self._labels_to_add = labels_to_add or [] # Convert PosixPath to string if needed if working_dir is not None: working_dir = str(working_dir) if working_dir is not None: if working_dir.startswith(("git://", "https://", "http://")): # if a tmp dir is passed, clone into that, otherwise make a temp directory. if tmp_dir is None: if self.verbose: print(f"cloning repository: {working_dir} into a temporary location") dir_path = tempfile.mkdtemp() else: dir_path = tmp_dir logger.info(f"Cloning remote repository {working_dir} to {dir_path}") self.repo = Repo.clone_from(working_dir, dir_path) self._git_repo_name = working_dir.split(os.sep)[-1].split(".")[0] self.git_dir = dir_path self.__delete_hook = True else: self.git_dir = working_dir self.repo = Repo(self.git_dir) else: self.git_dir = os.getcwd() self.repo = Repo(self.git_dir) # Smart default branch detection if default_branch is None: if self.has_branch("main"): self.default_branch = "main" elif self.has_branch("master"): self.default_branch = "master" else: raise ValueError( "Could not detect default branch. Neither 'main' nor 'master' exists. " "Please specify default_branch explicitly." ) else: self.default_branch = default_branch if self.verbose: print( f"Repository [{self._repo_name()}] instantiated at directory: {self.git_dir} " f"with default branch: {self.default_branch}" ) logger.info( f"Repository [{self._repo_name()}] instantiated at directory: {self.git_dir} " f"with default branch: {self.default_branch}" )
[docs] def __del__(self): """Cleanup method called when the object is destroyed. Cleans up any temporary directories created for cloned repositories. """ if self.__delete_hook and os.path.exists(self.git_dir): shutil.rmtree(self.git_dir)
@multicache(key_prefix="is_bare", key_list=[]) def is_bare(self): """Checks if this is a bare repository. A bare repository is one without a working tree, typically used as a central repository. Returns: bool: True if this is a bare repository, False otherwise """ return self.repo.bare @multicache(key_prefix="has_coverage", key_list=[]) def has_coverage(self): """Checks if a parseable .coverage file exists in the repository. Attempts to find and parse a .coverage file in the repository root directory. The file must be in a valid format that can be parsed as JSON. Returns: bool: True if a valid .coverage file exists, False otherwise """ return os.path.exists(self.git_dir + os.sep + ".coverage") @multicache(key_prefix="coverage", key_list=[]) def coverage(self): """Analyzes test coverage information from the repository. Attempts to read and parse the .coverage file in the repository root using the coverage.py API. Returns coverage statistics for each file. Returns: pandas.DataFrame: A DataFrame with columns: - filename (str): Path to the file - lines_covered (int): Number of lines covered by tests - total_lines (int): Total number of lines - coverage (float): Coverage percentage - repository (str): Repository name Additional columns for any labels specified in labels_to_add Note: Returns an empty DataFrame if no coverage data exists or can't be read. """ if not self.has_coverage(): return DataFrame(columns=["filename", "lines_covered", "total_lines", "coverage"]) try: import coverage cov = coverage.Coverage(data_file=os.path.join(self.git_dir, ".coverage")) cov.load() data = cov.get_data() ds = [] for filename in data.measured_files(): try: with open(os.path.join(self.git_dir, filename)) as f: total_lines = sum(1 for _ in f) lines_covered = len(data.lines(filename) or []) short_filename = filename.replace(self.git_dir + os.sep, "") ds.append([short_filename, lines_covered, total_lines]) except OSError as e: logger.warning(f"Could not process coverage for file {filename}: {e}") if not ds: return DataFrame(columns=["filename", "lines_covered", "total_lines", "coverage"]) df = DataFrame(ds, columns=["filename", "lines_covered", "total_lines"]) df["coverage"] = df["lines_covered"] / df["total_lines"] df = self._add_labels_to_df(df) return df except FileNotFoundError as e: logger.warning(f"Coverage file not found: {e}") return DataFrame(columns=["filename", "lines_covered", "total_lines", "coverage"]) except PermissionError as e: logger.error(f"Permission denied accessing coverage file: {e}") return DataFrame(columns=["filename", "lines_covered", "total_lines", "coverage"]) except (ValueError, KeyError) as e: logger.error(f"Invalid coverage data format: {e}") return DataFrame(columns=["filename", "lines_covered", "total_lines", "coverage"]) except Exception as e: logger.error(f"Unexpected error analyzing coverage data: {e}", exc_info=True) return DataFrame(columns=["filename", "lines_covered", "total_lines", "coverage"]) @multicache( key_prefix="hours_estimate", key_list=[ "branch", "grouping_window", "single_commit_hours", "limit", "days", "committer", "ignore_globs", "include_globs", ], ) def hours_estimate( self, branch=None, grouping_window=0.5, single_commit_hours=0.5, limit=None, days=None, committer=True, ignore_globs=None, include_globs=None, ): """ inspired by: https://github.com/kimmobrunfeldt/git-hours/blob/8aaeee237cb9d9028e7a2592a25ad8468b1f45e4/index.js#L114-L143 Iterates through the commit history of repo to estimate the time commitement of each author or committer over the course of time indicated by limit/extensions/days/etc. :param branch: (optional, default=None) the branch to return commits for, defaults to default_branch if None :param limit: (optional, default=None) a maximum number of commits to return, None for no limit :param grouping_window: (optional, default=0.5 hours) the threhold for how close two commits need to be to consider them part of one coding session :param single_commit_hours: (optional, default 0.5 hours) the time range to associate with one single commit :param days: (optional, default=None) number of days to return, if limit is None :param committer: (optional, default=True) whether to use committer vs. author :param ignore_globs: (optional, default=None) a list of globs to ignore, default none excludes nothing :param include_globs: (optinal, default=None) a list of globs to include, default of None includes everything. :return: DataFrame """ if branch is None: branch = self.default_branch logger.info(f"Starting hours estimation for branch '{branch}'") max_diff_in_minutes = grouping_window * 60.0 first_commit_addition_in_minutes = single_commit_hours * 60.0 # First get the commit history ch = self.commit_history( branch=branch, limit=limit, days=days, ignore_globs=ignore_globs, include_globs=include_globs, ) # split by committer|author by = "committer" if committer else "author" people = set(ch[by].values) ds = [] for person in people: commits = ch[ch[by] == person] commits_ts = [x * 10e-10 for x in sorted(commits.index.values.tolist())] if len(commits_ts) < 2: ds.append([person, 0]) continue def estimate(index, date, commits_ts): next_ts = commits_ts[index + 1] diff_in_minutes = next_ts - date diff_in_minutes /= 60.0 if diff_in_minutes < max_diff_in_minutes: return diff_in_minutes / 60.0 return first_commit_addition_in_minutes / 60.0 hours = [estimate(a, b, commits_ts) for a, b in enumerate(commits_ts[:-1])] hours = sum(hours) ds.append([person, hours]) df = DataFrame(ds, columns=[by, "hours"]) df = self._add_labels_to_df(df) logger.info(f"Finished hours estimation for branch '{branch}'. Found data for {len(df)} contributors.") return df @multicache(key_prefix="commit_history", key_list=["branch", "limit", "days", "ignore_globs", "include_globs"]) def commit_history( self, branch=None, limit=None, days=None, ignore_globs=None, include_globs=None, ): """ Returns a DataFrame containing the commit history for a branch. Retrieves the commit history for the specified branch, with options to limit the number of commits or time range, and filter which files to include. Args: branch (Optional[str]): Branch to analyze. Defaults to default_branch if None. limit (Optional[int]): Maximum number of commits to return days (Optional[int]): If provided, only return commits from the last N days ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore include_globs (Optional[List[str]]): List of glob patterns for files to include Returns: DataFrame: A DataFrame with columns: - date (datetime, index): Timestamp of the commit - author (str): Name of the commit author - committer (str): Name of the committer - message (str): Commit message - commit_sha (str): Commit hash - lines (int): Total lines changed - insertions (int): Lines added - deletions (int): Lines removed - net (int): Net lines changed (insertions - deletions) - repository (str): Repository name Note: If both ignore_globs and include_globs are provided, files must match an include pattern and not match any ignore patterns to be included. """ if branch is None: branch = self.default_branch logger.info(f"Fetching commit history for branch '{branch}'. Limit: {limit}, Days: {days}") # setup the data-set of commits commit_count = 0 if limit is None: if days is None: ds = [ [ x.author.name, x.committer.name, x.committed_date, x.message, x.hexsha, self.__check_extension( x.stats.files, ignore_globs=ignore_globs, include_globs=include_globs, ), ] for x in self.repo.iter_commits(branch) ] else: ds = [] c_date = time.time() commits = self.repo.iter_commits(branch) dlim = time.time() - days * 24 * 3600 while c_date > dlim: try: x = next(commits) except StopIteration: break c_date = x.committed_date if c_date > dlim: commit_count += 1 if logger.isEnabledFor(logging.DEBUG) and commit_count % 1000 == 0: logger.debug(f"Processed {commit_count} commits (days filter)...") ds.append( [ x.author.name, x.committer.name, x.committed_date, x.message, x.hexsha, self.__check_extension( x.stats.files, ignore_globs=ignore_globs, include_globs=include_globs, ), ] ) else: ds = [ [ x.author.name, x.committer.name, x.committed_date, x.message, x.hexsha, self.__check_extension( x.stats.files, ignore_globs=ignore_globs, include_globs=include_globs, ), ] for x in self.repo.iter_commits(branch, max_count=limit) ] commit_count = len(ds) # Count is known due to max_count if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Processed {commit_count} commits (limit applied).") # aggregate stats ds = [ x[:-1] + [ sum([x[-1][key]["lines"] for key in x[-1]]), sum([x[-1][key]["insertions"] for key in x[-1]]), sum([x[-1][key]["deletions"] for key in x[-1]]), sum([x[-1][key]["insertions"] for key in x[-1]]) - sum([x[-1][key]["deletions"] for key in x[-1]]), ] for x in ds if len(x[-1].keys()) > 0 ] # make it a pandas dataframe df = DataFrame( ds, columns=[ "author", "committer", "date", "message", "commit_sha", "lines", "insertions", "deletions", "net", ], ) # format the date col and make it the index df["date"] = pd.to_datetime(df["date"], unit="s", utc=True) df = df.set_index("date") df["branch"] = branch df = self._add_labels_to_df(df) logger.info(f"Finished fetching commit history for branch '{branch}'. Found {len(df)} relevant commits.") return df @multicache(key_prefix="file_change_history", key_list=["branch", "limit", "days", "ignore_globs", "include_globs"]) def file_change_history( self, branch=None, limit=None, days=None, ignore_globs=None, include_globs=None, skip_broken=True, ): """Returns data on commit history of files. For each file changed in each commit within the given parameters, returns information about insertions, deletions, and commit metadata. Args: branch (Optional[str]): Branch to analyze. Defaults to default_branch if None. limit (Optional[int]): Maximum number of commits to return, None for no limit days (Optional[int]): Number of days to return if limit is None ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore include_globs (Optional[List[str]]): List of glob patterns for files to include skip_broken (bool, optional): Whether to skip corrupted Git objects. Defaults to True. Returns: pandas.DataFrame: A DataFrame indexed by commit timestamp containing file change data. Columns include: - filename (str): Path to the file - insertions (int): Number of lines inserted - deletions (int): Number of lines deleted - lines (int): Current line count (insertions - deletions) - message (str): Commit message - committer (str): Name of the committer - author (str): Name of the author - repository (str): Repository name Additional columns for any labels specified in labels_to_add Note: Files matching both include_globs and ignore_globs patterns will be excluded. """ if branch is None: branch = self.default_branch logger.info( f"Fetching file change history for branch '{branch}'. Limit: {limit}, Days: {days}, " f"Ignore: {ignore_globs}, Include: {include_globs}, Skip Broken: {skip_broken}" ) history = [] if limit is None and days is not None: try: cutoff = pd.Timestamp.now(tz="UTC") - pd.Timedelta(days=days) for x in self.repo.iter_commits(branch): if pd.to_datetime(x.committed_date, unit="s", utc=True) < cutoff: break try: # Access commit properties safely to avoid common Git errors self._process_commit_for_file_history(x, history, ignore_globs, include_globs, skip_broken) except (git.exc.GitCommandError, ValueError) as e: if skip_broken: logger.warning(f"Skipping commit {x.hexsha if hasattr(x, 'hexsha') else 'unknown'}: {e}") continue else: logger.error( f"Error processing commit {x.hexsha if hasattr(x, 'hexsha') else 'unknown'}: {e}" ) raise except Exception as e: if skip_broken: logger.warning( f"Unexpected error processing commit " f"{x.hexsha if hasattr(x, 'hexsha') else 'unknown'}: {e}" ) continue else: logger.error( f"Unexpected error processing commit " f"{x.hexsha if hasattr(x, 'hexsha') else 'unknown'}: {e}" ) raise except git.exc.GitCommandError as e: logger.error(f"Error listing commits for branch '{branch}': {e}") return pd.DataFrame( columns=["filename", "insertions", "deletions", "lines", "message", "committer", "author"] ) else: try: for i, x in enumerate(self.repo.iter_commits(branch)): if limit is not None and i >= limit: break try: # Access commit properties safely to avoid common Git errors self._process_commit_for_file_history(x, history, ignore_globs, include_globs, skip_broken) except (git.exc.GitCommandError, ValueError) as e: if skip_broken: logger.warning(f"Skipping commit {x.hexsha if hasattr(x, 'hexsha') else 'unknown'}: {e}") continue else: logger.error( f"Error processing commit {x.hexsha if hasattr(x, 'hexsha') else 'unknown'}: {e}" ) raise except Exception as e: if skip_broken: logger.warning( f"Unexpected error processing commit " f"{x.hexsha if hasattr(x, 'hexsha') else 'unknown'}: {e}" ) continue else: logger.error( f"Unexpected error processing commit " f"{x.hexsha if hasattr(x, 'hexsha') else 'unknown'}: {e}" ) raise except git.exc.GitCommandError as e: logger.error(f"Error listing commits for branch '{branch}': {e}") return pd.DataFrame( columns=["filename", "insertions", "deletions", "lines", "message", "committer", "author"] ) # Return empty DataFrame with correct columns if no valid commits found if not history: logger.warning(f"No valid file change history found for branch '{branch}'") df = pd.DataFrame( columns=["filename", "insertions", "deletions", "lines", "message", "committer", "author"] ) df = self._add_labels_to_df(df) return df # Create DataFrame from the collected history data df = pd.DataFrame(history) df = df.reset_index(drop=True) # Convert date column to datetime and set as index df["date"] = pd.to_datetime(df["date"], unit="s", utc=True) df = df.set_index(keys=["date"], drop=False) df = df.sort_index() # Add repository labels df = self._add_labels_to_df(df) logger.info(f"Finished fetching file change history for branch '{branch}'. Found {len(df)} file changes.") return df
[docs] def _process_commit_for_file_history(self, commit, history, ignore_globs, include_globs, skip_broken): """Helper method to process a commit for file change history. Args: commit: The commit object to process history: List to append the file change data to ignore_globs: List of glob patterns for files to ignore include_globs: List of glob patterns for files to include skip_broken: Whether to skip errors for specific files """ # Get commit metadata safely try: c_date = commit.committed_date c_message = commit.message c_author = commit.author.name if hasattr(commit.author, "name") else "Unknown" c_committer = commit.committer.name if hasattr(commit.committer, "name") else "Unknown" hexsha = commit.hexsha except (ValueError, AttributeError) as e: if skip_broken: logger.warning(f"Error accessing commit metadata: {e}") return else: raise # Get parent parent = commit.parents[0] if commit.parents else None # Process each file in the commit try: diffs = commit.diff(parent) if parent else commit.diff(git.NULL_TREE) for diff in diffs: try: # Get file path if diff.a_path: path = diff.a_path elif diff.b_path: path = diff.b_path else: logger.warning(f"Skipping diff with no path in commit {hexsha}") continue # Apply glob filtering - skip filtered files if not self.__check_extension({path: path}, ignore_globs, include_globs): continue # Extract the stats insertions = 0 deletions = 0 try: # Check if diff has stats attribute first if hasattr(diff, "stats"): stats = diff.stats insertions = stats.get("insertions", 0) deletions = stats.get("deletions", 0) else: # Alternative approach for newer GitPython versions where stats may not be available # Calculate insertions and deletions manually from the diff diff_content = diff.diff # Check if diff.diff is bytes or string if isinstance(diff_content, bytes): diff_lines = diff_content.decode("utf-8", errors="replace").splitlines() elif isinstance(diff_content, str): diff_lines = diff_content.splitlines() else: # If it's neither bytes nor string, we can't process it logger.warning(f"Diff content has unexpected type: {type(diff_content)}") continue for line in diff_lines: if line.startswith("+") and not line.startswith("+++"): insertions += 1 elif line.startswith("-") and not line.startswith("---"): deletions += 1 except (ValueError, AttributeError, KeyError, UnicodeDecodeError) as e: if skip_broken: logger.warning(f"Error getting diff stats for {path} in commit {hexsha}: {e}") continue else: raise # Add to history history.append( { "filename": path, "insertions": insertions, "deletions": deletions, "lines": insertions - deletions, "message": c_message, "committer": c_committer, "author": c_author, "date": c_date, } ) except Exception as e: if skip_broken: logger.warning(f"Error processing diff in commit {hexsha}: {e}") continue else: raise except git.exc.GitCommandError as e: if skip_broken: logger.warning(f"Git error getting diffs for commit {hexsha}: {e}") return else: raise except Exception as e: if skip_broken: logger.warning(f"Unexpected error processing commit {hexsha}: {e}") return else: raise
@multicache( key_prefix="file_change_rates", key_list=["branch", "limit", "coverage", "days", "ignore_globs", "include_globs"], ) def file_change_rates( self, branch=None, limit=None, coverage=False, days=None, ignore_globs=None, include_globs=None, skip_broken=True, ): """ Returns a DataFrame with file change rates, calculated as the number of changes between the first commit for that file and the last. If coverage is true, it will also calculate test coverage statistics for python source files. Args: branch (Optional[str]): Which branch to analyze. If None, uses default_branch. limit (Optional[int]): How many commits to go back in history. None for all. coverage (bool): Whether to calculate test coverage stats. Defaults to False. days (Optional[int]): If not None, only consider changes in the last x days. ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore include_globs (Optional[List[str]]): List of glob patterns for files to include skip_broken (bool, optional): Whether to skip corrupted Git objects. Defaults to True. Returns: pandas.DataFrame: A DataFrame with columns: - file (str): Path to the file - unique_committers (int): Number of unique committers - abs_rate_of_change (float): Absolute rate of change - net_rate_of_change (float): Net rate of change - net_change (int): Net lines changed - abs_change (int): Absolute lines changed - edit_rate (float): Edit rate - lines (int): Current line count - repository (str): Repository name Additional columns for any labels specified in labels_to_add """ if branch is None: branch = self.default_branch logger.info( f"Calculating file change rates for branch '{branch}'. " f"Limit: {limit}, Coverage: {coverage}, Days: {days}, " f"Ignore: {ignore_globs}, Include: {include_globs}" ) try: # Get file change history, passing skip_broken parameter fch = self.file_change_history( branch=branch, limit=limit, days=days, ignore_globs=ignore_globs, include_globs=include_globs, skip_broken=skip_broken, ) # If file_change_history returns empty DataFrame, return empty DataFrame if fch.empty: logger.warning(f"No file change history data found for '{branch}'. Returning empty DataFrame.") return pd.DataFrame( columns=[ "file", "unique_committers", "abs_rate_of_change", "net_rate_of_change", "net_change", "abs_change", "edit_rate", "lines", "repository", ] ) # Reset index if not already done to make date a column if isinstance(fch.index, pd.DatetimeIndex) and "date" not in fch.columns: fch = fch.reset_index() # Group by filename and compute detailed stats if fch.shape[0] > 0: file_history = fch.groupby("filename").agg( { "insertions": ["sum", "max", "mean"], "deletions": ["sum", "max", "mean"], "message": lambda x: " | ".join([str(y) for y in x]), "committer": lambda x: " | ".join([str(y) for y in x]), "author": lambda x: " | ".join([str(y) for y in x]), "date": ["max", "min"], } ) # Flatten column names file_history.columns = [ "total_insertions", "insertions_max", "mean_insertions", "total_deletions", "deletions_max", "mean_deletions", "messages", "committers", "authors", "max_date", "min_date", ] # Reset index to make filename a column file_history = file_history.reset_index() # Rename filename to file for consistency file_history = file_history.rename(columns={"filename": "file"}) # Calculate net changes file_history["net_change"] = file_history["total_insertions"] - file_history["total_deletions"] file_history["abs_change"] = file_history["total_insertions"] + file_history["total_deletions"] # Calculate time deltas - ensure it's at least 1 day to avoid division by zero file_history["delta_time"] = file_history["max_date"] - file_history["min_date"] file_history["delta_days"] = file_history["delta_time"].dt.total_seconds() / (60 * 60 * 24) file_history["delta_days"] = file_history["delta_days"].apply(lambda x: max(1.0, x)) # Calculate metrics file_history["net_rate_of_change"] = file_history["net_change"] / file_history["delta_days"] file_history["abs_rate_of_change"] = file_history["abs_change"] / file_history["delta_days"] file_history["edit_rate"] = file_history["abs_rate_of_change"] - file_history["net_rate_of_change"] file_history["unique_committers"] = file_history["committers"].apply(lambda x: len(set(x.split(" | ")))) file_history["lines"] = file_history["net_change"] # For compatibility with simplified version # Select key columns for the output rates = file_history[ [ "file", "unique_committers", "abs_rate_of_change", "net_rate_of_change", "net_change", "abs_change", "edit_rate", "lines", ] ] # Sort by edit rate rates = rates.sort_values("edit_rate", ascending=False) # Add coverage data if requested if coverage: cov = self.coverage() if not cov.empty: # Ensure coverage DataFrame has 'file' as column, not index if "file" not in cov.columns and "filename" in cov.columns: cov = cov.rename(columns={"filename": "file"}) elif "file" not in cov.columns and isinstance(cov.index.name, str) and cov.index.name == "file": cov = cov.reset_index() rates = pd.merge(rates, cov, on="file", how="left") # Add repository name rates = self._add_labels_to_df(rates) return rates else: # If no file history after grouping, return empty DataFrame logger.warning(f"No valid file change data could be analyzed for '{branch}'.") return pd.DataFrame( columns=[ "file", "unique_committers", "abs_rate_of_change", "net_rate_of_change", "net_change", "abs_change", "edit_rate", "lines", "repository", ] ) except MemoryError as e: logger.error(f"Out of memory calculating file change rates. Try reducing limit or using days filter: {e}") return pd.DataFrame( columns=[ "file", "unique_committers", "abs_rate_of_change", "net_rate_of_change", "net_change", "abs_change", "edit_rate", "lines", "repository", ] ) except git.exc.GitCommandError as e: logger.error(f"Git command failed while calculating file change rates: {e}") return pd.DataFrame( columns=[ "file", "unique_committers", "abs_rate_of_change", "net_rate_of_change", "net_change", "abs_change", "edit_rate", "lines", "repository", ] ) except Exception as e: logger.error(f"Unexpected error calculating file change rates: {e}", exc_info=True) return pd.DataFrame( columns=[ "file", "unique_committers", "abs_rate_of_change", "net_rate_of_change", "net_change", "abs_change", "edit_rate", "lines", "repository", ] ) @staticmethod def __check_extension(files, ignore_globs=None, include_globs=None): """ Internal method to filter a list of file changes by extension and ignore_dirs. :param files: :param ignore_globs: a list of globs to ignore (if none falls back to extensions and ignore_dir) :param include_globs: a list of globs to include (if none, includes all). :return: dict """ logger.debug( f"Checking extensions/globs. Files: {len(files)}, Ignore: {ignore_globs}, Include: {include_globs}" ) if include_globs is None or include_globs == []: include_globs = ["*"] out = {} for key in files: # count up the number of patterns in the ignore globs list that match if ignore_globs is not None: count_exclude = sum([1 if fnmatch.fnmatch(key, g) else 0 for g in ignore_globs]) else: count_exclude = 0 # count up the number of patterns in the include globs list that match count_include = sum([1 if fnmatch.fnmatch(key, g) else 0 for g in include_globs]) # if we have one vote or more to include and none to exclude, then we use the file. if count_include > 0 and count_exclude == 0: out[key] = files[key] logger.debug(f"Finished checking extensions. Filtered files count: {len(out)}") return out @multicache(key_prefix="blame", key_list=["rev", "committer", "by", "ignore_blobs", "include_globs"]) def blame( self, rev="HEAD", committer=True, by="repository", ignore_globs=None, include_globs=None, ): """Analyzes blame information for files in the repository. Retrieves blame information from a specific revision and aggregates it based on the specified grouping. Can group results by committer/author and either repository or file. Args: rev (str, optional): Revision to analyze. Defaults to 'HEAD'. committer (bool, optional): If True, group by committer name. If False, group by author name. Defaults to True. by (str, optional): How to group the results. One of: - 'repository': Group by repository (default) - 'file': Group by individual file ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore include_globs (Optional[List[str]]): List of glob patterns for files to include Returns: pandas.DataFrame: A DataFrame with columns depending on the 'by' parameter: If by='repository': - committer/author (str): Name of the committer/author - loc (int): Lines of code attributed to that person If by='file': - committer/author (str): Name of the committer/author - file (str): File path - loc (int): Lines of code attributed to that person in that file Note: Results are sorted by lines of code in descending order. If both ignore_globs and include_globs are provided, files must match an include pattern and not match any ignore patterns to be included. """ logger.info(f"Calculating blame for rev '{rev}'. Group by: {by}, Committer: {committer}") logger.debug(f"Blame Ignore: {ignore_globs}, Include: {include_globs}") blames = [] try: # List files at the specified revision file_output = self.repo.git.ls_tree("-r", "--name-only", rev) # Correct split character to standard newline file_names = [f for f in file_output.split("\n") if f.strip()] except GitCommandError as e: logger.error(f"Could not list files for rev '{rev}': {e}") return DataFrame() # Return empty DataFrame if we can't list files for file in self.__check_extension( {x: x for x in file_names}, ignore_globs=ignore_globs, include_globs=include_globs, ): try: logger.debug(f"Getting blame for file: {file} at rev: {rev}") # Use the relative path directly from ls-tree blame_output = self.repo.blame(rev, file) for commit, lines in blame_output: # Store the relative path directly blames.append((commit, lines, file)) except GitCommandError as e: logger.warning(f"Failed to get blame for file: {file} at rev: {rev}. Error: {e}") continue except UnicodeDecodeError as e: logger.warning(f"Skipping binary file that cannot be decoded: {file} at rev: {rev}. Error: {e}") continue if committer: if by == "repository": blames_df = ( DataFrame( [[x[0].committer.name, len(x[1])] for x in blames], columns=["committer", "loc"], ) .groupby("committer")["loc"] .sum() .to_frame() ) elif by == "file": blames_df = ( DataFrame( [[x[0].committer.name, len(x[1]), x[2]] for x in blames], columns=["committer", "loc", "file"], ) .groupby(["committer", "file"])["loc"] .sum() .to_frame() ) else: if by == "repository": blames_df = ( DataFrame( [[x[0].author.name, len(x[1])] for x in blames], columns=["author", "loc"], ) .groupby("author")["loc"] .sum() .to_frame() ) elif by == "file": blames_df = ( DataFrame( [[x[0].author.name, len(x[1]), x[2]] for x in blames], columns=["author", "loc", "file"], ) .groupby(["author", "file"])["loc"] .sum() .to_frame() ) blames_df = self._add_labels_to_df(blames_df) logger.info(f"Finished calculating blame for rev '{rev}'. Found {len(blames_df)} blame entries.") return blames_df @multicache(key_prefix="revs", key_list=["branch", "limit", "skip", "num_datapoints"]) def revs(self, branch=None, limit=None, skip=None, num_datapoints=None, skip_broken=False): """ Returns a dataframe of all revision tags and their timestamps. It will have the columns: * date * rev Args: branch (Optional[str]): Branch to analyze. Defaults to default_branch if None. limit (Optional[int]): Maximum number of revisions to return, None for no limit skip (Optional[int]): Number of revisions to skip. Ex: skip=2 returns every other revision, None for no skipping. num_datapoints (Optional[int]): If limit and skip are none, and this isn't, then num_datapoints evenly spaced revs will be used skip_broken (bool): Whether to skip corrupted commit objects. Defaults to False. Returns: DataFrame: DataFrame with revision information """ if branch is None: branch = self.default_branch logger.info( f"Fetching revisions for branch '{branch}'. Limit: {limit}, Skip: {skip}, " f"Num Datapoints: {num_datapoints}, Skip Broken: {skip_broken}" ) if limit is None and skip is None and num_datapoints is not None: logger.debug("Calculating skip based on num_datapoints") try: # Safely count commits commit_count = 0 for _ in self.repo.iter_commits(branch): commit_count += 1 limit = commit_count skip = int(float(limit) / num_datapoints) if commit_count > 0 else 1 logger.debug(f"Calculated limit={limit}, skip={skip} from {commit_count} commits") except git.exc.GitCommandError as e: logger.error(f"Error counting commits for branch '{branch}': {e}") return pd.DataFrame(columns=["date", "rev"]) else: if limit is None: limit = None # Let Git handle unlimited commits naturally elif skip is not None: limit = limit * skip ds = [] skipped_count = 0 try: commits_iterator = self.repo.iter_commits(branch, max_count=limit) for commit in commits_iterator: try: # Get required properties safely try: # Capture all needed data in a single access to avoid file handle issues committed_date = commit.committed_date name_rev = commit.name_rev # Safely handle name_rev format parts = name_rev.split(" ") if name_rev else [] rev_sha = parts[0] if parts else commit.hexsha ds.append([committed_date, rev_sha]) except (ValueError, AttributeError) as e: if skip_broken: logger.warning( f"Skipping commit {commit.hexsha if hasattr(commit, 'hexsha') else 'unknown'}: {e}" ) skipped_count += 1 continue else: logger.error(f"Error processing commit: {e}") raise except git.exc.GitCommandError as git_err: if skip_broken: logger.warning(f"Skipping commit due to Git error: {git_err}") skipped_count += 1 continue else: logger.error(f"Git error processing commit: {git_err}") raise except Exception as e: if skip_broken: logger.warning(f"Skipping commit due to unexpected error: {e}") skipped_count += 1 continue else: logger.error(f"Unexpected error processing commit: {e}") raise except git.exc.GitCommandError as e: logger.error(f"Could not iterate commits for branch '{branch}' in revs(): {e}") # Return empty DataFrame if iteration fails return pd.DataFrame(columns=["date", "rev"]) if not ds: logger.warning(f"No valid revisions found for branch '{branch}'") return pd.DataFrame(columns=["date", "rev"]) df = DataFrame(ds, columns=["date", "rev"]) if skip is not None: logger.debug(f"Applying skip ({skip}) to revisions.") if skip == 0: skip = 1 if df.shape[0] >= skip: df = df.iloc[range(0, df.shape[0], skip)] df.reset_index(drop=True, inplace=True) else: df = df.iloc[[0]] df.reset_index(drop=True, inplace=True) df = self._add_labels_to_df(df) if skipped_count > 0: logger.info( f"Finished fetching revisions for '{branch}'. Found {len(df)} " f"valid revisions, skipped {skipped_count} corrupted objects." ) else: logger.info(f"Finished fetching revisions for '{branch}'. Found {len(df)} revisions.") return df @multicache( key_prefix="cumulative_blame", key_list=["branch", "limit", "skip", "num_datapoints", "committer", "ignore_globs", "include_globs"], ) def cumulative_blame( self, branch=None, limit=None, skip=None, num_datapoints=None, committer=True, ignore_globs=None, include_globs=None, skip_broken=True, ): """ Returns the blame at every revision of interest. Index is a datetime, column per committer, with number of lines blamed to each committer at each timestamp as data. Args: branch (Optional[str]): Branch to analyze. Defaults to default_branch if None. limit (Optional[int]): Maximum number of revisions to return, None for no limit skip (Optional[int]): Number of revisions to skip. Ex: skip=2 returns every other revision, None for no skipping. num_datapoints (Optional[int]): If limit and skip are none, and this isn't, then num_datapoints evenly spaced revs will be used committer (bool, optional): True if committer should be reported, false if author ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore include_globs (Optional[List[str]]): List of glob patterns for files to include skip_broken (bool, optional): Whether to skip corrupted Git objects. Defaults to True. Returns: DataFrame: DataFrame with blame information Note: If both ignore_globs and include_globs are provided, files must match an include pattern and not match any ignore patterns to be included. """ if branch is None: branch = self.default_branch logger.info( f"Starting cumulative blame calculation for branch '{branch}'. " f"Limit: {limit}, Skip: {skip}, Num Datapoints: {num_datapoints}, " f"Committer: {committer}, Skip Broken: {skip_broken}" ) # Pass skip_broken and force_refresh to ensure robustness when getting revisions revs = self.revs(branch=branch, limit=limit, skip=skip, num_datapoints=num_datapoints, skip_broken=skip_broken) # Check immediately after calling revs() if not revs.empty and "rev" not in revs.columns: logger.error("DataFrame returned from self.revs() is missing the 'rev' column.") # Raise a specific error to make it clear. raise ValueError("Internal Error: self.revs() returned DataFrame without 'rev' column.") # get the commit history to stub out committers (hacky and slow) logger.debug("Fetching all committers to pre-populate columns...") committers = set() try: for commit in self.repo.iter_commits(branch): try: # Determine the name based on the 'committer' flag name = commit.committer.name if committer else commit.author.name committers.add(name) except ValueError as e: # Handle potential errors resolving commit objects (e.g., due to corruption) logger.warning( f"Could not resolve commit object " f"{commit.hexsha if hasattr(commit, 'hexsha') else 'unknown'} when fetching committers: {e}" ) continue except Exception as e: # Catch other potential errors getting name (e.g., missing name) logger.warning( f"Error getting committer/author name for commit " f"{commit.hexsha if hasattr(commit, 'hexsha') else 'unknown'}: {e}" ) continue except GitCommandError as e: logger.error(f"Could not iterate commits for branch '{branch}' to get committers: {e}") # Return empty DataFrame if we can't even get committers return pd.DataFrame(index=pd.to_datetime([]).tz_localize("UTC")) # Check if any committers were found if not committers: logger.warning(f"No valid committers found for branch '{branch}'. Returning empty DataFrame.") # Return an empty DataFrame with a 'date' index to avoid errors downstream return pd.DataFrame(index=pd.to_datetime([]).tz_localize("UTC")) # If revs is empty, return an empty DataFrame with proper index if revs.empty: logger.warning(f"No valid revisions found for branch '{branch}'. Returning empty DataFrame.") return pd.DataFrame(index=pd.to_datetime([]).tz_localize("UTC")) for y in committers: revs[y] = 0 if self.verbose: print("Beginning processing for cumulative blame:") logger.debug(f"Processing {len(revs)} revisions for cumulative blame...") # now populate that table with some actual values for idx, row in revs.iterrows(): if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Processing blame for rev: {row['rev']} (Index: {idx})") try: blame = self.blame( rev=row["rev"], committer=committer, ignore_globs=ignore_globs, include_globs=include_globs, ) for y in committers: try: loc = blame.loc[y, "loc"] revs.at[idx, y] = loc except KeyError: pass except GitCommandError as e: logger.warning(f"Skipping blame for revision {row['rev']}: {e}") continue except Exception as e: logger.warning(f"Unexpected error processing blame for revision {row['rev']}: {e}") continue # If revs is now empty after processing, return an empty DataFrame if revs.empty: logger.warning("No valid blame data found after processing. Returning empty DataFrame.") return pd.DataFrame(index=pd.to_datetime([]).tz_localize("UTC")) try: del revs["rev"] # Convert date strings to numeric type before using to_datetime revs["date"] = pd.to_numeric(revs["date"]) revs["date"] = pd.to_datetime(revs["date"], unit="s", utc=True) revs.set_index(keys=["date"], drop=True, inplace=True) revs = revs.fillna(0.0) # drop 0 cols for col in revs.columns.values: if col != "col" and revs[col].sum() == 0: del revs[col] # drop 0 rows keep_idx = [] committers = [x for x in revs.columns.values if x != "date"] for idx, row in revs.iterrows(): # Convert any string values to numeric, treating non-numeric strings as 0 row_sum = 0 for x in committers: try: val = float(row[x]) row_sum += val except (ValueError, TypeError): continue if row_sum > 0: keep_idx.append(idx) logger.debug(f"Filtering complete. Kept {len(keep_idx)} non-zero rows.") # Only filter if we have rows to keep if keep_idx: revs = revs.loc[keep_idx] except Exception as e: logger.error(f"Error processing cumulative blame data: {e}") return pd.DataFrame(index=pd.to_datetime([]).tz_localize("UTC")) logger.info(f"Finished cumulative blame calculation for '{branch}'. Result shape: {revs.shape}") return revs @multicache( key_prefix="parallel_cumulative_blame", key_list=["branch", "limit", "skip", "num_datapoints", "committer", "workers", "ignore_globs", "include_globs"], ) def parallel_cumulative_blame( self, branch=None, limit=None, skip=None, num_datapoints=None, committer=True, workers=1, ignore_globs=None, include_globs=None, skip_broken=True, ): """ Returns the blame at every revision of interest. Index is a datetime, column per committer, with number of lines blamed to each committer at each timestamp as data. Args: branch (Optional[str]): Branch to analyze. Defaults to default_branch if None. limit (Optional[int]): Maximum number of revisions to return, None for no limit skip (Optional[int]): Number of revisions to skip. Ex: skip=2 returns every other revision, None for no skipping. num_datapoints (Optional[int]): If limit and skip are none, and this isn't, then num_datapoints evenly spaced revs will be used committer (bool, optional): True if committer should be reported, false if author ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore include_globs (Optional[List[str]]): List of glob patterns for files to include workers (Optional[int]): Number of workers to use in the threadpool, -1 for one per core. skip_broken (bool, optional): Whether to skip corrupted Git objects. Defaults to True. Returns: DataFrame: DataFrame with blame information """ if branch is None: branch = self.default_branch logger.info( f"Starting parallel cumulative blame for branch '{branch}'. " f"Limit: {limit}, Skip: {skip}, Num Datapoints: {num_datapoints}, " f"Committer: {committer}, Workers: {workers}, Skip Broken: {skip_broken}" ) if not _has_joblib: logger.error("Joblib not installed. Cannot run parallel_cumulative_blame.") raise ImportError("""Must have joblib installed to use parallel_cumulative_blame(), please use cumulative_blame() instead.""") revs = self.revs(branch=branch, limit=limit, skip=skip, num_datapoints=num_datapoints, skip_broken=skip_broken) # If revs is empty, return an empty DataFrame with proper index if revs.empty: logger.warning(f"No valid revisions found for branch '{branch}'. Returning empty DataFrame.") return pd.DataFrame(index=pd.to_datetime([]).tz_localize("UTC")) logger.debug(f"Prepared {len(revs)} revisions for parallel processing.") try: revisions = json.loads(revs.to_json(orient="index")) revisions = [revisions[key] for key in revisions] ds = Parallel(n_jobs=workers, backend="threading", verbose=5)( delayed(_parallel_cumulative_blame_func)(self, x, committer, ignore_globs, include_globs) for x in revisions ) if not ds: logger.warning("No valid blame data found after processing. Returning empty DataFrame.") return pd.DataFrame(index=pd.to_datetime([]).tz_localize("UTC")) revs = DataFrame(ds) del revs["rev"] # Convert date strings to numeric type before using to_datetime revs["date"] = pd.to_numeric(revs["date"]) revs["date"] = pd.to_datetime(revs["date"], unit="s", utc=True) revs.set_index(keys=["date"], drop=True, inplace=True) revs = revs.fillna(0.0) # drop 0 cols for col in revs.columns.values: if col != "col" and revs[col].sum() == 0: del revs[col] # drop 0 rows keep_idx = [] committers = [x for x in revs.columns.values if x != "date"] for idx, row in revs.iterrows(): # Convert any string values to numeric, treating non-numeric strings as 0 row_sum = 0 for x in committers: try: val = float(row[x]) row_sum += val except (ValueError, TypeError): continue if row_sum > 0: keep_idx.append(idx) logger.debug(f"Filtering complete. Kept {len(keep_idx)} non-zero rows.") # Only filter if we have rows to keep if keep_idx: revs = revs.loc[keep_idx] logger.info(f"Finished parallel cumulative blame for '{branch}'. Result shape: {revs.shape}") return revs except Exception as e: logger.error(f"Error in parallel cumulative blame: {e}") return pd.DataFrame(index=pd.to_datetime([]).tz_localize("UTC")) @multicache(key_prefix="branches", key_list=[]) def branches(self): """Returns information about all branches in the repository. Retrieves a list of all branches (both local and remote) from the repository. Returns: pandas.DataFrame: A DataFrame with columns: - repository (str): Repository name - branch (str): Name of the branch - local (bool): Whether the branch is local Additional columns for any labels specified in labels_to_add """ logger.info("Fetching repository branches (local and remote).") # first pull the local branches logger.debug("Fetching local branches...") local_branches = self.repo.branches data = [[x.name, True] for x in list(local_branches)] # then the remotes logger.debug("Fetching remote branches...") remote_branches = self.repo.git.branch("-r").replace(" ", "").splitlines() rb = [] for _i, remote in enumerate(remote_branches): if "->" in remote: continue # Strip origin/ prefix if remote.startswith("origin/"): remote = remote[7:] rb.append(remote) remote_branches = set(rb) data += [[x, False] for x in remote_branches] df = DataFrame(data, columns=["branch", "local"]) df = self._add_labels_to_df(df) logger.info(f"Finished fetching branches. Found {len(df)} total branches.") return df @multicache(key_prefix="get_branches_by_commit", key_list=["commit"]) def get_branches_by_commit(self, commit): """Finds all branches containing a specific commit. Args: commit (str): Commit hash to look up Returns: pandas.DataFrame: A DataFrame with columns: - branch (str): Name of each branch containing the commit - commit (str): The commit hash that was looked up - repository (str): Repository name Additional columns for any labels specified in labels_to_add """ logger.info(f"Finding branches containing commit: {commit}") branches = self.repo.git.branch("-a", "--contains", commit).replace(" ", "").replace("*", "").splitlines() df = DataFrame(branches, columns=["branch"]) df["commit"] = str(commit) df = self._add_labels_to_df(df) logger.info(f"Found {len(df)} branches containing commit {commit}.") return df @multicache(key_prefix="commits_in_tags", key_list=["start", "end"]) def commits_in_tags(self, start=None, end=None): """Analyzes commits associated with each tag. For each tag, traces backwards through the commit history until hitting another tag, reaching the time limit, or hitting the root commit. This helps understand what changes went into each tagged version. Args: start (Union[np.timedelta64, pd.Timestamp], optional): Start time for analysis. If a timedelta, calculated relative to now. Defaults to 6 months ago. end (Optional[pd.Timestamp]): End time for analysis. Defaults to None. Returns: pandas.DataFrame: A DataFrame indexed by (tag_date, commit_date) with columns: - commit_sha (str): SHA of the commit - tag (str): Name of the tag this commit belongs to - repository (str): Repository name Additional columns for any labels specified in labels_to_add Note: This is useful for generating changelogs or understanding the scope of changes between tagged releases. """ logger.info(f"Analyzing commits within tags. Start: {start}, End: {end}") if start is None: start = np.timedelta64(180, "D") # Approximately 6 months # If we pass in a timedelta instead of a timestamp, calc the timestamp relative to now if isinstance(start, pd.Timedelta | np.timedelta64): start = pd.Timestamp.today(tz="UTC") - start if isinstance(end, pd.Timedelta | np.timedelta64): end = pd.Timestamp.today(tz="UTC") - end # remove tagged commits outside our date ranges df_tags = self.tags() if start: df_tags = df_tags.query(f'commit_date > "{start}"').copy() if end: df_tags = df_tags.query(f'commit_date < "{end}"').copy() # convert to unix time to speed up calculations later start = (start - pd.Timestamp("1970-01-01", tz="UTC")) // pd.Timedelta("1s") if start else start end = (end - pd.Timestamp("1970-01-01", tz="UTC")) // pd.Timedelta("1s") if end else end ds = [] checked_commits = set() df_tags["filled_shas"] = df_tags["tag_sha"].fillna(value=df_tags["commit_sha"]) logger.debug(f"Processing {len(df_tags)} tags within the specified date range.") for sha, tag_name in df_tags[["filled_shas", "tag"]].sort_index(level="tag_date").values: logger.debug(f"Processing tag '{tag_name}' starting from SHA: {sha}") commit = self.repo.commit(sha) before_start = start and commit.committed_date < start passed_end = end and commit.committed_date > end already_checked = str(commit) in checked_commits if before_start or passed_end or already_checked: continue tag = self.repo.tag(tag_name) checked_commits.add(str(commit)) logger.debug(f"Adding commit {commit.hexsha[:7]} for tag '{tag.name}'") ds.append(self._commits_per_tags_helper(commit, df_tags, tag=tag)) if not ds: logger.info("No commits found within tags for the specified range.") return pd.DataFrame(columns=["commit_sha", "tag", "tag_date", "commit_date"]) df = pd.DataFrame(ds) df = df.set_index(["tag_date", "commit_date"]) df = self._add_labels_to_df(df) logger.info(f"Finished analyzing commits in tags. Found {len(df)} commits.") return df def _commits_per_tags_recursive( self, commit, df_tags, ds=None, tag=None, checked_commits=None, start=None, end=None, ): logger.debug(f"Recursive check for commit {commit.hexsha[:7]} under tag '{tag.name if tag else None}'") ds = ds if ds is not None else [] checked_commits = checked_commits if checked_commits is not None else set() for parent_commit in commit.parents: before_start = start and parent_commit.committed_date < start passed_end = end and parent_commit.committed_date > end already_checked = str(parent_commit) in checked_commits if before_start or passed_end or already_checked: logger.debug( f"Skipping parent commit {parent_commit.hexsha[:7]}: BeforeStart={before_start}, PassedEnd={passed_end}, AlreadyChecked={already_checked}" # noqa: E501 ) continue checked_commits.add(str(parent_commit)) commit_meta, tag = self._commits_per_tags_helper(commit=parent_commit, df_tags=df_tags, tag=tag) ds.append(commit_meta) self._commits_per_tags_recursive( commit=parent_commit, df_tags=df_tags, ds=ds, tag=tag, checked_commits=checked_commits, start=start, end=end, ) def _commits_per_tags_helper(self, commit, df_tags, tag=None): tag_pd = df_tags.loc[ (df_tags["commit_sha"].str.contains(str(commit))) | (df_tags["tag_sha"].str.contains(str(commit))) ].tag if not tag_pd.empty: tag = self.repo.tag(tag_pd[0]) tag_date = tag.tag.tagged_date if tag and tag.tag else commit.committed_date tag_date = pd.to_datetime(tag_date, unit="s", utc=True) commit_date = pd.to_datetime(commit.committed_date, unit="s", utc=True) return { "commit_sha": str(commit), "tag": str(tag), "tag_date": tag_date, "commit_date": commit_date, } @multicache(key_prefix="tags", key_list=[]) def tags(self, skip_broken=False): """Returns information about all tags in the repository. Retrieves detailed information about all tags, including both lightweight and annotated tags. Args: skip_broken (bool): Whether to skip corrupted tag objects. Defaults to False. Returns: pandas.DataFrame: A DataFrame indexed by (tag_date, commit_date) with columns: - tag (str): Name of the tag - annotated (bool): Whether it's an annotated tag - annotation (str): Tag message (empty for lightweight tags) - tag_sha (Optional[str]): SHA of tag object (None for lightweight tags) - commit_sha (str): SHA of the commit being tagged - repository (str): Repository name Additional columns for any labels specified in labels_to_add Note: - tag_date is the tag creation time for annotated tags, commit time for lightweight - commit_date is always the timestamp of the tagged commit - Both dates are timezone-aware UTC timestamps """ logger.info(f"Fetching repository tags (skip_broken={skip_broken}).") tags = self.repo.tags tags_meta = [] cols = [ "tag_date", "commit_date", "tag", "annotated", "annotation", "tag_sha", "commit_sha", ] skipped_count = 0 for tag in tags: try: d = dict.fromkeys(cols) d["tag"] = tag.name # Safely handle tag object access tag_obj = None try: # Check if this is an annotated tag (has tag object) tag_obj = tag.tag except (ValueError, AttributeError, git.exc.GitCommandError): # Not an annotated tag or tag object is inaccessible tag_obj = None if tag_obj is not None: # This is a safer way to access tag properties - get all at once try: # Store all tag object attributes we need in one go d["annotated"] = True d["tag_date"] = str(tag_obj.tagged_date) d["annotation"] = str(tag_obj.message) d["tag_sha"] = str(tag_obj.hexsha) except (ValueError, AttributeError, git.exc.GitCommandError) as e: if skip_broken: logger.warning(f"Skipping corrupted tag object '{tag.name}': {e}") skipped_count += 1 continue else: logger.error(f"Error accessing tag object '{tag.name}': {e}") raise else: # Lightweight tag d["annotated"] = False d["annotation"] = "" d["tag_sha"] = None # Safely get commit information try: commit = tag.commit d["commit_date"] = commit.committed_date d["commit_sha"] = commit.hexsha # For lightweight tags, use commit date as tag date if "tag_date" not in d or d["tag_date"] is None: d["tag_date"] = commit.committed_date except (ValueError, git.exc.GitCommandError) as e: if skip_broken: logger.warning(f"Skipping tag '{tag.name}' with invalid commit reference: {e}") skipped_count += 1 continue else: logger.error(f"Error accessing commit for tag '{tag.name}': {e}") raise tags_meta.append(d) except git.exc.GitCommandError as git_err: # Handle Git command errors (like unknown object type) if skip_broken: logger.warning(f"Skipping tag '{tag.name}' due to Git error: {git_err}") skipped_count += 1 continue else: logger.error(f"Git error reading tag '{tag.name}': {git_err}") raise except ValueError as ve: # Handle file handle errors and value errors if skip_broken: logger.warning(f"Skipping tag '{tag.name}' due to value error: {ve}") skipped_count += 1 continue else: logger.error(f"Value error while reading tag '{tag.name}': {ve}") raise except Exception as e: # General error handling if skip_broken: logger.warning(f"Skipping tag '{tag.name}' due to unexpected error: {e}") skipped_count += 1 continue else: logger.error(f"Unexpected error while processing tag '{tag.name}': {e}") raise if not tags_meta: logger.info("No valid tags found in the repository.") # Return an empty DataFrame with the expected columns df = DataFrame(columns=cols) df = self._add_labels_to_df(df) return df df = DataFrame(tags_meta, columns=cols) df["tag_date"] = to_datetime(pd.to_numeric(df["tag_date"], errors="coerce"), unit="s", utc=True) df["commit_date"] = to_datetime(pd.to_numeric(df["commit_date"], errors="coerce"), unit="s", utc=True) df = self._add_labels_to_df(df) df = df.set_index(keys=["tag_date", "commit_date"], drop=True) df = df.sort_index(level=["tag_date", "commit_date"]) if skipped_count > 0: logger.info(f"Finished fetching tags. Found {len(df)} valid tags, skipped {skipped_count} corrupted tags.") else: logger.info(f"Finished fetching tags. Found {len(df)} tags.") return df @property def repo_name(self): return self._repo_name()
[docs] def _repo_name(self): """Returns the name of the repository. For local repositories, uses the name of the directory containing the .git folder. For remote repositories, extracts the name from the URL. Returns: str: Name of the repository, or 'unknown_repo' if name can't be determined Note: This is an internal method primarily used to provide consistent repository names in DataFrame outputs. """ if self._git_repo_name is not None: return self._git_repo_name else: reponame = self.repo.git_dir.split(os.sep)[-2] if reponame.strip() == "": return "unknown_repo" return reponame
[docs] def _add_labels_to_df(self, df): """Adds configured labels to a DataFrame. Adds the repository name and any additional configured labels to the DataFrame. This ensures consistent labeling across all DataFrame outputs. Args: df (pandas.DataFrame): DataFrame to add labels to Returns: pandas.DataFrame: The input DataFrame with additional label columns: - repository (str): Repository name - label0..labelN: Values from labels_to_add Note: This is an internal helper method used by all public methods that return DataFrames. """ df["repository"] = self._repo_name() for i, label in enumerate(self._labels_to_add): df[f"label{i}"] = label return df
[docs] def __str__(self): """Returns a human-readable string representation of the repository. Returns: str: String in format 'git repository: {name} at: {path}' """ return f"git repository: {self._repo_name()} at: {self.git_dir}"
@multicache(key_prefix="get_commit_content", key_list=["rev", "ignore_globs", "include_globs"]) def get_commit_content(self, rev, ignore_globs=None, include_globs=None): """Gets detailed content changes for a specific commit. For each file changed in the commit, returns the actual content changes including added and removed lines. Args: rev (str): Revision (commit hash) to analyze ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore include_globs (Optional[List[str]]): List of glob patterns for files to include Returns: pandas.DataFrame: A DataFrame with columns: - file (str): Path of the changed file - change_type (str): Type of change (A=added, M=modified, D=deleted) - old_line_num (int): Line number in the old version (None for added lines) - new_line_num (int): Line number in the new version (None for deleted lines) - content (str): The actual line content - repository (str): Repository name Additional columns for any labels specified in labels_to_add Note: For binary files, only the change_type is recorded, with no line-by-line changes. If both ignore_globs and include_globs are provided, files must match an include pattern and not match any ignore patterns to be included. """ logger.info(f"Getting detailed content changes for revision '{rev}'") try: commit = self.repo.commit(rev) # Get the parent commit. For merge commits, use first parent parent = commit.parents[0] if commit.parents else None parent_sha = parent.hexsha if parent else "4b825dc642cb6eb9a060e54bf8d69288fbee4904" # empty tree # Get the diff between this commit and its parent diff = self.repo.git.diff( parent_sha, commit.hexsha, "--unified=0", # No context lines "--no-prefix", # Don't prefix with a/ and b/ "--no-renames", # Don't try to detect renames ) changes = [] current_file = None current_type = None for line in diff.split("\n"): if line.startswith("diff --git"): # New file being processed file_path = line.split(" ")[-1] # Check if this file should be included based on globs if not self.__check_extension({file_path: None}, ignore_globs, include_globs): current_file = None continue current_file = file_path elif line.startswith("new file"): current_type = "A" elif line.startswith("deleted"): current_type = "D" elif line.startswith("index"): current_type = "M" elif line.startswith("@@") and current_file: # Parse the @@ line to get line numbers # Format: @@ -old_start,old_count +new_start,new_count @@ nums = line.split("@@")[1].strip().split(" ") old_range = nums[0].split(",") new_range = nums[1].split(",") old_start = int(old_range[0].lstrip("-")) new_start = int(new_range[0].lstrip("+")) elif line.startswith("+") and current_file and not line.startswith("+++"): # Added line changes.append( [ current_file, current_type, None, # old line number new_start, line[1:], # Remove the + prefix ] ) new_start += 1 elif line.startswith("-") and current_file and not line.startswith("---"): # Removed line changes.append( [ current_file, current_type, old_start, None, # new line number line[1:], # Remove the - prefix ] ) old_start += 1 if not changes: logger.info(f"No changes found in revision '{rev}' matching the filters") return DataFrame(columns=["file", "change_type", "old_line_num", "new_line_num", "content"]) df = DataFrame(changes, columns=["file", "change_type", "old_line_num", "new_line_num", "content"]) df = self._add_labels_to_df(df) logger.info(f"Found {len(df)} line changes in revision '{rev}'") return df except (GitCommandError, IndexError, BadObject, BadName) as e: logger.error(f"Failed to get content changes for revision '{rev}': {e}") return DataFrame(columns=["file", "change_type", "old_line_num", "new_line_num", "content"]) @multicache(key_prefix="get_file_content", key_list=["path", "rev"]) def get_file_content(self, path, rev="HEAD"): """Gets the content of a file from the repository at a specific revision. Safely retrieves file content by first verifying the file exists in git's tree (respecting .gitignore) before attempting to read it. Args: path (str): Path to the file relative to repository root rev (str, optional): Revision to get file from. Defaults to 'HEAD'. Returns: Optional[str]: Content of the file if it exists and is tracked by git, None if file doesn't exist or isn't tracked. Note: This only works for files that are tracked by git. Untracked files and files matched by .gitignore patterns cannot be read. """ logger.info(f"Getting content of file '{path}' at revision '{rev}'") try: # First verify the file exists in git's tree try: # ls-tree -r for recursive, --full-name for full paths # -l for long format (includes size) self.repo.git.ls_tree("-r", "-l", "--full-name", rev, path) except GitCommandError: logger.warning(f"File '{path}' not found in git tree at revision '{rev}'") return None # If we get here, the file exists in git's tree # Use git show to get the file content content = self.repo.git.show(f"{rev}:{path}") return content except GitCommandError as e: logger.error(f"Failed to get content of file '{path}' at revision '{rev}': {e}") return None @multicache(key_prefix="list_files", key_list=["rev"]) def list_files(self, rev="HEAD"): """Lists all files in the repository at a specific revision, respecting .gitignore. Uses git ls-tree to get a list of all tracked files in the repository, which automatically respects .gitignore rules since untracked and ignored files are not in git's tree. Args: rev (str, optional): Revision to list files from. Defaults to 'HEAD'. Returns: pandas.DataFrame: A DataFrame with columns: - file (str): Full path to the file relative to repository root - mode (str): File mode (100644 for regular file, 100755 for executable, etc) - type (str): Object type (blob for file, tree for directory) - sha (str): SHA-1 hash of the file content - repository (str): Repository name Additional columns for any labels specified in labels_to_add Note: This only includes files that are tracked by git. Untracked files and files matched by .gitignore patterns are not included. """ logger.info(f"Listing files at revision '{rev}'") try: # Get the full file list with details using ls-tree # -r for recursive # -l for long format (includes file size) # --full-tree to start from root # --full-name for full paths output = self.repo.git.ls_tree("-r", "-l", "--full-tree", "--full-name", rev) if not output.strip(): logger.info("No files found in repository") return DataFrame(columns=["file", "mode", "type", "sha"]) # Parse the ls-tree output # Format: <mode> <type> <sha> <size>\t<file> files = [] for line in output.split("\n"): if not line.strip(): continue # Split on tab first to separate path from rest details, path = line.split("\t") mode, obj_type, sha, _ = details.split() files.append([path, mode, obj_type, sha]) df = DataFrame(files, columns=["file", "mode", "type", "sha"]) df = self._add_labels_to_df(df) logger.info(f"Found {len(df)} files at revision '{rev}'") return df except GitCommandError as e: logger.error(f"Failed to list files at revision '{rev}': {e}") return DataFrame(columns=["file", "mode", "type", "sha"])
[docs] def __repr__(self): """Returns a unique string representation of the repository. Returns: str: The absolute path to the repository """ return str(self.git_dir)
@multicache(key_prefix="bus_factor", key_list=["by", "ignore_globs", "include_globs"]) def bus_factor(self, by="repository", ignore_globs=None, include_globs=None): """Calculates the "bus factor" for the repository. The bus factor is a measure of risk based on how concentrated the codebase knowledge is among contributors. It is calculated as the minimum number of contributors whose combined contributions account for at least 50% of the codebase's lines of code. Args: by (str, optional): How to calculate the bus factor. One of: - 'repository': Calculate for entire repository (default) - 'file': Calculate for each individual file ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore include_globs (Optional[List[str]]): List of glob patterns for files to include Returns: pandas.DataFrame: A DataFrame with columns depending on the 'by' parameter: If by='repository': - repository (str): Repository name - bus factor (int): Bus factor for the repository If by='file': - file (str): File path - bus factor (int): Bus factor for that file - repository (str): Repository name Note: A low bus factor (e.g. 1-2) indicates high risk as knowledge is concentrated among few contributors. A higher bus factor indicates knowledge is better distributed. """ logger.info(f"Calculating bus factor. Group by: {by}, Ignore: {ignore_globs}, Include: {include_globs}") if by == "file": # Get file-wise blame data blame = self.blame(include_globs=include_globs, ignore_globs=ignore_globs, by="file") if blame.empty: logger.warning("No blame data found for file-wise bus factor calculation.") return DataFrame(columns=["file", "bus factor", "repository"]) # Reset index to access file column if it's in the index if isinstance(blame.index, pd.MultiIndex) and "file" in blame.index.names: blame = blame.reset_index() # Group by file and calculate bus factor for each file file_bus_factors = [] files = blame["file"].unique() for file_name in files: file_blame = blame[blame["file"] == file_name].copy() file_blame = file_blame.sort_values(by=["loc"], ascending=False) total = file_blame["loc"].sum() if total == 0: # If file has no lines of code, skip it continue cumulative = 0 tc = 0 for idx in range(file_blame.shape[0]): cumulative += file_blame.iloc[idx]["loc"] tc += 1 if cumulative >= total / 2: break file_bus_factors.append([file_name, tc, self._repo_name()]) logger.info(f"Calculated bus factor for {len(file_bus_factors)} files.") return DataFrame(file_bus_factors, columns=["file", "bus factor", "repository"]) blame = self.blame(include_globs=include_globs, ignore_globs=ignore_globs, by=by) blame = blame.sort_values(by=["loc"], ascending=False) total = blame["loc"].sum() cumulative = 0 tc = 0 for idx in range(blame.shape[0]): cumulative += blame.iloc[idx]["loc"] tc += 1 if cumulative >= total / 2: break logger.info(f"Bus factor calculated: {tc}") return DataFrame([[self._repo_name(), tc]], columns=["repository", "bus factor"]) @multicache(key_prefix="file_owner", key_list=["rev", "filename", "committer"]) def file_owner(self, rev, filename, committer=True): """Determines the primary owner of a file at a specific revision. The owner is determined by who has contributed the most lines of code to the file according to git blame. Args: rev (str): Revision to analyze filename (str): Path to the file relative to repository root committer (bool, optional): If True, use committer info. If False, use author. Defaults to True. Returns: Optional[dict]: Dictionary containing owner information with keys: - name (str): Name of the primary owner Returns None if file doesn't exist or can't be analyzed Note: This is a helper method used by file_detail() to determine file ownership. """ logger.debug(f"Determining file owner for: {filename} at rev: {rev}, Committer: {committer}") try: cm = "committer" if committer else "author" blame = self.repo.blame(rev, os.path.join(self.git_dir, filename)) blame = ( DataFrame( [[x[0].committer.name, len(x[1])] for x in blame], columns=[cm, "loc"], ) .groupby(cm) .agg({"loc": "sum"}) ) if blame.shape[0] > 0: owner = blame["loc"].idxmax() return {"name": owner} else: logger.debug(f"No blame information found for file {filename} at rev {rev}.") return None except (GitCommandError, KeyError) as e: logger.warning(f"Could not determine file owner for {filename} at rev {rev}: {e}") return None
[docs] def _get_last_edit_date(self, file_path, rev="HEAD"): """Get the last edit date for a file at a given revision. Args: file_path (str): Path to the file rev (str): Revision to check Returns: datetime: Last edit date for the file """ try: cmd = ["git", "log", "-1", "--format=%aI", rev, "--", file_path] date_str = self.repo.git.execute(cmd) if date_str: # Parse ISO 8601 format which includes timezone return pd.to_datetime(date_str.strip(), utc=True) return pd.NaT except Exception as e: logger.warning(f"Error getting last edit date for {file_path}: {e}") return pd.NaT
@multicache( key_prefix="punchcard", key_list=["branch", "limit", "days", "by", "normalize", "ignore_globs", "include_globs"] ) def punchcard( self, branch=None, limit=None, days=None, by=None, normalize=None, ignore_globs=None, include_globs=None, ): """ Returns a pandas DataFrame containing all of the data for a punchcard. * day_of_week * hour_of_day * author / committer * lines * insertions * deletions * net Args: branch (Optional[str]): Branch to analyze. Defaults to default_branch if None. limit (Optional[int]): Maximum number of commits to return, None for no limit days (Optional[int]): Number of days to return if limit is None by (Optional[str]): Agg by options, None for no aggregation (just a high level punchcard), or 'committer', 'author' normalize (Optional[int]): If an integer, returns the data normalized to max value of that (for plotting) ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore include_globs (Optional[List[str]]): List of glob patterns for files to include Returns: DataFrame: DataFrame with punchcard data """ logger.info( f"Generating punchcard data for branch '{branch}'. " f"Limit: {limit}, Days: {days}, By: {by}, Normalize: {normalize}, " f"Ignore: {ignore_globs}, Include: {include_globs}" ) if branch is None: branch = self.default_branch logger.debug("Fetching commit history for punchcard...") ch = self.commit_history( branch=branch, limit=limit, days=days, ignore_globs=ignore_globs, include_globs=include_globs, ) # add in the date fields ch["day_of_week"] = ch.index.map(lambda x: x.weekday()) ch["hour_of_day"] = ch.index.map(lambda x: x.hour) aggs = ["hour_of_day", "day_of_week"] if by is not None: aggs.append(by) logger.debug(f"Aggregating punchcard data by: {aggs}") punch_card = ch.groupby(aggs).agg({"lines": "sum", "insertions": "sum", "deletions": "sum", "net": "sum"}) punch_card.reset_index(inplace=True) # normalize all cols if normalize is not None: logger.debug(f"Normalizing punchcard data to max value: {normalize}") for col in ["lines", "insertions", "deletions", "net"]: punch_card[col] = (punch_card[col] / punch_card[col].sum()) * normalize logger.info(f"Finished generating punchcard data for '{branch}'. Result shape: {punch_card.shape}") return punch_card @multicache(key_prefix="has_branch", key_list=["branch"]) def has_branch(self, branch): """Checks if a branch exists in the repository. Args: branch (str): Name of the branch to check Returns: bool: True if the branch exists, False otherwise Note: This checks both local and remote branches. """ logger.info(f"Checking if branch '{branch}' exists.") try: # Get all branches (both local and remote) branches = self.branches() result = branch in branches["branch"].values logger.info(f"Branch '{branch}' exists: {result}") return result except GitCommandError as e: logger.warning(f"Could not check branches in repo '{self._repo_name()}': {e}") return False @multicache(key_prefix="file_detail", key_list=["include_globs", "ignore_globs", "rev", "committer"]) def file_detail(self, include_globs=None, ignore_globs=None, rev="HEAD", committer=True): """Provides detailed information about all files in the repository. Analyzes each file at the specified revision, gathering information about size, ownership, and last modification. Args: include_globs (Optional[List[str]]): List of glob patterns for files to include ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore rev (str, optional): Revision to analyze. Defaults to 'HEAD'. committer (bool, optional): If True, use committer info. If False, use author. Defaults to True. Returns: pandas.DataFrame: A DataFrame with columns: - file (str): Path to the file - file_owner (str): Name of primary committer/author - last_edit_date (datetime): When file was last modified - loc (int): Lines of code in file - ext (str): File extension - repository (str): Repository name Additional columns for any labels specified in labels_to_add Note: The primary file owner is the person responsible for the most lines in the current version of the file. This method is cached if a cache_backend was provided and rev is not HEAD. """ logger.info( f"Fetching file details for rev '{rev}'. " f"Ignore: {ignore_globs}, Include: {include_globs}, Committer: {committer}" ) # first get the blame logger.debug("Calculating blame for file details...") blame = self.blame( include_globs=include_globs, ignore_globs=ignore_globs, rev=rev, committer=committer, by="file", ) blame = blame.reset_index(level=-1) blame = blame.reset_index(level=-1) # reduce it to files and total LOC logger.debug("Reducing to files and total LOC...") df = blame.reindex(columns=["file", "loc"]) df = df.groupby("file").agg({"loc": "sum"}).reset_index() # Keep file as column # map in file owners logger.debug("Mapping file owners...") def _get_owner_name_safe(file_path): owner_info = self.file_owner(rev, file_path, committer=committer) return owner_info.get("name") if owner_info else None df["file_owner"] = df["file"].map(_get_owner_name_safe) # add extension (something like the language) logger.debug("Extracting file extensions...") df["ext"] = df["file"].map(lambda x: x.split(".")[-1] if "." in x else "") # Handle files without extensions # add in last edit date for the file logger.debug("Mapping last edit dates...") df["last_edit_date"] = df["file"].map(lambda x: self._get_last_edit_date(x, rev=rev)) # Add repository labels without setting index df = self._add_labels_to_df(df) logger.info(f"Finished fetching file details for rev '{rev}'. Found details for {len(df)} files.") return df
[docs] def time_between_revs(self, rev1, rev2): """Calculates the time difference in days between two revisions. Args: rev1 (str): The first revision (commit hash or tag). rev2 (str): The second revision (commit hash or tag). Returns: float: The absolute time difference in days between the two revisions. Note: The result is always non-negative (absolute value). """ c1 = self.repo.commit(rev1) c2 = self.repo.commit(rev2) t1 = pd.to_datetime(c1.committed_date, unit="s", utc=True) t2 = pd.to_datetime(c2.committed_date, unit="s", utc=True) return abs((t2 - t1).total_seconds()) / (60 * 60 * 24)
[docs] def diff_stats_between_revs(self, rev1, rev2, ignore_globs=None, include_globs=None): """Computes diff statistics between two revisions. Calculates the total insertions, deletions, net line change, and number of files changed between two arbitrary revisions (commits or tags). Optionally filters files using glob patterns. Args: rev1 (str): The base revision (commit hash or tag). rev2 (str): The target revision (commit hash or tag). ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore. include_globs (Optional[List[str]]): List of glob patterns for files to include. Returns: dict: A dictionary with keys: - 'insertions' (int): Total lines inserted. - 'deletions' (int): Total lines deleted. - 'net' (int): Net lines changed (insertions - deletions). - 'files_changed' (int): Number of files changed. - 'files' (List[str]): List of changed file paths. Note: Binary files or files that cannot be parsed are skipped. If both ignore_globs and include_globs are provided, files must match an include pattern and not match any ignore patterns to be included. """ diff = self.repo.git.diff(rev1, rev2, "--numstat", "--no-renames") insertions = deletions = files_changed = 0 files = set() for line in diff.splitlines(): parts = line.strip().split("\t") if len(parts) == 3: ins, dels, fname = parts if ins == "-" or dels == "-": continue # binary or unparseable if not self.__check_extension({fname: None}, ignore_globs, include_globs): continue insertions += int(ins) deletions += int(dels) files_changed += 1 files.add(fname) return { "insertions": insertions, "deletions": deletions, "net": insertions - deletions, "files_changed": files_changed, "files": list(files), }
[docs] def committers_between_revs(self, rev1, rev2, ignore_globs=None, include_globs=None): """Finds unique committers and authors between two revisions. Iterates through all commits between two revisions (exclusive of rev1, inclusive of rev2) and returns the unique committers and authors who contributed, filtered by file globs if provided. Args: rev1 (str): The base revision (commit hash or tag). rev2 (str): The target revision (commit hash or tag). ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore. include_globs (Optional[List[str]]): List of glob patterns for files to include. Returns: dict: A dictionary with keys: - 'committers' (List[str]): Sorted list of unique committer names. - 'authors' (List[str]): Sorted list of unique author names. Note: Only commits that touch files matching the glob filters are considered. The range is interpreted as Git does: rev1..rev2 means commits reachable from rev2 but not rev1. """ commits = list(self.repo.iter_commits(f"{rev1}..{rev2}")) committers = set() authors = set() for c in commits: # Check if any file in commit matches globs files = self.__check_extension(c.stats.files, ignore_globs, include_globs) if not files: continue if hasattr(c.committer, "name"): committers.add(c.committer.name) if hasattr(c.author, "name"): authors.add(c.author.name) return {"committers": sorted(committers), "authors": sorted(authors)}
[docs] def files_changed_between_revs(self, rev1, rev2, ignore_globs=None, include_globs=None): """Lists files changed between two revisions. Returns a sorted list of all files changed between two arbitrary revisions (commits or tags), optionally filtered by glob patterns. Args: rev1 (str): The base revision (commit hash or tag). rev2 (str): The target revision (commit hash or tag). ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore. include_globs (Optional[List[str]]): List of glob patterns for files to include. Returns: List[str]: Sorted list of file paths changed between the two revisions. Note: If both ignore_globs and include_globs are provided, files must match an include pattern and not match any ignore patterns to be included. """ diff = self.repo.git.diff(rev1, rev2, "--name-only", "--no-renames") files = set() for fname in diff.splitlines(): if not fname.strip(): continue if not self.__check_extension({fname: None}, ignore_globs, include_globs): continue files.add(fname) return sorted(files)
@multicache(key_prefix="release_tag_summary", key_list=["tag_glob", "include_globs", "ignore_globs"]) def release_tag_summary(self, tag_glob=None, ignore_globs=None, include_globs=None): """Summarizes repository activity between release tags. For each tag (filtered by glob), computes the time since the previous tag, diff statistics, committers/authors involved, and files changed between tags. Returns a DataFrame with one row per tag and columns for all computed metrics. Args: tag_glob (Optional[Union[str, List[str]]]): Glob pattern(s) to filter tags (e.g., 'v*' or ['v*', 'release-*']). If None, all tags are included. ignore_globs (Optional[List[str]]): List of glob patterns for files to ignore in diff/commit analysis. include_globs (Optional[List[str]]): List of glob patterns for files to include in diff/commit analysis. Returns: pandas.DataFrame: DataFrame with columns: - tag (str): Tag name - tag_date (datetime): Tag creation date - commit_sha (str): SHA of the tagged commit - time_since_prev (float): Days since previous tag - insertions (int): Lines inserted since previous tag - deletions (int): Lines deleted since previous tag - net (int): Net lines changed since previous tag - files_changed (int): Number of files changed since previous tag - committers (List[str]): Committers between previous and current tag - authors (List[str]): Authors between previous and current tag - files (List[str]): Files changed between previous and current tag Note: The first tag in the sorted list will have NaN for time_since_prev and empty diff/commit info. Tag filtering uses fnmatch and supports multiple globs. """ tags_df = self.tags().reset_index() if tags_df.empty: return pd.DataFrame( columns=[ "tag", "tag_date", "commit_sha", "time_since_prev", "insertions", "deletions", "net", "files_changed", "committers", "authors", "files", ] ) # Filter tags by glob if tag_glob is not None: if isinstance(tag_glob, str): tag_glob = [tag_glob] tags_df = tags_df[tags_df["tag"].apply(lambda t: any(fnmatch.fnmatch(t, g) for g in tag_glob))] if tags_df.empty: return pd.DataFrame( columns=[ "tag", "tag_date", "commit_sha", "time_since_prev", "insertions", "deletions", "net", "files_changed", "committers", "authors", "files", ] ) # Sort by tag_date ascending tags_df = tags_df.sort_values("tag_date").reset_index(drop=True) rows = [] prev_sha = None for _idx, row in tags_df.iterrows(): tag = row["tag"] tag_date = row["tag_date"] commit_sha = row["commit_sha"] if prev_sha is not None: time_since_prev = self.time_between_revs(prev_sha, commit_sha) diff_stats = self.diff_stats_between_revs(prev_sha, commit_sha, ignore_globs, include_globs) commit_info = self.committers_between_revs(prev_sha, commit_sha, ignore_globs, include_globs) files = self.files_changed_between_revs(prev_sha, commit_sha, ignore_globs, include_globs) else: time_since_prev = float("nan") diff_stats = {"insertions": 0, "deletions": 0, "net": 0, "files_changed": 0, "files": []} commit_info = {"committers": [], "authors": []} files = [] rows.append( { "tag": tag, "tag_date": tag_date, "commit_sha": commit_sha, "time_since_prev": time_since_prev, "insertions": diff_stats["insertions"], "deletions": diff_stats["deletions"], "net": diff_stats["net"], "files_changed": diff_stats["files_changed"], "committers": commit_info["committers"], "authors": commit_info["authors"], "files": files, } ) prev_sha = commit_sha return pd.DataFrame(rows)
[docs] def safe_fetch_remote(self, remote_name="origin", prune=False, dry_run=False): """Safely fetch changes from remote repository. Fetches the latest changes from a remote repository without modifying the working directory. This is a read-only operation that only updates remote-tracking branches. Args: remote_name (str, optional): Name of remote to fetch from. Defaults to 'origin'. prune (bool, optional): Remove remote-tracking branches that no longer exist on remote. Defaults to False. dry_run (bool, optional): Show what would be fetched without actually fetching. Defaults to False. Returns: dict: Fetch results with keys: - success (bool): Whether the fetch was successful - message (str): Status message or error description - remote_exists (bool): Whether the specified remote exists - changes_available (bool): Whether new changes were fetched - error (Optional[str]): Error message if fetch failed Note: This method is safe as it only fetches remote changes and never modifies the working directory or current branch. It will not perform any merges, rebases, or checkouts. """ logger.info(f"Attempting to safely fetch from remote '{remote_name}' (dry_run={dry_run})") result = {"success": False, "message": "", "remote_exists": False, "changes_available": False, "error": None} try: # Check if we have any remotes if not self.repo.remotes: result["message"] = "No remotes configured for this repository" logger.warning(f"No remotes configured for repository '{self.repo_name}'") return result # Check if the specified remote exists remote_names = [remote.name for remote in self.repo.remotes] if remote_name not in remote_names: result["message"] = f"Remote '{remote_name}' not found. Available remotes: {remote_names}" logger.warning(f"Remote '{remote_name}' not found in repository '{self.repo_name}'") return result result["remote_exists"] = True remote = self.repo.remote(remote_name) # Perform dry run if requested if dry_run: try: # Get remote refs to see what's available remote_refs = list(remote.refs) result["message"] = f"Dry run: Would fetch from {remote.url}. Remote has {len(remote_refs)} refs." result["success"] = True logger.info(f"Dry run completed for remote '{remote_name}' in repository '{self.repo_name}'") return result except Exception as e: result["error"] = f"Dry run failed: {str(e)}" logger.error(f"Dry run failed for remote '{remote_name}' in repository '{self.repo_name}': {e}") return result # Perform the actual fetch try: logger.info(f"Fetching from remote '{remote_name}' in repository '{self.repo_name}'") fetch_info = remote.fetch(prune=prune) # Check if any changes were fetched changes_available = len(fetch_info) > 0 result["changes_available"] = changes_available if changes_available: fetched_refs = [info.ref.name for info in fetch_info if info.ref] result["message"] = f"Successfully fetched {len(fetch_info)} updates. Updated refs: {fetched_refs}" logger.info( f"Fetch completed with {len(fetch_info)} updates from '{remote_name}' " f"in repository '{self.repo_name}'" ) else: result["message"] = f"Fetch completed - repository is up to date with '{remote_name}'" logger.info(f"Repository '{self.repo_name}' is up to date with remote '{remote_name}'") result["success"] = True except Exception as e: result["error"] = f"Fetch failed: {str(e)}" logger.error(f"Fetch failed for remote '{remote_name}' in repository '{self.repo_name}': {e}") except Exception as e: result["error"] = f"Unexpected error: {str(e)}" logger.error( f"Unexpected error during fetch from remote '{remote_name}' in repository '{self.repo_name}': {e}" ) return result
[docs] def warm_cache(self, methods=None, **kwargs): """Pre-populate cache with commonly used data. Executes a set of commonly used repository analysis methods to populate the cache, improving performance for subsequent calls. Only methods that support caching will be executed. Args: methods (Optional[List[str]]): List of method names to pre-warm. If None, uses a default set of commonly used methods. Available methods: - 'commit_history': Load commit history - 'branches': Load branch information - 'tags': Load tag information - 'blame': Load blame information - 'file_detail': Load file details - 'list_files': Load file listing - 'file_change_rates': Load file change statistics **kwargs: Additional keyword arguments to pass to the methods. Common arguments include: - branch: Branch to analyze (default: repository's default branch) - limit: Limit number of commits to analyze - ignore_globs: List of glob patterns to ignore - include_globs: List of glob patterns to include Returns: dict: Results of cache warming operations with keys: - success (bool): Whether cache warming was successful - methods_executed (List[str]): List of methods that were executed - methods_failed (List[str]): List of methods that failed - cache_entries_created (int): Number of cache entries created - execution_time (float): Total execution time in seconds - errors (List[str]): List of error messages for failed methods Note: This method will only execute methods if a cache backend is configured. If no cache backend is available, it will return immediately with a success status but no methods executed. """ logger.info(f"Starting cache warming for repository '{self.repo_name}'") result = { "success": False, "methods_executed": [], "methods_failed": [], "cache_entries_created": 0, "execution_time": 0.0, "errors": [], } import time start_time = time.time() # Check if caching is enabled if self.cache_backend is None: result["success"] = True result["execution_time"] = time.time() - start_time logger.info(f"No cache backend configured for repository '{self.repo_name}' - skipping cache warming") return result # Default methods to warm if none specified if methods is None: methods = ["commit_history", "branches", "tags", "blame", "file_detail", "list_files"] # Get initial cache size initial_cache_size = len(self.cache_backend._cache) if hasattr(self.cache_backend, "_cache") else 0 # Execute each method to warm the cache for method_name in methods: try: if not hasattr(self, method_name): result["methods_failed"].append(method_name) result["errors"].append(f"Method '{method_name}' not found") logger.warning(f"Method '{method_name}' not found in repository '{self.repo_name}'") continue method = getattr(self, method_name) # Execute method with provided kwargs logger.debug(f"Executing method '{method_name}' for cache warming in repository '{self.repo_name}'") # Handle special cases for method arguments method_kwargs = kwargs.copy() # For methods that might need specific arguments if method_name in ["commit_history", "file_change_rates"]: # Set reasonable defaults if not provided if "limit" not in method_kwargs: method_kwargs["limit"] = 100 # Reasonable default for cache warming elif method_name == "list_files": # list_files doesn't accept limit parameter, remove it if present method_kwargs.pop("limit", None) # Execute the method _ = method(**method_kwargs) result["methods_executed"].append(method_name) logger.debug( f"Successfully executed method '{method_name}' for cache warming in repository '{self.repo_name}'" ) except Exception as e: result["methods_failed"].append(method_name) error_msg = f"Method '{method_name}' failed: {str(e)}" result["errors"].append(error_msg) logger.error(f"Cache warming failed for method '{method_name}' in repository '{self.repo_name}': {e}") # Calculate cache entries created final_cache_size = len(self.cache_backend._cache) if hasattr(self.cache_backend, "_cache") else 0 result["cache_entries_created"] = final_cache_size - initial_cache_size # Calculate execution time result["execution_time"] = time.time() - start_time # Determine overall success result["success"] = len(result["methods_executed"]) > 0 if result["success"]: logger.info( f"Cache warming completed for repository '{self.repo_name}'. " f"Executed {len(result['methods_executed'])} methods, " f"created {result['cache_entries_created']} cache entries " f"in {result['execution_time']:.2f} seconds" ) else: logger.warning( f"Cache warming failed for repository '{self.repo_name}'. " f"No methods executed successfully. Errors: {result['errors']}" ) return result
[docs] def invalidate_cache(self, keys=None, pattern=None): """Invalidate specific cache entries or all cache entries for this repository. Args: keys (Optional[List[str]]): List of specific cache keys to invalidate pattern (Optional[str]): Pattern to match cache keys (supports * wildcard) Returns: int: Number of cache entries invalidated Note: If both keys and pattern are None, all cache entries for this repository are invalidated. Cache keys are automatically prefixed with repository name. """ if self.cache_backend is None: logger.warning(f"No cache backend configured for repository '{self.repo_name}' - cannot invalidate cache") return 0 if not hasattr(self.cache_backend, "invalidate_cache"): logger.warning(f"Cache backend {type(self.cache_backend).__name__} does not support cache invalidation") return 0 # If specific keys provided, prefix them with repo name prefixed_keys = None if keys: prefixed_keys = [f"*||{self.repo_name}||*{key}*" if not key.startswith("*") else key for key in keys] # If pattern provided, include repo name in pattern repo_pattern = None if pattern: repo_pattern = f"*||{self.repo_name}||*{pattern}*" elif keys is None: # No keys or pattern specified, invalidate all for this repo repo_pattern = f"*||{self.repo_name}||*" try: if prefixed_keys and repo_pattern: # Both keys and pattern specified count1 = self.cache_backend.invalidate_cache(pattern=repo_pattern) count2 = self.cache_backend.invalidate_cache(pattern=prefixed_keys[0] if prefixed_keys else None) return count1 + count2 elif prefixed_keys: # Only keys specified return sum(self.cache_backend.invalidate_cache(pattern=key) for key in prefixed_keys) else: # Only pattern (or neither, defaulting to repo pattern) return self.cache_backend.invalidate_cache(pattern=repo_pattern) except Exception as e: logger.error(f"Error invalidating cache for repository '{self.repo_name}': {e}") return 0
[docs] def get_cache_stats(self): """Get cache statistics for this repository. Returns: dict: Cache statistics including repository-specific and global cache information """ if self.cache_backend is None: return { "repository": self.repo_name, "cache_backend": None, "repository_entries": 0, "global_cache_stats": None, } # Get global cache stats global_stats = None if hasattr(self.cache_backend, "get_cache_stats"): try: global_stats = self.cache_backend.get_cache_stats() except Exception as e: logger.error(f"Error getting global cache stats: {e}") # Count repository-specific entries repo_entries = 0 if hasattr(self.cache_backend, "list_cached_keys"): try: all_keys = self.cache_backend.list_cached_keys() repo_entries = len([key for key in all_keys if self.repo_name in str(key.get("key", ""))]) except Exception as e: logger.error(f"Error counting repository cache entries: {e}") return { "repository": self.repo_name, "cache_backend": type(self.cache_backend).__name__, "repository_entries": repo_entries, "global_cache_stats": global_stats, }
[docs] class GitFlowRepository(Repository): """ A special case where git flow is followed, so we know something about the branching scheme """
[docs] def __init__(self): super().__init__()