150aca7d2a
Fixes: https://pagure.io/fedora-rust/rust2rpm/issue/101 Signed-off-by: Igor Gnatenko <ignatenkobrain@fedoraproject.org>
371 lines
13 KiB
Python
371 lines
13 KiB
Python
__all__ = ["Dependency", "Metadata"]
|
|
|
|
import collections
|
|
import copy
|
|
import json
|
|
import re
|
|
import subprocess
|
|
|
|
|
|
Requirement = collections.namedtuple('Requirement', ('kind',
|
|
'version'))
|
|
|
|
|
|
Version = collections.namedtuple('Version', ('major', 'minor',
|
|
'patch', 'pre_release',
|
|
'build'))
|
|
|
|
|
|
class CargoSemVer:
|
|
"""Cargo semantic versioning parser"""
|
|
KIND_ANY = '*'
|
|
KIND_LT = '<'
|
|
KIND_LTE = '<='
|
|
KIND_SHORTEQ = '='
|
|
KIND_EQUAL = '=='
|
|
KIND_EMPTY = ''
|
|
KIND_GTE = '>='
|
|
KIND_GT = '>'
|
|
KIND_NEQ = '!='
|
|
KIND_CARET = '^'
|
|
KIND_TILDE = '~'
|
|
KIND_COMPATIBLE = '~='
|
|
|
|
def __init__(self, requirement):
|
|
requirements = requirement.replace(' ', '').split(',')
|
|
self.requirements = [self.parse(i) for i in requirements]
|
|
self.normalized = [j for i in self.requirements
|
|
for j in self.normalize(i)]
|
|
|
|
@staticmethod
|
|
def parse(requirement):
|
|
if not requirement:
|
|
raise ValueError(f'Invalid empty requirement '
|
|
f'specification: {requirement}')
|
|
|
|
match = re.match(
|
|
r'^(?:([\d.]*\*))$|^(?:(<|<=|=|==|>=|>||!=|\^|~|~=)(\d.*))$',
|
|
requirement)
|
|
if not match:
|
|
raise ValueError(f'Invalid requirement '
|
|
f'specification: {requirement}')
|
|
|
|
wildcard, kind, version = match.groups()
|
|
if wildcard:
|
|
version = wildcard.replace('.*', '').replace('*', '')
|
|
kind = CargoSemVer.KIND_ANY
|
|
return Requirement(kind, CargoSemVer.parse_version(version))
|
|
|
|
@staticmethod
|
|
def parse_version(version):
|
|
match = re.match(
|
|
r'^(\d+)?(?:\.(\d+))?(?:\.(\d+))?(?:-([\w.-]+))?(?:\+([\w.-]+))?$',
|
|
version)
|
|
if not match:
|
|
raise ValueError(f'Invalid version string: {version}')
|
|
|
|
major, minor, patch, pre_release, build = match.groups()
|
|
major = int(major) if major else major
|
|
minor = int(minor) if minor else minor
|
|
patch = int(patch) if patch else patch
|
|
return Version(major, minor, patch, pre_release, build)
|
|
|
|
@staticmethod
|
|
def unparse_version(version, sep='-'):
|
|
version_str = f'{version.major}.{version.minor or 0}' \
|
|
f'.{version.patch or 0}'
|
|
if version.pre_release:
|
|
version_str = f'{version_str}{sep}{version.pre_release}'
|
|
if version.build:
|
|
version_str = f'{version_str}+{version.build}'
|
|
return version_str
|
|
|
|
@staticmethod
|
|
def coerce(version):
|
|
return Version(version.major or 0,
|
|
version.minor or 0,
|
|
version.patch or 0,
|
|
version.pre_release,
|
|
version.build)
|
|
|
|
@staticmethod
|
|
def next_major(version):
|
|
major, minor, patch, pre_release, _ = version
|
|
if pre_release and not minor and not patch:
|
|
return Version(major, minor or 0, patch or 0, None, None)
|
|
return Version((major or 0) + 1, 0, 0, None, None)
|
|
|
|
@staticmethod
|
|
def next_minor(version):
|
|
major, minor, patch, pre_release, _ = version
|
|
if pre_release and not patch:
|
|
return Version(major, minor or 0, patch or 0, None, None)
|
|
return Version(major, (minor or 0) + 1, 0, None, None)
|
|
|
|
@staticmethod
|
|
def next_patch(version):
|
|
major, minor, patch, pre_release, _ = version
|
|
if pre_release:
|
|
return Version(major, minor or 0, patch or 0, None, None)
|
|
return Version(major, minor or 0, (patch or 0) + 1, None, None)
|
|
|
|
@staticmethod
|
|
def normalize(requirement):
|
|
normalized = []
|
|
kind, version = requirement
|
|
if kind == CargoSemVer.KIND_NEQ:
|
|
raise ValueError(f'Kind not supported: {requirement}')
|
|
|
|
if kind == CargoSemVer.KIND_EQUAL:
|
|
kind = CargoSemVer.KIND_SHORTEQ
|
|
|
|
coerced_version = CargoSemVer.coerce(version)
|
|
if version.pre_release:
|
|
version = CargoSemVer.next_patch(version)
|
|
|
|
if kind == CargoSemVer.KIND_ANY:
|
|
normalized.append((CargoSemVer.KIND_GTE,
|
|
CargoSemVer.coerce(version)))
|
|
if version.major:
|
|
if version.minor is not None:
|
|
upper_version = CargoSemVer.next_minor(version)
|
|
else:
|
|
upper_version = CargoSemVer.next_major(version)
|
|
normalized.append((CargoSemVer.KIND_LT, upper_version))
|
|
elif kind in (CargoSemVer.KIND_SHORTEQ,
|
|
CargoSemVer.KIND_GT, CargoSemVer.KIND_GTE,
|
|
CargoSemVer.KIND_LT, CargoSemVer.KIND_LTE):
|
|
normalized.append((kind, coerced_version))
|
|
elif kind in (CargoSemVer.KIND_CARET,
|
|
CargoSemVer.KIND_COMPATIBLE,
|
|
CargoSemVer.KIND_EMPTY):
|
|
if version.major == 0:
|
|
if version.minor is not None:
|
|
if version.minor != 0 or version.patch is None:
|
|
upper_version = CargoSemVer.next_minor(version)
|
|
else:
|
|
upper_version = CargoSemVer.next_patch(version)
|
|
else:
|
|
upper_version = CargoSemVer.next_major(version)
|
|
else:
|
|
upper_version = CargoSemVer.next_major(version)
|
|
normalized.append((CargoSemVer.KIND_GTE, coerced_version))
|
|
normalized.append((CargoSemVer.KIND_LT, upper_version))
|
|
elif kind == CargoSemVer.KIND_TILDE:
|
|
if version.minor is None:
|
|
upper_version = CargoSemVer.next_major(version)
|
|
else:
|
|
upper_version = CargoSemVer.next_minor(version)
|
|
normalized.append((CargoSemVer.KIND_GTE, coerced_version))
|
|
normalized.append((CargoSemVer.KIND_LT, upper_version))
|
|
else:
|
|
raise ValueError(f'Found unhandled kind: {requirement}')
|
|
return normalized
|
|
|
|
|
|
class Target:
|
|
def __init__(self, name, kind):
|
|
self.name = name
|
|
self.kind = kind
|
|
|
|
def __repr__(self):
|
|
return f"<Target {self.name} ({self.kind})>"
|
|
|
|
|
|
class Dependency:
|
|
def __init__(self, name, req=None, features=(), optional=False):
|
|
self.name = name
|
|
self.req = req
|
|
self.features = features
|
|
self.optional = optional
|
|
|
|
@classmethod
|
|
def from_json(cls, metadata):
|
|
features = set(metadata["features"])
|
|
if metadata["uses_default_features"]:
|
|
features.add("default")
|
|
kwargs = {"name": metadata["name"],
|
|
"req": metadata["req"],
|
|
"optional": metadata["optional"],
|
|
"features": features}
|
|
return cls(**kwargs)
|
|
|
|
@staticmethod
|
|
def _apply_reqs(name, reqs, feature=None):
|
|
fstr = f"/{feature}" if feature is not None else ""
|
|
cap = f"crate({name}{fstr})"
|
|
if not reqs:
|
|
return cap
|
|
deps = ' with '.join(
|
|
f'{cap} {op} {CargoSemVer.unparse_version(version, sep="~")}'
|
|
for op, version in reqs)
|
|
if len(reqs) > 1:
|
|
return f"({deps})"
|
|
else:
|
|
return deps
|
|
|
|
def normalize(self):
|
|
semver = CargoSemVer(self.req)
|
|
return [self._apply_reqs(self.name, semver.normalized, feature)
|
|
for feature in self.features or (None,)]
|
|
|
|
def __repr__(self):
|
|
return f"<Dependency: {self.name} {self.req} ({', '.join(sorted(self.features))})>"
|
|
|
|
def __str__(self):
|
|
return "\n".join(self.normalize())
|
|
|
|
|
|
class Metadata:
|
|
def __init__(self, name, version):
|
|
self.name = name
|
|
coerced_version = CargoSemVer.coerce(CargoSemVer.parse_version(version))
|
|
self._version = CargoSemVer.unparse_version(coerced_version)
|
|
self.version = CargoSemVer.unparse_version(coerced_version, sep="~")
|
|
self.license = None
|
|
self.license_file = None
|
|
self.readme = None
|
|
self._description = None
|
|
self._summary = None
|
|
self.targets = set()
|
|
self.dependencies = {}
|
|
self.dev_dependencies = set()
|
|
|
|
@property
|
|
def description(self):
|
|
return self._description
|
|
|
|
@property
|
|
def summary(self):
|
|
return self._summary
|
|
|
|
@description.setter
|
|
def description(self, description):
|
|
# https://salsa.debian.org/rust-team/debcargo/blob/master/src/crates.rs
|
|
# get_summary_description()
|
|
if description is None:
|
|
self._description = self._summary = None
|
|
return
|
|
description = description.replace('\n\n', '\r').replace('\n', ' ').replace('\r', '\n').strip()
|
|
description = re.sub(rf'^(?:{self.name}|This(?:\s+\w+)?)(?:\s*,|\s+is|\s+provides)\s+', '', description, flags=re.I)
|
|
description = re.sub(r'^(?:a|an|the)\s+', '', description, flags=re.I)
|
|
description = f'{description[0].upper()}{description[1:]}'
|
|
if description[-1] != '.':
|
|
description = f'{description}.'
|
|
|
|
p1 = description.find('.')
|
|
p2 = description.find('.\n')
|
|
if p2 != -1:
|
|
p1 = max(p1, p2)
|
|
else:
|
|
p1 = len(description) - 1
|
|
p2 = description.find('. ')
|
|
if p2 != -1:
|
|
p1 = min(p1, p2)
|
|
p2 = description.find('\n')
|
|
if p2 != -1:
|
|
p = min(p1, p2)
|
|
else:
|
|
p = p1
|
|
|
|
self._description = description
|
|
self._summary = description[:p]
|
|
|
|
@classmethod
|
|
def from_json(cls, metadata):
|
|
md = metadata
|
|
self = cls(md["name"], md["version"])
|
|
|
|
self.license = md["license"]
|
|
self.license_file = md["license_file"]
|
|
self.readme = md["readme"]
|
|
self.description = md.get("description")
|
|
|
|
# dependencies + build-dependencies → runtime
|
|
deps_by_name = collections.defaultdict(list)
|
|
for dep in md["dependencies"]:
|
|
if dep["kind"] == "dev":
|
|
continue
|
|
deps_by_name[dep["name"]].append(Dependency.from_json(dep))
|
|
|
|
deps_by_feature = {}
|
|
for feature, f_deps in md["features"].items():
|
|
features = {None}
|
|
deps = set()
|
|
for dep in f_deps:
|
|
if dep in md["features"]:
|
|
features.add(dep)
|
|
else:
|
|
pkg, _, f = dep.partition("/")
|
|
for dep in deps_by_name[pkg]:
|
|
dep = copy.deepcopy(dep)
|
|
if f:
|
|
dep.features = {f}
|
|
deps.add(dep)
|
|
deps_by_feature[feature] = (features, deps)
|
|
|
|
mandatory_deps = set()
|
|
for name, deps in deps_by_name.items():
|
|
fdeps = set()
|
|
for dep in deps:
|
|
if dep.optional:
|
|
fdeps.add(copy.deepcopy(dep))
|
|
else:
|
|
mandatory_deps.add(copy.deepcopy(dep))
|
|
if fdeps:
|
|
deps_by_feature[name] = ({None}, fdeps)
|
|
deps_by_feature[None] = (set(), mandatory_deps)
|
|
|
|
if "default" not in deps_by_feature:
|
|
deps_by_feature["default"] = ({None}, set())
|
|
|
|
self.dependencies = deps_by_feature
|
|
self.dev_dependencies = {Dependency.from_json(dep)
|
|
for dep in md["dependencies"]
|
|
if dep["kind"] == "dev"}
|
|
|
|
self.targets = {Target(tgt["name"], tgt["kind"][0])
|
|
for tgt in md["targets"]}
|
|
|
|
return self
|
|
|
|
@classmethod
|
|
def from_file(cls, path):
|
|
metadata = subprocess.check_output(["cargo", "read-manifest",
|
|
f"--manifest-path={path}"])
|
|
return cls.from_json(json.loads(metadata))
|
|
|
|
@property
|
|
def all_dependencies(self):
|
|
return set().union(*(x[1] for x in self.dependencies.values()))
|
|
|
|
def provides(self, feature=None):
|
|
if feature not in self.dependencies:
|
|
raise KeyError(f"Feature {feature!r} doesn't exist")
|
|
return Dependency(self.name, f"={self._version}", features={feature})
|
|
|
|
@classmethod
|
|
def _resolve(cls, deps_by_feature, feature):
|
|
all_features = set()
|
|
all_deps = set()
|
|
ff, dd = copy.deepcopy(deps_by_feature[feature])
|
|
all_features |= ff
|
|
all_deps |= dd
|
|
for f in ff:
|
|
ff1, dd1 = cls._resolve(deps_by_feature, f)
|
|
all_features |= ff1
|
|
all_deps |= dd1
|
|
return all_features, all_deps
|
|
|
|
def requires(self, feature=None, resolve=False):
|
|
if resolve:
|
|
return self._resolve(self.dependencies, feature)[1]
|
|
else:
|
|
features, deps = self.dependencies[feature]
|
|
fdeps = set(Dependency(self.name, f"={self._version}", features={feature})
|
|
for feature in features)
|
|
return fdeps | deps
|
|
|
|
|
|
def normalize_deps(deps):
|
|
return set().union(*(d.normalize() for d in deps))
|