Source code for gitpandas.cache

import gzip
import logging
import os
import pickle
import threading
from datetime import datetime, timezone

try:
    import redis

    _HAS_REDIS = True
except ImportError:
    _HAS_REDIS = False


class CacheEntry:
    """Wrapper for cached values that includes metadata."""

    def __init__(self, data, cache_key=None):
        self.data = data
        self.cached_at = datetime.now(timezone.utc)
        self.cache_key = cache_key

    def to_dict(self):
        """Convert to dictionary for serialization."""
        return {"data": self.data, "cached_at": self.cached_at.isoformat(), "cache_key": self.cache_key}

    @classmethod
    def from_dict(cls, d):
        """Create CacheEntry from dictionary."""
        entry = cls.__new__(cls)
        entry.data = d["data"]
        entry.cached_at = datetime.fromisoformat(d["cached_at"])
        entry.cache_key = d.get("cache_key")
        return entry

    def age_seconds(self):
        """Return age of cache entry in seconds."""
        return (datetime.now(timezone.utc) - self.cached_at).total_seconds()

    def age_minutes(self):
        """Return age of cache entry in minutes."""
        return self.age_seconds() / 60

    def age_hours(self):
        """Return age of cache entry in hours."""
        return self.age_minutes() / 60
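
# Illustrative sketch (not part of the module): how a CacheEntry behaves.
# The key and data below are made up for demonstration purposes only.
#
#     entry = CacheEntry({"commits": 42}, cache_key="commits||my_repo||master")
#     entry.to_dict()       # {"data": {...}, "cached_at": "<ISO timestamp>", "cache_key": "..."}
#     entry.age_seconds()   # seconds elapsed since the entry was created
#     restored = CacheEntry.from_dict(entry.to_dict())  # round-trips via to_dict/from_dict
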
def multicache(key_prefix, key_list, skip_if=None):
    """Decorator to cache the results of a method call.

    Args:
        key_prefix (str): Prefix for the cache key.
        key_list (list[str]): List of argument names (from kwargs) to include in the cache key.
        skip_if (callable, optional): A function that takes kwargs and returns True if caching
            should be skipped entirely (no read, no write). Defaults to None.

    The decorated method can accept an optional `force_refresh=True` argument to bypass the
    cache read but still update the cache with the new result. This force_refresh state
    propagates to nested calls on the same object instance.
    """

    def multicache_nest(func):
        def deco(self, *args, **kwargs):
            # If no cache backend, just run the function
            if self.cache_backend is None:
                return func(self, *args, **kwargs)

            # Check if caching should be skipped entirely based on skip_if
            if skip_if is not None and skip_if(kwargs):
                logging.debug(f"Cache skipped entirely for {key_prefix} due to skip_if condition.")
                return func(self, *args, **kwargs)

            # Check for propagation flag first, then explicit force_refresh argument
            is_propagated_force = getattr(self, "_is_forcing_refresh", False)
            explicit_force_refresh = kwargs.pop("force_refresh", False)
            force_refresh = is_propagated_force or explicit_force_refresh

            # Generate the cache key (ensure force_refresh itself is not part of the key)
            # Use || as delimiter to avoid conflicts with repository names containing underscores
            key_parts = [str(kwargs.get(k)) for k in key_list]
            key = f"{key_prefix}||{self.repo_name}||{'_'.join(key_parts)}"
            logging.debug(f"Cache key generated for {key_prefix}: {key}")

            # Explicitly log force refresh bypass of cache read
            if force_refresh:
                logging.info(
                    f"Force refresh active (propagated: {is_propagated_force}, explicit: {explicit_force_refresh}) "
                    f"for key: {key}, bypassing cache read."
                )

            cache_hit = False
            cache_entry = None

            # Try retrieving from cache if not forcing refresh
            if not force_refresh:
                try:
                    # Include DiskCache in the type check
                    if isinstance(self.cache_backend, EphemeralCache | RedisDFCache | DiskCache):
                        # Use internal method to get CacheEntry for timestamp tracking
                        if hasattr(self.cache_backend, "_get_entry"):
                            cache_entry = self.cache_backend._get_entry(key)
                        else:
                            # Fallback for cache backends that don't have _get_entry
                            cache_entry = self.cache_backend.get(key)
                            if not isinstance(cache_entry, CacheEntry):
                                cache_entry = CacheEntry(cache_entry, cache_key=key)
                        logging.debug(f"Cache hit for key: {key}, cached at: {cache_entry.cached_at}")
                        cache_hit = True
                        return cache_entry.data
                    else:
                        logging.warning(f"Cannot get from cache; unknown backend type: {type(self.cache_backend)}")
                except CacheMissError:
                    logging.debug(f"Cache miss for key: {key}")
                    pass  # Proceed to execute the function
                except Exception as e:
                    logging.error(f"Error getting cache for key {key}: {e}", exc_info=True)

            # Execute the function if cache missed, force_refresh is True, or cache read failed
            if not cache_hit:
                # Set the temporary flag *before* calling the function if we are forcing refresh
                set_force_flag = force_refresh and not is_propagated_force
                if set_force_flag:
                    self._is_forcing_refresh = True
                    logging.debug(f"Setting _is_forcing_refresh flag for nested calls originating from {key_prefix}")

                try:
                    logging.debug(f"Executing function for key: {key} (Force Refresh: {force_refresh})")
                    ret = func(self, *args, **kwargs)

                    # Store the result in the cache (always happens after function execution)
                    try:
                        if isinstance(self.cache_backend, EphemeralCache | RedisDFCache | DiskCache):
                            cache_entry = CacheEntry(ret, cache_key=key)
                            self.cache_backend.set(key, cache_entry)
                            logging.debug(f"Cache set for key: {key} at: {cache_entry.cached_at}")
                        else:
                            logging.warning(f"Cannot set cache; unknown backend type: {type(self.cache_backend)}")
                    except Exception as e:
                        logging.error(f"Failed to set cache for key {key}: {e}", exc_info=True)

                    return ret
                finally:
                    # Always remove the flag after the function call completes or errors
                    if set_force_flag:
                        delattr(self, "_is_forcing_refresh")
                        logging.debug(f"Removed _is_forcing_refresh flag after call to {key_prefix}")

            # This should only be reached if cache_hit was True and force_refresh was False.
            # The earlier return inside the cache hit block handles this case.
            # Adding an explicit return here for clarity, although theoretically unreachable.
            return cache_entry.data if cache_entry else None

        return deco

    return multicache_nest
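
# Illustrative sketch (not part of the module): applying @multicache to a method.
# The class, attribute values, and method below are hypothetical; the decorator only
# assumes the object exposes `cache_backend` and `repo_name` attributes.
#
#     class ExampleRepo:
#         def __init__(self):
#             self.repo_name = "example"
#             self.cache_backend = EphemeralCache(max_keys=100)
#
#         @multicache(key_prefix="commit_history", key_list=["branch", "limit"])
#         def commit_history(self, branch="master", limit=None):
#             ...  # expensive computation whose result gets cached
#
#     repo = ExampleRepo()
#     repo.commit_history(branch="master", limit=10)                      # computes and caches
#     repo.commit_history(branch="master", limit=10)                      # served from cache
#     repo.commit_history(branch="master", limit=10, force_refresh=True)  # recomputes, updates cache
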
class CacheMissError(Exception):
    pass


class DataFrameEncodingError(Exception):
    """Custom exception for errors during DataFrame encoding/decoding."""

    pass
class EphemeralCache:
    """
    A simple in-memory cache.
    """

    def __init__(self, max_keys=1000):
        self._cache = {}
        self._key_list = []
        self._max_keys = max_keys

    def evict(self, n=1):
        for _ in range(n):
            key = self._key_list.pop(0)
            del self._cache[key]

    def set(self, k, v):
        try:
            idx = self._key_list.index(k)
            self._key_list.pop(idx)
            self._key_list.append(k)
        except ValueError:
            self._key_list.append(k)

        # Ensure we're storing a CacheEntry
        if not isinstance(v, CacheEntry):
            v = CacheEntry(v, cache_key=k)

        self._cache[k] = v
        if len(self._key_list) > self._max_keys:
            self.evict(len(self._key_list) - self._max_keys)

        # Add empty save method call for compatibility with DiskCache
        if hasattr(self, "save"):
            self.save()  # Only call save if it exists to maintain backward compatibility

    def get(self, k):
        if self.exists(k):
            # Move the key to the end of the list (most recently used)
            idx = self._key_list.index(k)
            self._key_list.pop(idx)
            self._key_list.append(k)

            entry = self._cache[k]
            # For backward compatibility, return raw data for direct cache access.
            # The multicache decorator will handle CacheEntry objects separately.
            if isinstance(entry, CacheEntry):
                return entry.data
            else:
                return entry
        else:
            raise CacheMissError(k)

    def _get_entry(self, k):
        """Internal method that returns the CacheEntry object."""
        if self.exists(k):
            # Move the key to the end of the list (most recently used)
            idx = self._key_list.index(k)
            self._key_list.pop(idx)
            self._key_list.append(k)

            entry = self._cache[k]
            # Ensure we return a CacheEntry
            if isinstance(entry, CacheEntry):
                return entry
            else:
                return CacheEntry(entry, cache_key=k)
        else:
            raise CacheMissError(k)

    def exists(self, k):
        return k in self._cache

    def get_cache_info(self, k):
        """Get cache entry metadata for a key."""
        if self.exists(k):
            entry = self._cache[k]
            if isinstance(entry, CacheEntry):
                return {
                    "cached_at": entry.cached_at,
                    "age_seconds": entry.age_seconds(),
                    "age_minutes": entry.age_minutes(),
                    "age_hours": entry.age_hours(),
                    "cache_key": entry.cache_key,
                }
        return None

    def list_cached_keys(self):
        """List all cached keys with their metadata."""
        result = []
        for key in self._key_list:
            if key in self._cache:
                entry = self._cache[key]
                if isinstance(entry, CacheEntry):
                    result.append(
                        {
                            "key": key,
                            "cached_at": entry.cached_at,
                            "age_seconds": entry.age_seconds(),
                            "cache_key": entry.cache_key,
                        }
                    )
        return result
    def invalidate_cache(self, keys=None, pattern=None):
        """Invalidate specific cache entries or all entries.

        Args:
            keys (Optional[List[str]]): List of specific keys to invalidate
            pattern (Optional[str]): Pattern to match keys (supports * wildcard)

        Note:
            If both keys and pattern are None, all cache entries are invalidated.

        Returns:
            int: Number of cache entries invalidated.
        """
        import fnmatch

        if keys is None and pattern is None:
            # Clear all cache entries and report how many were removed
            removed_count = len(self._cache)
            self._cache.clear()
            self._key_list.clear()
            return removed_count

        keys_to_remove = []

        if keys:
            # Remove specific keys
            for key in keys:
                if key in self._cache:
                    keys_to_remove.append(key)

        if pattern:
            # Remove keys matching pattern
            for key in self._key_list:
                if fnmatch.fnmatch(key, pattern):
                    keys_to_remove.append(key)

        # Remove duplicates
        keys_to_remove = list(set(keys_to_remove))

        # Actually remove the keys
        for key in keys_to_remove:
            if key in self._cache:
                del self._cache[key]
            if key in self._key_list:
                self._key_list.remove(key)

        return len(keys_to_remove)
    def get_cache_stats(self):
        """Get comprehensive cache statistics.

        Returns:
            dict: Cache statistics including size, hit rates, and age information
        """
        if not self._cache:
            return {
                "total_entries": 0,
                "max_entries": self._max_keys,
                "cache_usage_percent": 0.0,
                "oldest_entry_age_hours": None,
                "newest_entry_age_hours": None,
                "average_entry_age_hours": None,
            }

        ages_hours = []
        for entry in self._cache.values():
            if isinstance(entry, CacheEntry):
                ages_hours.append(entry.age_hours())

        stats = {
            "total_entries": len(self._cache),
            "max_entries": self._max_keys,
            "cache_usage_percent": (len(self._cache) / self._max_keys) * 100.0,
        }

        if ages_hours:
            stats.update(
                {
                    "oldest_entry_age_hours": max(ages_hours),
                    "newest_entry_age_hours": min(ages_hours),
                    "average_entry_age_hours": sum(ages_hours) / len(ages_hours),
                }
            )
        else:
            stats.update(
                {"oldest_entry_age_hours": None, "newest_entry_age_hours": None, "average_entry_age_hours": None}
            )

        return stats

    # Add empty save method for compatibility with DiskCache
    def save(self):
        """Empty save method for compatibility with DiskCache."""
        pass
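
# Illustrative sketch (not part of the module): basic EphemeralCache usage.
# The keys and values are made up; get() returns the raw data, not the CacheEntry wrapper.
#
#     cache = EphemeralCache(max_keys=2)
#     cache.set("a", 1)
#     cache.set("b", 2)
#     cache.get("a")                          # -> 1, and marks "a" most recently used
#     cache.set("c", 3)                       # evicts "b", the least recently used key
#     cache.exists("b")                       # -> False
#     cache.invalidate_cache(pattern="a*")    # -> 1 (number of keys removed)
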
class DiskCache(EphemeralCache):
    """
    An in-memory cache that can be persisted to disk using pickle.
    Inherits LRU eviction logic from EphemeralCache.
    Thread-safe for concurrent access.
    """

    def __init__(self, filepath, max_keys=1000):
        """
        Initializes the cache. Tries to load from the specified filepath if it exists.

        :param filepath: Path to the file for persisting the cache.
        :param max_keys: Maximum number of keys to keep in the cache (LRU).
        """
        super().__init__(max_keys=max_keys)
        self.filepath = filepath
        self._lock = threading.RLock()  # Reentrant lock for thread safety
        self.load()  # Attempt to load existing cache on initialization

    def set(self, k, v):
        """
        Thread-safe set operation that prevents nested save calls.
        """
        with self._lock:
            # Temporarily disable automatic save during parent set() to prevent nested calls
            original_save = getattr(self, "save", None)
            self.save = lambda: None  # Temporarily disable save

            try:
                # Call parent set without triggering save
                super().set(k, v)
            finally:
                # Restore original save method
                if original_save:
                    self.save = original_save

            # Now save once, safely under lock
            self.save()

    def get(self, k):
        """
        Thread-safe get operation with disk loading capability.
        """
        with self._lock:
            # Initial check in memory
            if k in self._cache:
                # Move the key to the end of the list (most recently used)
                try:
                    idx = self._key_list.index(k)
                    self._key_list.pop(idx)
                    self._key_list.append(k)
                except ValueError:
                    # Should not happen if k is in _cache, but handle defensively:
                    # if the key is in the cache but not the list, add it to the list (end)
                    self._key_list.append(k)
                    # If the list exceeds max keys due to this, handle it
                    # (though ideally cache and list are always synced)
                    if len(self._key_list) > self._max_keys:
                        self.evict(len(self._key_list) - self._max_keys)

                entry = self._cache[k]
                # For backward compatibility, return raw data
                if isinstance(entry, CacheEntry):
                    return entry.data
                else:
                    return entry
            else:
                # Key not in memory, try loading from disk
                logging.debug(f"Key '{k}' not in memory cache, attempting disk load.")
                self.load()

                # Check again after loading
                if k in self._cache:
                    logging.debug(f"Key '{k}' found in cache after disk load.")
                    # Update LRU list as it was just accessed
                    try:
                        idx = self._key_list.index(k)
                        self._key_list.pop(idx)
                        self._key_list.append(k)
                    except ValueError:
                        # Add to list if somehow missing after load
                        self._key_list.append(k)
                        if len(self._key_list) > self._max_keys:
                            self.evict(len(self._key_list) - self._max_keys)

                    entry = self._cache[k]
                    # For backward compatibility, return raw data
                    if isinstance(entry, CacheEntry):
                        return entry.data
                    else:
                        return entry
                else:
                    # Key not found even after loading from disk
                    logging.debug(f"Key '{k}' not found after attempting disk load.")
                    raise CacheMissError(k)

    def _get_entry(self, k):
        """Internal method that returns the CacheEntry object."""
        with self._lock:
            # Initial check in memory
            if k in self._cache:
                # Move the key to the end of the list (most recently used)
                try:
                    idx = self._key_list.index(k)
                    self._key_list.pop(idx)
                    self._key_list.append(k)
                except ValueError:
                    self._key_list.append(k)
                    if len(self._key_list) > self._max_keys:
                        self.evict(len(self._key_list) - self._max_keys)

                entry = self._cache[k]
                # Ensure we return a CacheEntry
                if isinstance(entry, CacheEntry):
                    return entry
                else:
                    return CacheEntry(entry, cache_key=k)
            else:
                # Key not in memory, try loading from disk
                self.load()

                # Check again after loading
                if k in self._cache:
                    entry = self._cache[k]
                    # Update LRU list
                    try:
                        idx = self._key_list.index(k)
                        self._key_list.pop(idx)
                        self._key_list.append(k)
                    except ValueError:
                        self._key_list.append(k)
                        if len(self._key_list) > self._max_keys:
                            self.evict(len(self._key_list) - self._max_keys)

                    # Ensure we return a CacheEntry
                    if isinstance(entry, CacheEntry):
                        return entry
                    else:
                        return CacheEntry(entry, cache_key=k)
                else:
                    raise CacheMissError(k)

    def exists(self, k):
        """
        Thread-safe exists check.
        """
        with self._lock:
            return super().exists(k)

    def evict(self, n=1):
        """
        Thread-safe eviction.
        """
        with self._lock:
            super().evict(n)
    def load(self):
        """
        Loads the cache state (_cache dictionary and _key_list) from the specified
        filepath using pickle. Handles file not found and deserialization errors.
        Thread-safe operation.
        """
        with self._lock:
            if not os.path.exists(self.filepath) or os.path.getsize(self.filepath) == 0:
                logging.info(f"Cache file not found or empty, starting fresh: {self.filepath}")
                return  # Start with an empty cache

            try:
                with gzip.open(self.filepath, "rb") as f:  # Use binary mode 'rb' for pickle
                    loaded_data = pickle.load(f)

                if (
                    not isinstance(loaded_data, dict)
                    or "_cache" not in loaded_data
                    or "_key_list" not in loaded_data
                ):
                    logging.warning(f"Invalid cache file format found: {self.filepath}. Starting fresh.")
                    self._cache = {}
                    self._key_list = []
                    return

                # Handle backward compatibility with old cache format
                cache_data = loaded_data["_cache"]
                for key, value in cache_data.items():
                    if not isinstance(value, CacheEntry):
                        # Convert old cache entries to new format
                        cache_data[key] = CacheEntry(value, cache_key=key)

                # Directly assign loaded data as pickle handles complex objects
                self._cache = cache_data
                self._key_list = loaded_data["_key_list"]

                # Ensure consistency after loading (LRU eviction)
                if len(self._key_list) > self._max_keys:
                    self.evict(len(self._key_list) - self._max_keys)

                logging.info(f"Cache loaded successfully from {self.filepath} using pickle.")
            except (gzip.BadGzipFile, pickle.UnpicklingError, EOFError, TypeError) as e:
                # Corrupt or incompatible pickle/gzip data: start fresh
                logging.error(f"Error loading cache file {self.filepath} (pickle/Gzip): {e}. Starting fresh.")
                self._cache = {}
                self._key_list = []
            except OSError as e:
                logging.error(f"OS error loading cache file {self.filepath}: {e}. Starting fresh.")
                self._cache = {}
                self._key_list = []
            except Exception as e:
                # Any other unexpected error: start fresh rather than propagate
                logging.error(f"Error loading cache file {self.filepath}: {e}. Starting fresh.")
                self._cache = {}
                self._key_list = []
    def save(self):
        """
        Saves the current cache state (_cache dictionary and _key_list) to the
        specified filepath using pickle. Creates parent directories if needed.
        Thread-safe operation.
        """
        with self._lock:
            try:
                # Ensure parent directory exists
                parent_dir = os.path.dirname(self.filepath)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)

                # Save cache and key list to gzipped pickle file
                data_to_save = {"_cache": self._cache, "_key_list": self._key_list}
                with gzip.open(self.filepath, "wb") as f:  # Use binary mode 'wb' for pickle
                    pickle.dump(data_to_save, f, protocol=pickle.HIGHEST_PROTOCOL)  # Use highest protocol

                logging.info(f"Cache saved successfully to {self.filepath} using pickle.")
            except Exception as e:  # Covers OSError and pickle.PicklingError
                logging.error(f"Error saving cache file {self.filepath}: {e}")
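
# Illustrative sketch (not part of the module): persisting a cache across processes.
# The file path and `some_dataframe` are hypothetical stand-ins for any picklable value;
# every set() rewrites the gzipped pickle file, and a get() miss re-loads the file before
# raising CacheMissError.
#
#     cache = DiskCache(filepath="/tmp/gitpandas_cache.gz", max_keys=500)
#     cache.set("commits||my_repo||master", some_dataframe)
#
#     # Later, in a new process, the same file restores the cached entries:
#     cache = DiskCache(filepath="/tmp/gitpandas_cache.gz", max_keys=500)
#     cache.get("commits||my_repo||master")   # -> some_dataframe
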
class RedisDFCache:
    """
    A redis based cache, using redis-py under the hood.

    :param host: default localhost
    :param port: default 6379
    :param db: the database to use, default 12
    :param max_keys: the max number of keys to cache, default 1000
    :param ttl: time to live for any cached results, default None
    :param kwargs: additional options available to redis.StrictRedis
    """

    def __init__(self, host="localhost", port=6379, db=12, max_keys=1000, ttl=None, **kwargs):
        if not _HAS_REDIS:
            raise ImportError("Need redis installed to use redis cache")

        self._cache = redis.StrictRedis(host=host, port=port, db=db, **kwargs)
        self._key_list = []
        self._max_keys = max_keys
        self.ttl = ttl
        self.prefix = "gitpandas_"

        # sync with any keys that already exist in this database (order will not be preserved)
        self.sync()

    def evict(self, n=1):
        for _ in range(n):
            key = self._key_list.pop(0)
            self._cache.delete(key)

    def set(self, orik, v):
        k = self.prefix + orik
        try:
            idx = self._key_list.index(k)
            self._key_list.pop(idx)
            self._key_list.append(k)
        except ValueError:
            self._key_list.append(k)

        # Ensure we're storing a CacheEntry
        if not isinstance(v, CacheEntry):
            v = CacheEntry(v, cache_key=orik)

        # Use pickle instead of msgpack for DataFrame serialization
        self._cache.set(k, pickle.dumps(v), ex=self.ttl)
        if len(self._key_list) > self._max_keys:
            self.evict(len(self._key_list) - self._max_keys)

    def get(self, orik):
        k = self.prefix + orik
        if self.exists(orik):
            # Move the key to the end of the list (most recently used)
            idx = self._key_list.index(k)
            self._key_list.pop(idx)
            self._key_list.append(k)

            # Use pickle instead of msgpack for DataFrame deserialization
            cached_value = pickle.loads(self._cache.get(k))

            # Handle backward compatibility with old cache format
            if not isinstance(cached_value, CacheEntry):
                cached_value = CacheEntry(cached_value, cache_key=orik)

            # For backward compatibility, return raw data
            return cached_value.data
        else:
            try:
                idx = self._key_list.index(k)
                self._key_list.pop(idx)
            except ValueError:
                pass
            raise CacheMissError(k)

    def _get_entry(self, orik):
        """Internal method that returns the CacheEntry object."""
        k = self.prefix + orik
        if self.exists(orik):
            # Move the key to the end of the list (most recently used)
            idx = self._key_list.index(k)
            self._key_list.pop(idx)
            self._key_list.append(k)

            # Use pickle instead of msgpack for DataFrame deserialization
            cached_value = pickle.loads(self._cache.get(k))

            # Handle backward compatibility with old cache format
            if not isinstance(cached_value, CacheEntry):
                cached_value = CacheEntry(cached_value, cache_key=orik)

            return cached_value
        else:
            try:
                idx = self._key_list.index(k)
                self._key_list.pop(idx)
            except ValueError:
                pass
            raise CacheMissError(k)

    def exists(self, k):
        k = self.prefix + k
        return self._cache.exists(k)

    def sync(self):
        """
        Syncs the key list with what is in redis.

        :return: None
        """
        self._key_list = [x.decode("utf-8") for x in self._cache.keys(self.prefix + "*")]

    def get_cache_info(self, orik):
        """Get cache entry metadata for a key."""
        k = self.prefix + orik
        if self.exists(orik):
            try:
                cached_value = pickle.loads(self._cache.get(k))
                if isinstance(cached_value, CacheEntry):
                    return {
                        "cached_at": cached_value.cached_at,
                        "age_seconds": cached_value.age_seconds(),
                        "age_minutes": cached_value.age_minutes(),
                        "age_hours": cached_value.age_hours(),
                        "cache_key": cached_value.cache_key,
                    }
            except Exception:
                pass
        return None

    def list_cached_keys(self):
        """List all cached keys with their metadata."""
        result = []
        for key in self._key_list:
            if key.startswith(self.prefix):
                orik = key[len(self.prefix) :]
                try:
                    cached_value = pickle.loads(self._cache.get(key))
                    if isinstance(cached_value, CacheEntry):
                        result.append(
                            {
                                "key": orik,
                                "cached_at": cached_value.cached_at,
                                "age_seconds": cached_value.age_seconds(),
                                "cache_key": cached_value.cache_key,
                            }
                        )
                except Exception:
                    continue
        return result
    def invalidate_cache(self, keys=None, pattern=None):
        """Invalidate specific cache entries or all entries.

        Args:
            keys (Optional[List[str]]): List of specific keys to invalidate (without prefix)
            pattern (Optional[str]): Pattern to match keys (supports * wildcard, without prefix)

        Note:
            If both keys and pattern are None, all cache entries are invalidated.

        Returns:
            int: Number of cache entries invalidated.
        """
        import fnmatch

        if keys is None and pattern is None:
            # Clear all cache entries and report how many were removed
            prefixed_keys = [key for key in self._key_list if key.startswith(self.prefix)]
            for key in prefixed_keys:
                self._cache.delete(key)
            self._key_list = [k for k in self._key_list if not k.startswith(self.prefix)]
            return len(prefixed_keys)

        keys_to_remove = []

        if keys:
            # Remove specific keys (add prefix)
            for key in keys:
                prefixed_key = self.prefix + key
                if prefixed_key in self._key_list:
                    keys_to_remove.append(prefixed_key)

        if pattern:
            # Remove keys matching pattern
            for key in self._key_list:
                if key.startswith(self.prefix):
                    orik = key[len(self.prefix) :]
                    if fnmatch.fnmatch(orik, pattern):
                        keys_to_remove.append(key)

        # Remove duplicates
        keys_to_remove = list(set(keys_to_remove))

        # Actually remove the keys
        for key in keys_to_remove:
            self._cache.delete(key)
            if key in self._key_list:
                self._key_list.remove(key)

        return len(keys_to_remove)
    def get_cache_stats(self):
        """Get comprehensive cache statistics.

        Returns:
            dict: Cache statistics including size, hit rates, and age information
        """
        redis_keys = [k for k in self._key_list if k.startswith(self.prefix)]

        if not redis_keys:
            return {
                "total_entries": 0,
                "max_entries": self._max_keys,
                "cache_usage_percent": 0.0,
                "oldest_entry_age_hours": None,
                "newest_entry_age_hours": None,
                "average_entry_age_hours": None,
            }

        ages_hours = []
        for key in redis_keys:
            try:
                cached_value = pickle.loads(self._cache.get(key))
                if isinstance(cached_value, CacheEntry):
                    ages_hours.append(cached_value.age_hours())
            except Exception:
                continue

        stats = {
            "total_entries": len(redis_keys),
            "max_entries": self._max_keys,
            "cache_usage_percent": (len(redis_keys) / self._max_keys) * 100.0,
        }

        if ages_hours:
            stats.update(
                {
                    "oldest_entry_age_hours": max(ages_hours),
                    "newest_entry_age_hours": min(ages_hours),
                    "average_entry_age_hours": sum(ages_hours) / len(ages_hours),
                }
            )
        else:
            stats.update(
                {"oldest_entry_age_hours": None, "newest_entry_age_hours": None, "average_entry_age_hours": None}
            )

        return stats

    def purge(self):
        for key in self._cache.scan_iter(f"{self.prefix}*"):
            self._cache.delete(key)
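
# Illustrative sketch (not part of the module): using the Redis backend.
# Assumes a Redis server is reachable at localhost:6379 and that redis-py is installed;
# the key and `some_dataframe` value below are made up.
#
#     cache = RedisDFCache(host="localhost", port=6379, db=12, ttl=3600)
#     cache.set("commits||my_repo||master", some_dataframe)   # stored under "gitpandas_commits||my_repo||master"
#     cache.get("commits||my_repo||master")                   # unpickled back to the original object
#     cache.get_cache_stats()                                 # entry counts and age statistics
#     cache.purge()                                           # delete every "gitpandas_*" key in this db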