from __future__ import annotations
import os
from fnmatch import fnmatch
from logging import getLogger
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath
from re import DOTALL, compile as regexp, error as RegexError # noqa: N812
from typing import TYPE_CHECKING
from semantic_release.commit_parser._base import CommitParser
from semantic_release.commit_parser.conventional.options import (
ConventionalCommitParserOptions,
)
from semantic_release.commit_parser.conventional.options_monorepo import (
ConventionalCommitMonorepoParserOptions,
)
from semantic_release.commit_parser.conventional.parser import ConventionalCommitParser
from semantic_release.commit_parser.token import (
ParsedCommit,
ParsedMessageResult,
ParseError,
ParseResult,
)
from semantic_release.commit_parser.util import force_str
from semantic_release.errors import InvalidParserOptions
if TYPE_CHECKING: # pragma: no cover
from git.objects.commit import Commit
# (Sphinx "[docs]" source-link artifact removed; it is not part of the module.)
class ConventionalCommitMonorepoParser(
    CommitParser[ParseResult, ConventionalCommitMonorepoParserOptions]
):
    """
    Conventional-commit parser that scopes commits to one package of a monorepo.

    A commit whose message follows the conventional format is accepted when it
    either changes at least one file matching the configured ``path_filters``
    or, if a ``scope_prefix`` is configured, when its scope starts with that
    prefix. Message handling that is not monorepo specific (merge-commit
    detection, squash-commit splitting, building the parsed message result) is
    delegated to an internal :class:`ConventionalCommitParser`.
    """

    # TODO: Remove for v11 compatibility, get_default_options() will be called instead
    parser_options = ConventionalCommitMonorepoParserOptions

    def __init__(
        self, options: ConventionalCommitMonorepoParserOptions | None = None
    ) -> None:
        """
        Compile the commit-message patterns and pre-process the path filters.

        :param options: parser options; forwarded unchanged to the base class
            initializer (``None`` is forwarded as-is).

        :raises InvalidParserOptions: when the configured ``scope_prefix`` or
            ``allowed_tags`` cannot be compiled into a regular expression.
        """
        super().__init__(options)

        # The scope prefix is inserted verbatim, so it is treated as regex source.
        try:
            commit_scope_pattern = regexp(
                r"\(" + self.options.scope_prefix + r"(?P<scope>[^\n]+)?\)",
            )
        except RegexError as err:
            raise InvalidParserOptions(
                str.join(
                    "\n",
                    [
                        f"Invalid options for {self.__class__.__name__}",
                        "Unable to create regular expression from configured scope_prefix.",
                        "Please check the configured scope_prefix and remove or escape any regular expression characters.",
                    ],
                )
            ) from err

        try:
            commit_type_pattern = regexp(
                r"(?P<type>%s)" % str.join("|", self.options.allowed_tags)
            )
        except RegexError as err:
            raise InvalidParserOptions(
                str.join(
                    "\n",
                    [
                        f"Invalid options for {self.__class__.__name__}",
                        "Unable to create regular expression from configured commit-types.",
                        "Please check the configured commit-types and remove or escape any regular expression characters.",
                    ],
                )
            ) from err

        # This regular expression includes scope prefix into the pattern and forces a scope to be present
        # PSR will match the full scope but we don't include it in the scope match,
        # which implicitly strips it from being included in the returned scope.
        self._strict_scope_pattern = regexp(
            str.join(
                "",
                [
                    r"^" + commit_type_pattern.pattern,
                    commit_scope_pattern.pattern,
                    r"(?P<break>!)?:\s+",
                    r"(?P<subject>[^\n]+)",
                    r"(?:\n\n(?P<text>.+))?",  # commit body
                ],
            ),
            flags=DOTALL,
        )

        # Same message layout, but the parenthesised scope is optional and unprefixed.
        self._optional_scope_pattern = regexp(
            str.join(
                "",
                [
                    r"^" + commit_type_pattern.pattern,
                    r"(?:\((?P<scope>[^\n]+)\))?",
                    r"(?P<break>!)?:\s+",
                    r"(?P<subject>[^\n]+)",
                    r"(?:\n\n(?P<text>.+))?",  # commit body
                ],
            ),
            flags=DOTALL,
        )

        file_select_filters, file_ignore_filters = self._process_path_filter_options(
            self.options.path_filters
        )
        self._file_selection_filters: list[str] = file_select_filters
        self._file_ignore_filters: list[str] = file_ignore_filters

        self._logger = getLogger(
            str.join(".", [self.__module__, self.__class__.__name__])
        )

        # Build the delegate parser from the subset of our options that the
        # plain conventional parser declares as dataclass fields.
        self._base_parser = ConventionalCommitParser(
            options=ConventionalCommitParserOptions(
                **{
                    k: getattr(self.options, k)
                    for k in ConventionalCommitParserOptions().__dataclass_fields__
                }
            )
        )

    def get_default_options(self) -> ConventionalCommitMonorepoParserOptions:
        """Return a freshly constructed options object with default values."""
        return ConventionalCommitMonorepoParserOptions()

    @staticmethod
    def _process_path_filter_options(  # noqa: C901
        path_filters: tuple[str, ...],
    ) -> tuple[list[str], list[str]]:
        """
        Split and normalize the configured path filters.

        Filters starting with ``!`` are ignore filters; all others are selection
        filters. Each filter is normalized and, where it names a path rather
        than a glob, expanded with a ``/**`` companion so that everything below
        it matches as well. Duplicates are dropped while first-seen order is
        kept.

        :param path_filters: raw filter strings from the parser options.

        :return: ``(selection_filters, ignore_filters)``
        """
        file_selection_filters: list[str] = []
        file_ignore_filters: list[str] = []
        unique_selection_filters: set[str] = set()
        unique_ignore_filters: set[str] = set()

        for str_path in path_filters:
            is_negated = str_path.startswith("!")
            str_filter = str_path[1:] if is_negated else str_path
            filter_list, unique_cache = (
                (file_ignore_filters, unique_ignore_filters)
                if is_negated
                else (file_selection_filters, unique_selection_filters)
            )

            # Since fnmatch is not too flexible, we will expand the path filters to include the name and any subdirectories
            # as this is how gitignore is interpreted. Possible scenarios:
            # | Input      | Path Normalization | Filter List               |
            # | ---------- | ------------------ | ------------------------- |
            # | /          | /                  | /**                       | done
            # | /./        | /                  | /**                       | done
            # | /**        | /**                | /**                       | done
            # | /./**      | /**                | /**                       | done
            # | /*         | /*                 | /*                        | done
            # | .          | .                  | ./**                      | done
            # | ./         | .                  | ./**                      | done
            # | ././       | .                  | ./**                      | done
            # | ./**       | ./**               | ./**                      | done
            # | ./*        | ./*                | ./*                       | done
            # | ..         | ..                 | ../**                     | done
            # | ../        | ..                 | ../**                     | done
            # | ../**      | ../**              | ../**                     | done
            # | ../*       | ../*               | ../*                      | done
            # | ../..      | ../..              | ../../**                  | done
            # | ../../     | ../..              | ../../**                  | done
            # | ../../**   | ../../**           | ../../**                  | done
            # | ../../docs | ../../docs         | ../../docs, ../../docs/** | done
            # | src        | src                | src, src/**               | done
            # | src/       | src                | src/**                    | done
            # | src/*      | src/*              | src/*                     | done
            # | src/**     | src/**             | src/**                    | done
            # | /src       | /src               | /src, /src/**             | done
            # | /src/      | /src               | /src/**                   | done
            # | /src/**    | /src/**            | /src/**                   | done
            # | /src/*     | /src/*             | /src/*                    | done
            # | ../d/f.txt | ../d/f.txt         | ../d/f.txt, ../d/f.txt/** | done
            # This expansion will occur regardless of the negation prefix
            os_path: PurePath | PurePosixPath | PureWindowsPath = PurePath(str_filter)

            # NOTE(review): the raw literal below is two backslash characters, so
            # only filters containing a doubled backslash take this branch --
            # confirm that single-backslash input is intentionally left alone.
            if r"\\" in str_filter:
                # Windows paths were given so we convert them to posix paths
                os_path = PureWindowsPath(str_filter)
                if os_path.is_absolute():
                    # drop any drive letter
                    os_path = PureWindowsPath(os_path.root, *os_path.parts[1:])
                os_path = PurePosixPath(os_path.as_posix())

            path_normalized = str(os_path)
            expanded_filters: list[str] = []

            if path_normalized in (
                str(Path(".").absolute().root),
                str(Path("/**")),
            ):
                path_normalized = "/**"

            elif path_normalized == str(Path("/*")):
                pass

            elif path_normalized in (str(Path(".")), str(Path("./**"))):
                path_normalized = "./**"

            elif path_normalized == str(Path("./*")):
                path_normalized = "./*"

            elif path_normalized in (str(Path("..")), str(Path("../**"))):
                path_normalized = "../**"

            elif path_normalized == str(Path("../*")):
                path_normalized = "../*"

            elif path_normalized.endswith(("..", "../**")):
                # Strip a trailing "/**" as a whole; stripping only the "*"
                # characters would leave the slash behind and yield "..//**".
                path_normalized = f"{path_normalized.rstrip('/*')}/**"

            elif str_filter.endswith(("/", os.sep)):
                # If the path ends with a separator, it is a directory, so we add the directory and all subdirectories.
                # "/" is accepted on every platform so that POSIX-style filters behave the same on Windows.
                path_normalized = f"{path_normalized}/**"

            elif not path_normalized.endswith("*"):
                # A plain path: match everything below it, and the path itself.
                expanded_filters.append(f"{path_normalized}/**")

            expanded_filters.append(path_normalized)

            # Add each resulting filter to the list if it is not already present
            for expanded_filter in expanded_filters:
                if expanded_filter not in unique_cache:
                    unique_cache.add(expanded_filter)
                    filter_list.append(expanded_filter)

        return file_selection_filters, file_ignore_filters

    def logged_parse_error(self, commit: Commit, error: str) -> ParseError:
        """Log ``error`` at debug level and return it wrapped in a ``ParseError`` for ``commit``."""
        self._logger.debug(error)
        return ParseError(commit, error=error)

    @staticmethod
    def _with_linked_merge_request(
        parsed_result: ParseResult, mr_number: str
    ) -> ParseResult:
        """Return a copy of a ``ParsedCommit`` with ``linked_merge_request`` set; other results pass through unchanged."""
        if not isinstance(parsed_result, ParsedCommit):
            return parsed_result

        return ParsedCommit(
            **{
                **parsed_result._asdict(),
                "linked_merge_request": mr_number,
            }
        )

    def parse(self, commit: Commit) -> ParseResult | list[ParseResult]:
        """
        Parse a commit, optionally splitting a squash commit into its parts.

        Merge commits are rejected when ``ignore_merge_commits`` is enabled.
        When ``parse_squash_commits`` is enabled the commit is first split by
        the base parser and each part is parsed on its own. A merge request
        number found on the lead commit is then applied to the other parsed
        commits.

        :param commit: the git commit to evaluate.

        :return: a single ``ParseError`` for an ignored merge commit, otherwise
            the list of parse results (one per separated commit).
        """
        if self.options.ignore_merge_commits and self._base_parser.is_merge_commit(
            commit
        ):
            return self._base_parser.log_parse_error(
                commit, "Ignoring merge commit: %s" % commit.hexsha[:8]
            )

        separate_commits: list[Commit] = (
            self._base_parser.unsquash_commit(commit)
            if self.options.parse_squash_commits
            else [commit]
        )

        # Parse each commit individually if there were more than one
        parsed_commits: list[ParseResult] = [
            self.parse_commit(separate_commit) for separate_commit in separate_commits
        ]

        # TODO: improve this for other VCS systems other than GitHub & BitBucket
        # Github works as the first commit in a squash merge commit has the PR number
        # appended to the first line of the commit message
        # A default of None keeps an empty result list from raising StopIteration.
        lead_commit = next(iter(parsed_commits), None)

        if isinstance(lead_commit, ParsedCommit) and lead_commit.linked_merge_request:
            # If the first commit has linked merge requests, assume all commits
            # are part of the same PR and add the linked merge requests to all
            # parsed commits
            lead_mr_number = lead_commit.linked_merge_request
            parsed_commits = [
                lead_commit,
                *(
                    self._with_linked_merge_request(parsed_result, lead_mr_number)
                    for parsed_result in parsed_commits[1:]
                ),
            ]

        elif isinstance(lead_commit, ParseError) and (
            mr_match := self._base_parser.mr_selector.search(
                force_str(lead_commit.message)
            )
        ):
            # Handle BitBucket Squash Merge Commits (see #1085), which have non angular commit
            # format but include the PR number in the commit subject that we want to extract
            linked_merge_request = mr_match.group("mr_number")

            # apply the linked MR to all commits
            parsed_commits = [
                self._with_linked_merge_request(parsed_result, linked_merge_request)
                for parsed_result in parsed_commits
            ]

        return parsed_commits

    def parse_message(
        self, message: str, strict_scope: bool = False
    ) -> ParsedMessageResult | None:
        """
        Match a commit message against the conventional-commit patterns.

        The prefixed-scope pattern is always tried first. Only when it fails
        and ``strict_scope`` is false is the optional-scope pattern tried.

        :param message: the commit message text.
        :param strict_scope: when true, only a message whose scope carries the
            configured scope prefix is accepted.

        :return: the parsed message result built by the base parser, or
            ``None`` when no permitted pattern matches.
        """
        parsed_match = self._strict_scope_pattern.match(message)

        if not parsed_match:
            if strict_scope:
                return None

            parsed_match = self._optional_scope_pattern.match(message)
            if not parsed_match:
                return None

        return self._base_parser.create_parsed_message_result(parsed_match)

    def parse_commit(self, commit: Commit) -> ParseResult:
        """Attempt to parse the commit message with a regular expression into a ParseResult."""
        # Multiple scenarios to consider when parsing a commit message [Truth table]:
        # =======================================================================================================
        # |    ||                         INPUTS                           ||                                     |
        # |  # ||------------------------+----------------+--------------||               Result                |
        # |    || Example Commit Message | Relevant Files | Scope Prefix ||                                     |
        # |----||------------------------+----------------+--------------||-------------------------------------|
        # |  1 || type(prefix-cli): msg  | yes            | "prefix-"    || ParsedCommit                        |
        # |  2 || type(prefix-cli): msg  | yes            | ""           || ParsedCommit                        |
        # |  3 || type(prefix-cli): msg  | no             | "prefix-"    || ParsedCommit                        |
        # |  4 || type(prefix-cli): msg  | no             | ""           || ParseError[No files]                |
        # |  5 || type(scope-cli): msg   | yes            | "prefix-"    || ParsedCommit                        |
        # |  6 || type(scope-cli): msg   | yes            | ""           || ParsedCommit                        |
        # |  7 || type(scope-cli): msg   | no             | "prefix-"    || ParseError[No files & wrong scope]  |
        # |  8 || type(scope-cli): msg   | no             | ""           || ParseError[No files]                |
        # |  9 || type(cli): msg         | yes            | "prefix-"    || ParsedCommit                        |
        # | 10 || type(cli): msg         | yes            | ""           || ParsedCommit                        |
        # | 11 || type(cli): msg         | no             | "prefix-"    || ParseError[No files & wrong scope]  |
        # | 12 || type(cli): msg         | no             | ""           || ParseError[No files]                |
        # | 13 || type: msg              | yes            | "prefix-"    || ParsedCommit                        |
        # | 14 || type: msg              | yes            | ""           || ParsedCommit                        |
        # | 15 || type: msg              | no             | "prefix-"    || ParseError[No files & wrong scope]  |
        # | 16 || type: msg              | no             | ""           || ParseError[No files]                |
        # | 17 || non-conventional msg   | yes            | "prefix-"    || ParseError[Invalid Syntax]          |
        # | 18 || non-conventional msg   | yes            | ""           || ParseError[Invalid Syntax]          |
        # | 19 || non-conventional msg   | no             | "prefix-"    || ParseError[Invalid Syntax]          |
        # | 20 || non-conventional msg   | no             | ""           || ParseError[Invalid Syntax]          |
        # =======================================================================================================
        # Initial Logic Flow:
        # [1] When there are no relevant files and a scope prefix is defined, we enforce a strict scope
        # [2] When there are no relevant files and no scope prefix is defined, we parse scoped or unscoped commits
        # [3] When there are relevant files, we parse scoped or unscoped commits regardless of any defined prefix
        has_relevant_changed_files = self._has_relevant_changed_files(commit)
        strict_scope = bool(
            not has_relevant_changed_files and self.options.scope_prefix
        )

        # Decode once; both parse attempts below must see the same text
        # (str() on a bytes message would produce its "b'...'" repr instead).
        message = force_str(commit.message)

        pmsg_result = self.parse_message(
            message=message,
            strict_scope=strict_scope,
        )

        if pmsg_result and (has_relevant_changed_files or strict_scope):
            self._logger.debug(
                "commit %s introduces a %s level_bump",
                commit.hexsha[:8],
                pmsg_result.bump,
            )
            return ParsedCommit.from_parsed_message_result(commit, pmsg_result)

        if pmsg_result and not has_relevant_changed_files:
            return self.logged_parse_error(
                commit,
                f"Commit {commit.hexsha[:7]} has no changed files matching the path filter(s)",
            )

        # The strict parse failed; a successful relaxed parse means the message
        # is conventional but its scope lacks the required prefix.
        if strict_scope and self.parse_message(message, strict_scope=False):
            return self.logged_parse_error(
                commit,
                str.join(
                    " and ",
                    [
                        f"Commit {commit.hexsha[:7]} has no changed files matching the path filter(s)",
                        f"the scope does not match scope prefix '{self.options.scope_prefix}'",
                    ],
                ),
            )

        return self.logged_parse_error(
            commit,
            f"Format Mismatch! Unable to parse commit message: {commit.message!r}",
        )

    def unsquash_commit_message(self, message: str) -> list[str]:
        """Split a squashed commit message into individual messages using the base parser."""
        return self._base_parser.unsquash_commit_message(message)

    @staticmethod
    def _sandbox_filters(
        path_filters: list[str], git_root: Path, rel_cwd: Path
    ) -> list[str]:
        """
        Anchor filters beneath ``git_root`` and drop those that fall outside it.

        Relative filters are joined onto ``git_root / rel_cwd``; absolute ones
        are joined onto ``git_root`` directly (pathlib lets an absolute right
        operand replace the left one).
        """
        sandboxed_filters: list[str] = []

        for path_filter in path_filters:
            anchored_filter = (
                git_root / path_filter.rstrip("/")
                if Path(path_filter).is_absolute()
                else git_root / rel_cwd / path_filter
            )

            # NOTE(review): ``parents`` is a purely lexical check, so ".."
            # segments inside a filter are not collapsed before this test --
            # confirm that is the intended sandboxing behaviour.
            if git_root in anchored_filter.parents:
                sandboxed_filters.append(str(anchored_filter))

        return sandboxed_filters

    def _has_relevant_changed_files(self, commit: Commit) -> bool:
        """
        Tell whether the commit touches any file selected by the path filters.

        A changed file is relevant when it matches at least one selection
        filter and no ignore filter. Relative filters are resolved against the
        current working directory when that directory lies beneath the
        repository root, and against the repository root otherwise.

        :param commit: the commit whose changed files are inspected.

        :return: ``True`` as soon as one relevant file is found, else ``False``.
        """
        # Extract git root from commit
        git_root = (
            Path(commit.repo.working_tree_dir or commit.repo.working_dir)
            .absolute()
            .resolve()
        )

        cwd = Path.cwd().absolute().resolve()
        rel_cwd = cwd.relative_to(git_root) if git_root in cwd.parents else Path(".")

        sandboxed_selection_filters = self._sandbox_filters(
            self._file_selection_filters, git_root, rel_cwd
        )
        sandboxed_ignore_filters = self._sandbox_filters(
            self._file_ignore_filters, git_root, rel_cwd
        )

        # Check if the changed files of the commit match the path filters
        for rel_git_path in commit.stats.files:
            full_path = str(git_root / rel_git_path)

            # Check if the filepath matches any of the file selection filters
            if not any(
                fnmatch(full_path, select_filter)
                for select_filter in sandboxed_selection_filters
            ):
                continue

            # Pass filter matches, so now evaluate if it is supposed to be ignored
            if not any(
                fnmatch(full_path, ignore_filter)
                for ignore_filter in sandboxed_ignore_filters
            ):
                # No ignore filter matched, so it must be a relevant file
                return True

        return False