| #!/usr/bin/env python3 |
| from urllib.parse import urlparse |
| import argparse, requests, base64, zipfile, io, logging, pickle, shutil, sys, os |
| |
| # pipe handling |
| from signal import signal, SIGPIPE, SIG_DFL |
| signal(SIGPIPE, SIG_DFL) |
| |
| filler_project_url = 'https://github.com/mattvenn/wokwi_filler' |
| tmp_dir = '/tmp/tt' |
| DEFAULT_NUM_PROJECTS = 498 |
| |
| |
| class Projects(): |
| |
| projects_db = "projects.pkl" |
| |
| def __init__(self, update_cache=False): |
| self.default_project = 0 |
| if update_cache: |
| self.update_cache() |
| else: |
| # load projects from cache |
| try: |
| self.wokwi_ids = pickle.load(open(Projects.projects_db, 'rb')) |
| logging.info("loaded {} projects".format(len(self.wokwi_ids))) |
| except FileNotFoundError: |
| logging.error("project cache {} not found, use --update-cache to build it".format(Projects.projects_db)) |
| |
| def update_cache(self): |
| from project_urls import project_urls |
| self.wokwi_ids = [] |
| for url in [filler_project_url] + project_urls: |
| self.wokwi_ids.append(self.install_artifacts(url)) |
| |
| # cache it |
| with open(Projects.projects_db, 'wb') as fh: |
| pickle.dump(self.wokwi_ids, fh) |
| |
| # filling the space with default projects is handled by returning the default on exception |
| def get_macro_instance(self, id): |
| try: |
| return "scan_wrapper_{}_{}".format(self.wokwi_ids[id], id) |
| except IndexError: |
| return "scan_wrapper_{}_{}".format(self.wokwi_ids[self.default_project], id) |
| |
| def get_wokwi_id(self, id): |
| try: |
| return self.wokwi_ids[id] |
| except IndexError: |
| return self.wokwi_ids[self.default_project] |
| |
| def get_macro_gds_name(self, id): |
| try: |
| return "scan_wrapper_{}.gds".format(self.wokwi_ids[id]) |
| except IndexError: |
| return "scan_wrapper_{}.gds".format(self.wokwi_ids[self.default_project]) |
| |
| def get_macro_lef_name(self, id): |
| try: |
| return "scan_wrapper_{}.lef".format(self.wokwi_ids[id]) |
| except IndexError: |
| return "scan_wrapper_{}.lef".format(self.wokwi_ids[self.default_project]) |
| |
| def get_macro_name(self, id): |
| try: |
| return "scan_wrapper_{}".format(self.wokwi_ids[id]) |
| except IndexError: |
| return "scan_wrapper_{}".format(self.wokwi_ids[self.default_project]) |
| |
| def get_verilog_include(self, id): |
| try: |
| return '`include "scan_wrapper_{}.v"\n'.format(self.wokwi_ids[id]) |
| except IndexError: |
| return '' |
| |
| def get_verilog_names(self, id): |
| try: |
| return ["scan_wrapper_{}.v".format(self.wokwi_ids[id]), "user_module_{}.v".format(self.wokwi_ids[id])] |
| except IndexError: |
| return [] |
| |
| # the latest artifact isn't necessarily the one related to the latest commit, as github |
| # could have taken longer to process an older commit than a newer one. |
| # so iterate through commits and return the artifact that matches |
| @classmethod |
| def get_most_recent_action_url(project, commits, artifacts): |
| release_sha_to_download_url = {artifact['workflow_run']['head_sha']: artifact['archive_download_url'] for artifact in artifacts} |
| for commit in commits: |
| if commit['sha'] in release_sha_to_download_url: |
| return release_sha_to_download_url[commit['sha']] |
| |
| # download the artifact for each project to get the gds & lef |
| def install_artifacts(self, url): |
| res = urlparse(url) |
| try: |
| _, user_name, repo = res.path.split('/') |
| except ValueError: |
| logging.error("couldn't split repo from {}".format(url)) |
| exit(1) |
| repo = repo.replace('.git', '') |
| |
| # authenticate for rate limiting |
| auth_string = os.environ['GH_USERNAME'] + ':' + os.environ['GH_TOKEN'] |
| encoded = base64.b64encode(auth_string.encode('ascii')) |
| headers = { |
| "authorization" : 'Basic ' + encoded.decode('ascii'), |
| "Accept" : "application/vnd.github+json", |
| } |
| |
| # first fetch the git commit history |
| api_url = 'https://api.github.com/repos/{}/{}/commits'.format(user_name, repo) |
| r = requests.get(api_url, headers=headers) |
| requests_remaining = int(r.headers['X-RateLimit-Remaining']) |
| if requests_remaining == 0: |
| logging.error("no API requests remaining") |
| exit(1) |
| |
| commits = r.json() |
| |
| # now get the artifacts |
| api_url = 'https://api.github.com/repos/{}/{}/actions/artifacts'.format(user_name, repo) |
| r = requests.get(api_url, headers=headers) |
| data = r.json() |
| |
| # check there are some artifacts |
| if 'artifacts' not in data: |
| logging.error("no artifact found for {}".format(self)) |
| exit(1) |
| else: |
| artifacts = data['artifacts'] |
| logging.debug("found {} artifacts".format(len(artifacts))) |
| |
| download_url = Projects.get_most_recent_action_url(commits, artifacts) |
| logging.info(download_url) |
| |
| # need actions access on the token to get the artifact |
| # won't work on a pull request because they won't have the token |
| r = requests.get(download_url, headers=headers) |
| z = zipfile.ZipFile(io.BytesIO(r.content)) |
| z.extractall(tmp_dir) |
| |
| # get the wokwi id |
| with open(os.path.join(tmp_dir, 'src/ID')) as fh: |
| wokwi_id = fh.readline().strip() |
| |
| logging.info("wokwi id {}".format(wokwi_id)) |
| |
| # copy all important files to the correct places. Everything is dependent on the id |
| files = [ |
| ("/tmp/tt/runs/wokwi/results/final/gds/scan_wrapper_{}.gds".format(wokwi_id), "gds/scan_wrapper_{}.gds".format(wokwi_id)), |
| ("/tmp/tt/runs/wokwi/results/final/lef/scan_wrapper_{}.lef".format(wokwi_id), "lef/scan_wrapper_{}.lef".format(wokwi_id)), |
| ("/tmp/tt/runs/wokwi/results/final/verilog/gl/scan_wrapper_{}.v".format(wokwi_id), "verilog/gl/scan_wrapper_{}.v".format(wokwi_id)), |
| ("/tmp/tt/src/scan_wrapper_{}.v".format(wokwi_id), "verilog/rtl/scan_wrapper_{}.v".format(wokwi_id)), |
| ("/tmp/tt/src/user_module_{}.v".format(wokwi_id), "verilog/rtl/user_module_{}.v".format(wokwi_id)), |
| ] |
| |
| logging.info("copying files into position") |
| for file in files: |
| logging.debug("copy {} to {}".format(file[0], file[1])) |
| shutil.copyfile(file[0], file[1]) |
| |
| # unlink temp directory |
| shutil.rmtree(tmp_dir) |
| return wokwi_id |
| |
| |
| class CaravelConfig(): |
| |
| def __init__(self, projects, num_projects): |
| self.projects = projects |
| self.num_projects = num_projects |
| |
| @classmethod |
| def unique(cls, duplist): |
| unique_list = [] |
| # traverse for all elements |
| for x in duplist: |
| # check if exists in unique_list or not |
| if x not in unique_list: |
| unique_list.append(x) |
| return unique_list |
| |
| # create macro file & positions, power hooks |
| def create_macro_config(self): |
| start_x = 80 |
| start_y = 80 |
| step_x = 140 |
| step_y = 135 |
| rows = 25 |
| cols = 20 |
| |
| num_macros_placed = 0 |
| |
| # macro.cfg: where macros are placed |
| logging.info("creating macro.cfg") |
| with open("openlane/user_project_wrapper/macro.cfg", 'w') as fh: |
| fh.write("scan_controller 80 80 N\n") |
| for row in range(rows): |
| for col in range(cols): |
| # skip the space where the scan controller goes on the first row |
| if row == 0 and col <= 1: |
| continue |
| |
| macro_instance = self.projects.get_macro_instance(num_macros_placed) |
| instance = "{} {:<4} {:<4} N\n".format(macro_instance, start_x + col * step_x, start_y + row * step_y) |
| fh.write(instance) |
| |
| num_macros_placed += 1 |
| |
| # macro_power.tcl: extra file for macro power hooks |
| logging.info("creating macro_power.tcl") |
| with open("openlane/user_project_wrapper/macro_power.tcl", 'w') as fh: |
| fh.write('set ::env(FP_PDN_MACRO_HOOKS) "\\\n') |
| fh.write(" ") |
| fh.write("scan_controller") |
| fh.write(" vccd1 vssd1 vccd1 vssd1") |
| fh.write(", \\\n") |
| for i in range(self.num_projects): |
| fh.write(" ") |
| fh.write(self.projects.get_macro_instance(i)) |
| fh.write(" vccd1 vssd1 vccd1 vssd1") |
| if i != self.num_projects - 1: |
| fh.write(", \\\n") |
| fh.write('"\n') |
| |
| # extra_lef_gds.tcl |
| logging.info("creating extra_lef_gds.tcl") |
| lefs = [] |
| gdss = [] |
| for i in range(self.num_projects): |
| lefs.append(self.projects.get_macro_lef_name(i)) |
| gdss.append(self.projects.get_macro_gds_name(i)) |
| |
| # can't have duplicates or OpenLane crashes at PDN |
| lefs = CaravelConfig.unique(lefs) |
| gdss = CaravelConfig.unique(gdss) |
| |
| with open("openlane/user_project_wrapper/extra_lef_gds.tcl", 'w') as fh: |
| fh.write('set ::env(EXTRA_LEFS) "\\\n') |
| fh.write("$script_dir/../../lef/scan_controller.lef \\\n") |
| for i, lef in enumerate(lefs): |
| fh.write("$script_dir/../../lef/{}".format(lef)) |
| if i != len(lefs) - 1: |
| fh.write(" \\\n") |
| else: |
| fh.write('"\n') |
| fh.write('set ::env(EXTRA_GDS_FILES) "\\\n') |
| fh.write("$script_dir/../../gds/scan_controller.gds \\\n") |
| for i, gds in enumerate(gdss): |
| fh.write("$script_dir/../../gds/{}".format(gds)) |
| if i != len(gdss) - 1: |
| fh.write(" \\\n") |
| else: |
| fh.write('"\n') |
| |
| # instantiate inside user_project_wrapper |
| def instantiate(self): |
| logging.info("instantiating designs in user_project_wrapper.v") |
| assigns = """ |
| localparam NUM_MACROS = {}; |
| wire [NUM_MACROS:0] data, scan, latch, clk; |
| wire [8:0] active_select = io_in[20:12]; |
| wire [7:0] inputs = io_in[28:21]; |
| wire [7:0] outputs; |
| assign io_out[36:29] = outputs; |
| wire ready; |
| assign io_out[37] = ready; |
| """ |
| |
| scan_controller_template = """ |
| scan_controller #(.NUM_DESIGNS(NUM_MACROS)) scan_controller( |
| .clk (wb_clk_i), |
| .reset (wb_rst_i), |
| .active_select (active_select), |
| .inputs (inputs), |
| .outputs (outputs), |
| .ready (ready), |
| .scan_clk (clk[0]), |
| .scan_data_out (data[0]), |
| .scan_data_in (data[NUM_MACROS]), |
| .scan_select (scan[0]), |
| .scan_latch_enable(latch[0]), |
| .oeb (io_oeb[37:29]) |
| ); |
| |
| """ |
| lesson_template = """ |
| {name} #(.NUM_IOS(8)) {instance} ( |
| .clk_in (clk [{id}]), |
| .data_in (data [{id}]), |
| .scan_select_in (scan [{id}]), |
| .latch_enable_in (latch[{id}]), |
| .clk_out (clk [{next_id}]), |
| .data_out (data [{next_id}]), |
| .scan_select_out (scan [{next_id}]), |
| .latch_enable_out(latch[{next_id}]) |
| ); |
| """ |
| with open('upw_pre.v') as fh: |
| pre = fh.read() |
| |
| with open('upw_post.v') as fh: |
| post = fh.read() |
| |
| with open('verilog/rtl/user_project_wrapper.v', 'w') as fh: |
| fh.write(pre) |
| fh.write(assigns.format(self.num_projects)) |
| fh.write(scan_controller_template) |
| for i in range(self.num_projects): |
| logging.debug("instance {} {}".format(i, self.projects.get_macro_name(i))) |
| # instantiate template |
| instance = lesson_template.format(instance=self.projects.get_macro_instance(i), name=self.projects.get_macro_name(i), id=i, next_id=i + 1) |
| fh.write(instance) |
| fh.write(post) |
| |
| # build the user_project_includes.v file - used for blackboxing when building the GDS |
| verilogs = [] |
| for i in range(self.num_projects): |
| verilogs.append(self.projects.get_verilog_include(i)) |
| verilogs = CaravelConfig.unique(verilogs) |
| |
| with open('verilog/rtl/user_project_includes.v', 'w') as fh: |
| fh.write('`include "scan_controller/scan_controller.v"\n') |
| for verilog in verilogs: |
| fh.write(verilog) |
| |
| # build complete list of filenames for sim |
| verilog_files = [] |
| for i in range(self.num_projects): |
| verilog_files += self.projects.get_verilog_names(i) |
| verilog_files = CaravelConfig.unique(verilog_files) |
| with open('verilog/includes/includes.rtl.caravel_user_project', 'w') as fh: |
| fh.write('-v $(USER_PROJECT_VERILOG)/rtl/user_project_wrapper.v\n') |
| fh.write('-v $(USER_PROJECT_VERILOG)/rtl/scan_controller/scan_controller.v\n') |
| fh.write('-v $(USER_PROJECT_VERILOG)/rtl/cells.v\n') |
| for verilog in verilog_files: |
| fh.write('-v $(USER_PROJECT_VERILOG)/rtl/{}\n'.format(verilog)) |
| |
| |
| if __name__ == '__main__': |
| parser = argparse.ArgumentParser(description="TinyTapeout") |
| |
| parser.add_argument('--list', help="list projects", action='store_const', const=True) |
| parser.add_argument('--update-projects', help='fetch the project data', action='store_const', const=True) |
| parser.add_argument('--update-caravel', help='configure caravel for build', action='store_const', const=True) |
| parser.add_argument('--limit-num-projects', help='only configure for the first n projects', type=int, default=DEFAULT_NUM_PROJECTS) |
| parser.add_argument('--debug', help="debug logging", action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO) |
| |
| args = parser.parse_args() |
| |
| # setup log |
| log_format = logging.Formatter('%(asctime)s - %(levelname)-8s - %(message)s') |
| # configure the client logging |
| log = logging.getLogger('') |
| # has to be set to debug as is the root logger |
| log.setLevel(args.loglevel) |
| # turn off debug logging for requests |
| logging.getLogger("requests").setLevel(logging.INFO) |
| logging.getLogger("urllib3").setLevel(logging.INFO) |
| |
| # create console handler and set level to info |
| ch = logging.StreamHandler(sys.stdout) |
| # create formatter for console |
| ch.setFormatter(log_format) |
| log.addHandler(ch) |
| |
| projects = Projects(update_cache=args.update_projects) |
| caravel = CaravelConfig(projects, num_projects=args.limit_num_projects) |
| |
| if args.update_caravel: |
| caravel.create_macro_config() |
| caravel.instantiate() |