#!/usr/bin/python3 import collections import os from pathlib import Path import sys import tempfile import click import requests from requests.compat import urljoin import solv XDG_CACHE_HOME = Path(os.getenv("XDG_CACHE_HOME", os.path.expanduser("~/.cache"))) CACHEDIR = XDG_CACHE_HOME / "rust2rpm" # Arch that is used for resolving dependencies and such ARCH = "x86_64" # The repo that contains crates REPO_URL = f"https://kojipkgs.fedoraproject.org/repos/f34-build/latest/{ARCH}/" # Just some sane chunk size CHUNK_SIZE = 8192 def _download(fname, chksum=None, uncompress=False): url = urljoin(REPO_URL, fname) with requests.get(urljoin(REPO_URL, fname), stream=True) as r: tmp = tempfile.TemporaryFile() total = int(r.headers["Content-Length"]) with click.progressbar( length=total, label=f"Downloading {url}", file=sys.stderr ) as pb: for chunk in r.iter_content(chunk_size=CHUNK_SIZE): pb.update(tmp.write(chunk)) tmp.seek(0) if chksum is not None: fchksum = solv.Chksum(chksum.type) fchksum.add_fd(tmp.fileno()) if chksum != fchksum: raise Exception(f"Checksums do not match: {fchksum} != {chksum}") return solv.xfopen_fd(os.path.basename(url) if uncompress else None, tmp.fileno()) @click.group() def cli(): pass @cli.command() @click.pass_context @click.argument("source", type=click.Path(exists=True)) @click.option( "-a", "--add", multiple=True, type=click.Path(exists=True), help="Add local RPMs to the pool that is used in dependency resolution.", ) @click.option( "-v", "--verbose", is_flag=True, help="Enable verbose mode.", ) def get_binary_license(ctx, source, add, verbose): """Get the resulting license of a binary. Since all crates (aka `rust-*-devel`) are linked statically, the resulting binary license should be result of combining source license and all used crates. Combined with dynamic BuildRequires generation, the usage would look like: \b rpmbuild -br *.spec -D "_sourcedir $PWD" --without check fedora-helper.py get-binary-license /path/to/buildreqs.nosrc.rpm This command resolves dependencies and outputs final license(s). """ pool = solv.Pool() pool.setarch(ARCH) repo = pool.add_repo("upstream") fd = _download("repodata/repomd.xml") chksum = solv.Chksum(solv.REPOKEY_TYPE_SHA256) chksum.add("1.1") chksum.add_fd(fd.fileno()) cookie = chksum.raw() repo.add_repomdxml(fd) fd.close() # Find "primary" di = repo.Dataiterator_meta( solv.REPOSITORY_REPOMD_TYPE, "primary", solv.Dataiterator.SEARCH_STRING ) di.prepend_keyname(solv.REPOSITORY_REPOMD) for d in di: dp = d.parentpos() filename = dp.lookup_str(solv.REPOSITORY_REPOMD_LOCATION) checksum = dp.lookup_checksum(solv.REPOSITORY_REPOMD_CHECKSUM) # FIXME: Can filename be None? cache_file = CACHEDIR / "upstream.solv" try: with open(cache_file, "rb") as f: clen = len(cookie) f.seek(-clen, os.SEEK_END) fcookie = f.read(clen) if fcookie != cookie: raise IOError("repomd.xml has changed") f.seek(0) fd = solv.xfopen_fd(None, f.fileno()) repo.add_solv(fd) fd.close() cached = True except IOError: # Failed to load from cache cached = False if not cached: fd = _download(filename, chksum=checksum, uncompress=True) repo.add_rpmmd(fd, None) fd.close() os.makedirs(CACHEDIR, exist_ok=True) with tempfile.NamedTemporaryFile(prefix=".newsolv-", dir=CACHEDIR) as tmp: fd = solv.xfopen_fd(None, tmp.fileno()) repo.write(fd) fd.flush() fd.write(cookie) fd.close() if repo.iscontiguous(): repo.empty() repo.add_solv(tmp.name) try: os.unlink(cache_file) except FileNotFoundError: pass os.link(tmp.name, cache_file) repo_local = pool.add_repo("local") repo_local.priority = 99 s = repo_local.add_rpm(source) for f in add: repo_local.add_rpm(f) pool.addfileprovides() pool.createwhatprovides() solver = pool.Solver() solver.set_flag(solv.Solver.SOLVER_FLAG_IGNORE_RECOMMENDED, True) problems = solver.solve( [pool.Job(solv.Job.SOLVER_SOLVABLE | solv.Job.SOLVER_INSTALL, s.id)] ) if problems: click.echo("Failed to resolve dependencies:", err=True) for problem in problems: for rule in problem.findallproblemrules(): for info in rule.allinfos(): click.echo(f" - ", nl=False) click.secho(info.problemstr(), fg="red") ctx.abort() licenses = collections.defaultdict(set) for p in solver.transaction().newsolvables(): if ( not (p.name.startswith("rust-") and p.name.endswith("-devel")) and not p == s ): continue licenses[p.lookup_str(solv.SOLVABLE_LICENSE)].add(p) # XXX: This is pure hack and we should optimize license set in much better way if "MIT or ASL 2.0" in licenses and "ASL 2.0 or MIT" in licenses: licenses["MIT or ASL 2.0"].update(licenses.pop("ASL 2.0 or MIT")) for license, packages in sorted(licenses.items()): click.echo(f"# {license}") if verbose: for package in packages: click.secho(f"# * {package}", fg="white") if __name__ == "__main__": cli()