| import base64 |
| from urllib.parse import urlparse |
| import logging |
| import requests |
| import os |
| import zipfile, io |
| |
| |
| def fetch_file_from_git(git_url, path): |
| # get the basics |
| user_name, repo = split_git_url(git_url) |
| |
| # authenticate for rate limiting |
| auth_string = os.environ['GH_USERNAME'] + ':' + os.environ['GH_TOKEN'] |
| encoded = base64.b64encode(auth_string.encode('ascii')) |
| headers = { |
| "authorization" : 'Basic ' + encoded.decode('ascii'), |
| "Accept" : "application/vnd.github+json", |
| } |
| encoded = base64.b64encode(auth_string.encode('ascii')) |
| |
| api_url = 'https://api.github.com/repos/%s/%s/contents/%s' % (user_name, repo, path) |
| |
| logging.debug(api_url) |
| r = requests.get(api_url, headers=headers) |
| requests_remaining = int(r.headers['X-RateLimit-Remaining']) |
| if requests_remaining == 0: |
| logging.error("no API requests remaining") |
| exit(1) |
| |
| logging.debug("API requests remaining %d" % requests_remaining) |
| |
| data = r.json() |
| if 'content' not in data: |
| return None |
| |
| file_content = data['content'] |
| |
| file_content_encoding = data.get('encoding') |
| if file_content_encoding == 'base64': |
| file_content = base64.b64decode(file_content) |
| |
| return file_content |
| |
| |
| # the latest artifact isn't necessarily the one related to the latest commit, as github |
| # could have taken longer to process an older commit than a newer one. |
| # so iterate through commits and return the artifact that matches |
| def get_most_recent_action_url(commits, artifacts): |
| release_sha_to_download_url = {artifact['workflow_run']['head_sha']: artifact['archive_download_url'] for artifact in artifacts} |
| for commit in commits: |
| if commit['sha'] in release_sha_to_download_url: |
| return release_sha_to_download_url[commit['sha']] |
| |
| |
| def split_git_url(url): |
| res = urlparse(url) |
| try: |
| _, user_name, repo = res.path.split('/') |
| except ValueError: |
| logging.error(f"couldn't split repo from {url}") |
| exit(1) |
| repo = repo.replace('.git', '') |
| return user_name, repo |
| |
| |
| # download the artifact for each project to get the gds & lef |
| def install_artifacts(url, directory): |
| logging.debug(url) |
| user_name, repo = split_git_url(url) |
| |
| # authenticate for rate limiting |
| auth_string = os.environ['GH_USERNAME'] + ':' + os.environ['GH_TOKEN'] |
| encoded = base64.b64encode(auth_string.encode('ascii')) |
| headers = { |
| "authorization" : 'Basic ' + encoded.decode('ascii'), |
| "Accept" : "application/vnd.github+json", |
| } |
| |
| # first fetch the git commit history |
| api_url = f'https://api.github.com/repos/{user_name}/{repo}/commits' |
| r = requests.get(api_url, headers=headers) |
| requests_remaining = int(r.headers['X-RateLimit-Remaining']) |
| if requests_remaining == 0: |
| logging.error("no API requests remaining") |
| exit(1) |
| |
| commits = r.json() |
| |
| # now get the artifacts |
| api_url = f'https://api.github.com/repos/{user_name}/{repo}/actions/artifacts' |
| r = requests.get(api_url, headers=headers) |
| data = r.json() |
| |
| # check there are some artifacts |
| if 'artifacts' not in data: |
| logging.error(f"no artifact found for {url}") |
| exit(1) |
| else: |
| # only get artifacts called GDS |
| artifacts = [a for a in data['artifacts'] if a['name'] == 'GDS'] |
| logging.debug(f"found {len(artifacts)} artifacts") |
| |
| if len(artifacts) == 0: |
| logging.error("no artifacts for this project") |
| exit(1) |
| |
| download_url = get_most_recent_action_url(commits, artifacts) |
| |
| # need actions access on the token to get the artifact |
| # won't work on a pull request because they won't have the token |
| attempts = 1 |
| max_attempts = 3 |
| while attempts < max_attempts: |
| try: |
| logging.debug(f"download url {download_url} attempt {attempts}") |
| r = requests.get(download_url, headers=headers) |
| z = zipfile.ZipFile(io.BytesIO(r.content)) |
| z.extractall(directory) |
| break |
| except zipfile.BadZipFile: |
| attempts += 1 |
| logging.warning(f"problem with zipfile, retry {attempts}") |
| |
| if attempts == max_attempts: |
| logging.error("gave up downloading zipfile") |
| exit(1) |