#!/usr/bin/env python3
from urllib.parse import urlparse
import argparse
import base64
import collections
import csv
import glob
import io
import logging
import os
import pickle
import shutil
import subprocess
import sys
import zipfile

import requests
# restore default SIGPIPE handling so output can be piped (e.g. into head) without a BrokenPipeError
from signal import signal, SIGPIPE, SIG_DFL
signal(SIGPIPE, SIG_DFL)
tmp_dir = '/tmp/tt'
DEFAULT_NUM_PROJECTS = 498
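# Projects wraps the list of submitted designs: it maps each project's git URL
# to a wokwi id, caches that mapping in a pickle file, and derives the names of
# the per-project macros, GDS/LEF files and verilog sources from the id.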
class Projects:
def __init__(self, update_cache=False, update_single=None, test=False):
self.default_project = 0
self.test = test
if update_cache:
self.update_cache(update_single)
else:
# load projects from cache
try:
self.wokwi_ids = pickle.load(open(self.get_projects_db(), 'rb'))
logging.info("loaded {} projects".format(len(self.wokwi_ids)))
            except FileNotFoundError:
                logging.error("project cache {} not found, use --update-projects to build it".format(self.get_projects_db()))
                exit(1)
# all ids must be unique
assert len(set(self.wokwi_ids)) == len(self.wokwi_ids)
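    # rebuild the wokwi id cache by reading the ID file from each project's
    # local checkout; pass update_single=<git url> to refresh a single project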
def update_cache(self, update_single=None):
self.wokwi_ids = []
for number, url in enumerate(self.get_project_urls()):
if update_single is not None:
if url != update_single:
continue
wokwi_id = self.install_local_artifacts(number, url)
if wokwi_id in self.wokwi_ids:
logging.error("wokwi id already exists!")
exit(1)
self.wokwi_ids.append(wokwi_id)
# cache it
with open(self.get_projects_db(), 'wb') as fh:
pickle.dump(self.wokwi_ids, fh)
def list(self):
for count, (wokwi_id, project_url) in enumerate(zip(self.get_wokwi_ids(), self.get_project_urls())):
logging.info("{:3} {:20} {}".format(count, wokwi_id, project_url))
def get_dir_name(self, url, number):
user_name, repo = Projects.split_git_url(url)
top_dir = 'repos3'
dir_name = os.path.join(top_dir, "{:03d}_{}_{}".format(number, user_name, repo))
logging.debug(dir_name)
return dir_name
def clone_projects(self):
for number, url in enumerate(self.get_project_urls()):
dir_name = self.get_dir_name(url, number)
logging.info("cloning {}".format(url))
subprocess.run(['git', 'clone', url, dir_name], check=True)
def apply_git_patch(self):
cwd = os.getcwd()
for number, url in enumerate(self.get_project_urls()):
logging.info(url)
if url not in ['https://github.com/mattvenn/tinytapeout_m_segments',
'https://github.com/gatecat/tinytapeout-fpga-test',
'https://github.com/smunaut/tinytapeout-misc-1',
'https://github.com/mattvenn/tinytapeout-laura',
'https://github.com/smunaut/tinytapeout-smolram']:
os.chdir(self.get_dir_name(url, number))
subprocess.run(['../../update_repo.sh'], check=True)
os.chdir(cwd)
def harden(self):
cwd = os.getcwd()
for number, url in enumerate(self.get_project_urls()):
            # harden in batches by adjusting this range:
            # if number >= 0 and number < 50:
            # if number >= 50 and number < 100:
            if number >= 100:
os.chdir(self.get_dir_name(url, number))
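                # note: no check=True, so a project that fails to harden does not abort the batch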
subprocess.run(['make', 'fetch'])
subprocess.run(['make', 'harden'])
os.chdir(cwd)
def formal_scan(self):
cwd = os.getcwd()
formal_dir = 'tinytapeout_scan'
for number, url in enumerate(self.get_project_urls()):
gl_dir = os.path.join(self.get_dir_name(url, number), 'runs/wokwi/results/final/verilog/gl')
gl_filename = glob.glob(gl_dir + '/*')[0]
gl_name = os.path.basename(gl_filename)
shutil.copyfile(gl_filename, os.path.join(formal_dir, gl_name))
os.chdir(formal_dir)
            # use splitext rather than rstrip('.v'), which would also strip trailing 'v' characters from the name
            commands = ['sby', '-f', 'tinytapeout_scan.sby', os.path.splitext(gl_name)[0]]
logging.info(commands)
subprocess.run(commands, check=True)
os.chdir(cwd)
def summary_report(self):
stat_names = ['wire_length', 'vias', 'cells_pre_abc']
total_stats = {x : 0 for x in stat_names}
        min_stats = {x : float('inf') for x in stat_names}
max_stats = {x : 0 for x in stat_names}
for number, url in enumerate(self.get_project_urls()):
metrics_dir = os.path.join(self.get_dir_name(url, number), 'runs/wokwi/reports/')
metrics_file = glob.glob(metrics_dir + '/*.csv')[0]
with open(metrics_file) as fh:
summary = csv.DictReader(fh)
for row in summary:
for stat in stat_names:
total_stats[stat] += int(row[stat])
if int(row[stat]) > max_stats[stat]:
max_stats[stat] = int(row[stat])
if int(row[stat]) < min_stats[stat]:
min_stats[stat] = int(row[stat])
for stat in stat_names:
logging.info("total {} = {}".format(stat, total_stats[stat]))
logging.info("min {} = {}".format(stat, min_stats[stat]))
logging.info("max {} = {}".format(stat, max_stats[stat]))
def get_projects_db(self):
if self.test:
return "projects_test.pkl"
else:
return "projects.pkl"
    # unused slots in the chain are filled with the filler project: any
    # out-of-range index falls back to self.default_project
def get_macro_instance(self, id):
try:
return "scan_wrapper_{}_{}".format(self.wokwi_ids[id], id)
except IndexError:
return "scan_wrapper_{}_{}".format(self.wokwi_ids[self.default_project], id)
def get_wokwi_id(self, id):
try:
return self.wokwi_ids[id]
except IndexError:
return self.wokwi_ids[self.default_project]
def get_macro_gds_name(self, id):
try:
return "scan_wrapper_{}.gds".format(self.wokwi_ids[id])
except IndexError:
return "scan_wrapper_{}.gds".format(self.wokwi_ids[self.default_project])
def get_macro_lef_name(self, id):
try:
return "scan_wrapper_{}.lef".format(self.wokwi_ids[id])
except IndexError:
return "scan_wrapper_{}.lef".format(self.wokwi_ids[self.default_project])
def get_macro_name(self, id):
try:
return "scan_wrapper_{}".format(self.wokwi_ids[id])
except IndexError:
return "scan_wrapper_{}".format(self.wokwi_ids[self.default_project])
def get_verilog_include(self, id):
try:
return '`include "scan_wrapper_{}.v"\n'.format(self.wokwi_ids[id])
except IndexError:
return ''
def get_gl_verilog_names(self, id):
try:
return ['scan_wrapper_{}.v'.format(self.wokwi_ids[id])]
except IndexError:
return []
def get_verilog_names(self, id):
try:
return ["scan_wrapper_{}.v".format(self.wokwi_ids[id]), "user_module_{}.v".format(self.wokwi_ids[id])]
except IndexError:
return []
def get_wokwi_ids(self):
return self.wokwi_ids
def get_giturl(self, id):
try:
return self.get_project_urls()[id]
except IndexError:
return self.get_project_urls()[self.default_project]
@classmethod
    def build_wokwi_url(cls, wokwi_id):
return "https://wokwi.com/projects/{}".format(wokwi_id)
def get_project_urls(self):
if self.test:
from project_urls_test import project_urls, filler_project_url
else:
from project_urls import project_urls, filler_project_url
return [filler_project_url] + project_urls
def check_dupes(self):
project_urls = self.get_project_urls()
duplicates = [item for item, count in collections.Counter(project_urls).items() if count > 1]
if duplicates:
logging.error("duplicate projects: {}".format(duplicates))
exit(1)
    # the latest artifact isn't necessarily the one for the latest commit, as
    # GitHub can take longer to process an older commit than a newer one, so
    # iterate through the commits and return the first artifact that matches
@classmethod
    def get_most_recent_action_url(cls, commits, artifacts):
release_sha_to_download_url = {artifact['workflow_run']['head_sha']: artifact['archive_download_url'] for artifact in artifacts}
for commit in commits:
if commit['sha'] in release_sha_to_download_url:
return release_sha_to_download_url[commit['sha']]
@classmethod
    def split_git_url(cls, url):
res = urlparse(url)
try:
_, user_name, repo = res.path.split('/')
except ValueError:
logging.error("couldn't split repo from {}".format(url))
exit(1)
        # strip a trailing .git; replace() would also mangle repo names containing ".git" elsewhere
        if repo.endswith('.git'):
            repo = repo[:-len('.git')]
return user_name, repo
# download the artifact for each project to get the gds & lef
def install_artifacts(self, url):
user_name, repo = Projects.split_git_url(url)
        # authenticate to raise the GitHub API rate limit; GH_USERNAME and GH_TOKEN must be set in the environment
        auth_string = os.environ['GH_USERNAME'] + ':' + os.environ['GH_TOKEN']
encoded = base64.b64encode(auth_string.encode('ascii'))
headers = {
"authorization" : 'Basic ' + encoded.decode('ascii'),
"Accept" : "application/vnd.github+json",
}
# first fetch the git commit history
api_url = 'https://api.github.com/repos/{}/{}/commits'.format(user_name, repo)
r = requests.get(api_url, headers=headers)
requests_remaining = int(r.headers['X-RateLimit-Remaining'])
if requests_remaining == 0:
logging.error("no API requests remaining")
exit(1)
commits = r.json()
# now get the artifacts
api_url = 'https://api.github.com/repos/{}/{}/actions/artifacts'.format(user_name, repo)
r = requests.get(api_url, headers=headers)
data = r.json()
# check there are some artifacts
if 'artifacts' not in data:
logging.error("no artifact found for {}".format(self))
exit(1)
else:
# only get artifacts called GDS
artifacts = [a for a in data['artifacts'] if a['name'] == 'GDS']
logging.debug("found {} artifacts".format(len(artifacts)))
            download_url = Projects.get_most_recent_action_url(commits, artifacts)
            if download_url is None:
                logging.error("no matching GDS artifact found for {}".format(url))
                exit(1)
            logging.debug(download_url)
            # the token needs actions access to download the artifact; this
            # won't work on a pull request, which doesn't get the secret token
            r = requests.get(download_url, headers=headers)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall(tmp_dir)
# get the wokwi id
with open(os.path.join(tmp_dir, 'src/ID')) as fh:
wokwi_id = fh.readline().strip()
logging.info("wokwi id {} github url {}".format(wokwi_id, url))
# copy all important files to the correct places. Everything is dependent on the id
files = [
("/tmp/tt/runs/wokwi/results/final/gds/scan_wrapper_{}.gds".format(wokwi_id), "gds/scan_wrapper_{}.gds".format(wokwi_id)),
("/tmp/tt/runs/wokwi/results/final/lef/scan_wrapper_{}.lef".format(wokwi_id), "lef/scan_wrapper_{}.lef".format(wokwi_id)),
("/tmp/tt/runs/wokwi/results/final/verilog/gl/scan_wrapper_{}.v".format(wokwi_id), "verilog/gl/scan_wrapper_{}.v".format(wokwi_id)),
("/tmp/tt/src/scan_wrapper_{}.v".format(wokwi_id), "verilog/rtl/scan_wrapper_{}.v".format(wokwi_id)),
("/tmp/tt/src/user_module_{}.v".format(wokwi_id), "verilog/rtl/user_module_{}.v".format(wokwi_id)),
]
logging.debug("copying files into position")
for file in files:
logging.debug("copy {} to {}".format(file[0], file[1]))
shutil.copyfile(file[0], file[1])
            # remove the temp directory
shutil.rmtree(tmp_dir)
return wokwi_id
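    # like install_artifacts, but reads the files from an existing local clone
    # (see --clone-all) instead of downloading a GitHub Actions artifact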
def install_local_artifacts(self, number, url):
dir_name = self.get_dir_name(url, number)
logging.info(dir_name)
with open(os.path.join(dir_name, 'src/ID')) as fh:
wokwi_id = fh.readline().strip()
logging.info("wokwi id {} github url {}".format(wokwi_id, url))
# copy all important files to the correct places. Everything is dependent on the id
files = [
("runs/wokwi/results/final/gds/scan_wrapper_{}.gds".format(wokwi_id), "gds/scan_wrapper_{}.gds".format(wokwi_id)),
("runs/wokwi/results/final/lef/scan_wrapper_{}.lef".format(wokwi_id), "lef/scan_wrapper_{}.lef".format(wokwi_id)),
("runs/wokwi/results/final/verilog/gl/scan_wrapper_{}.v".format(wokwi_id), "verilog/gl/scan_wrapper_{}.v".format(wokwi_id)),
("src/scan_wrapper_{}.v".format(wokwi_id), "verilog/rtl/scan_wrapper_{}.v".format(wokwi_id)),
("src/user_module_{}.v".format(wokwi_id), "verilog/rtl/user_module_{}.v".format(wokwi_id)),
]
logging.debug("copying files into position")
for file in files:
from_path = os.path.join(dir_name, file[0])
logging.debug("copy {} to {}".format(from_path, file[1]))
shutil.copyfile(from_path, file[1])
return wokwi_id
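# CaravelConfig generates the caravel_user_project build inputs: the macro
# placement (macro.cfg), power hooks, extra LEF/GDS lists, the
# user_project_wrapper instantiation, and the include files for simulation.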
class CaravelConfig:
def __init__(self, projects, num_projects):
self.projects = projects
self.num_projects = num_projects
@classmethod
def unique(cls, duplist):
unique_list = []
# traverse for all elements
for x in duplist:
# check if exists in unique_list or not
if x not in unique_list:
unique_list.append(x)
return unique_list
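    # note: unique() preserves first-seen order, unlike a plain set(), so the
    # order of entries in the generated files matches the project order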
# create macro file & positions, power hooks
def create_macro_config(self):
start_x = 80
start_y = 80
step_x = 140
step_y = 135
rows = 25
cols = 20
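        # 25 rows x 20 cols = 500 sites; the scan controller occupies the first
        # two sites of row 0, leaving 498 slots (DEFAULT_NUM_PROJECTS) for designs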
num_macros_placed = 0
# macro.cfg: where macros are placed
logging.info("creating macro.cfg")
with open("openlane/user_project_wrapper/macro.cfg", 'w') as fh:
fh.write("scan_controller 80 80 N\n")
for row in range(rows):
                if row % 2 == 0:
col_order = range(cols)
orientation = 'N'
else:
# reverse odd rows to place instances in a zig zag pattern, shortening the scan chain wires
col_order = range(cols - 1, -1, -1)
orientation = 'S'
for col in col_order:
# skip the space where the scan controller goes on the first row
if row == 0 and col <= 1:
continue
if num_macros_placed < self.num_projects:
macro_instance = self.projects.get_macro_instance(num_macros_placed)
instance = "{} {:<4} {:<4} {}\n".format(
macro_instance, start_x + col * step_x, start_y + row * step_y, orientation
)
fh.write(instance)
num_macros_placed += 1
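        # each macro.cfg line is "<instance> <x> <y> <orientation>"; the first
        # user design (row 0, col 2) lands at x = 80 + 2*140 = 360, y = 80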
# macro_power.tcl: extra file for macro power hooks
logging.info("creating macro_power.tcl")
with open("openlane/user_project_wrapper/macro_power.tcl", 'w') as fh:
fh.write('set ::env(FP_PDN_MACRO_HOOKS) "\\\n')
fh.write(" ")
fh.write("scan_controller")
fh.write(" vccd1 vssd1 vccd1 vssd1")
fh.write(", \\\n")
for i in range(self.num_projects):
fh.write(" ")
fh.write(self.projects.get_macro_instance(i))
fh.write(" vccd1 vssd1 vccd1 vssd1")
if i != self.num_projects - 1:
fh.write(", \\\n")
fh.write('"\n')
# extra_lef_gds.tcl
logging.info("creating extra_lef_gds.tcl")
lefs = []
gdss = []
for i in range(self.num_projects):
lefs.append(self.projects.get_macro_lef_name(i))
gdss.append(self.projects.get_macro_gds_name(i))
# can't have duplicates or OpenLane crashes at PDN
lefs = CaravelConfig.unique(lefs)
gdss = CaravelConfig.unique(gdss)
with open("openlane/user_project_wrapper/extra_lef_gds.tcl", 'w') as fh:
fh.write('set ::env(EXTRA_LEFS) "\\\n')
fh.write("$script_dir/../../lef/scan_controller.lef \\\n")
for i, lef in enumerate(lefs):
fh.write("$script_dir/../../lef/{}".format(lef))
if i != len(lefs) - 1:
fh.write(" \\\n")
else:
fh.write('"\n')
fh.write('set ::env(EXTRA_GDS_FILES) "\\\n')
fh.write("$script_dir/../../gds/scan_controller.gds \\\n")
for i, gds in enumerate(gdss):
fh.write("$script_dir/../../gds/{}".format(gds))
if i != len(gdss) - 1:
fh.write(" \\\n")
else:
fh.write('"\n')
# instantiate inside user_project_wrapper
def instantiate(self):
logging.info("instantiating designs in user_project_wrapper.v")
assigns = """
localparam NUM_MACROS = {};
wire [NUM_MACROS:0] data, scan, latch, clk;
"""
scan_controller_template = """
scan_controller #(.NUM_DESIGNS(NUM_MACROS)) scan_controller(
.clk (wb_clk_i),
.reset (wb_rst_i),
.active_select (io_in[20:12]),
.inputs (io_in[28:21]),
.outputs (io_out[36:29]),
.ready (io_out[37]),
.slow_clk (io_out[10]),
.set_clk_div (io_in[11]),
.scan_clk_out (clk[0]),
.scan_clk_in (clk[NUM_MACROS]),
.scan_data_out (data[0]),
.scan_data_in (data[NUM_MACROS]),
.scan_select (scan[0]),
.scan_latch_en (latch[0]),
.la_scan_clk_in (la_data_in[0]),
.la_scan_data_in (la_data_in[1]),
.la_scan_data_out (la_data_out[0]),
.la_scan_select (la_data_in[2]),
.la_scan_latch_en (la_data_in[3]),
.driver_sel (io_in[9:8]),
.oeb (io_oeb)
);
"""
lesson_template = """
// {giturl}
{name} #(.NUM_IOS(8)) {instance} (
.clk_in (clk [{id}]),
.data_in (data [{id}]),
.scan_select_in (scan [{id}]),
.latch_enable_in (latch[{id}]),
.clk_out (clk [{next_id}]),
.data_out (data [{next_id}]),
.scan_select_out (scan [{next_id}]),
.latch_enable_out(latch[{next_id}])
);
"""
with open('upw_pre.v') as fh:
pre = fh.read()
with open('upw_post.v') as fh:
post = fh.read()
with open('verilog/rtl/user_project_wrapper.v', 'w') as fh:
fh.write(pre)
fh.write(assigns.format(self.num_projects))
fh.write(scan_controller_template)
for i in range(self.num_projects):
logging.debug("instance {} {}".format(i, self.projects.get_macro_name(i)))
# instantiate template
                instance = lesson_template.format(
                    giturl=self.projects.get_giturl(i),
                    instance=self.projects.get_macro_instance(i),
                    name=self.projects.get_macro_name(i),
                    id=i,
                    next_id=i + 1)
fh.write(instance)
fh.write(post)
# build the user_project_includes.v file - used for blackboxing when building the GDS
verilogs = []
for i in range(self.num_projects):
verilogs.append(self.projects.get_verilog_include(i))
verilogs = CaravelConfig.unique(verilogs)
with open('verilog/rtl/user_project_includes.v', 'w') as fh:
fh.write('`include "scan_controller/scan_controller.v"\n')
for verilog in verilogs:
fh.write(verilog)
# build complete list of filenames for sim
verilog_files = []
for i in range(self.num_projects):
verilog_files += self.projects.get_verilog_names(i)
verilog_files = CaravelConfig.unique(verilog_files)
with open('verilog/includes/includes.rtl.caravel_user_project', 'w') as fh:
fh.write('-v $(USER_PROJECT_VERILOG)/rtl/user_project_wrapper.v\n')
fh.write('-v $(USER_PROJECT_VERILOG)/rtl/scan_controller/scan_controller.v\n')
fh.write('-v $(USER_PROJECT_VERILOG)/rtl/cells.v\n')
for verilog in verilog_files:
fh.write('-v $(USER_PROJECT_VERILOG)/rtl/{}\n'.format(verilog))
# build GL includes
verilog_files = []
for i in range(self.num_projects):
verilog_files += self.projects.get_gl_verilog_names(i)
verilog_files = CaravelConfig.unique(verilog_files)
with open('verilog/includes/includes.gl.caravel_user_project', 'w') as fh:
fh.write('-v $(USER_PROJECT_VERILOG)/gl/user_project_wrapper.v\n')
fh.write('-v $(USER_PROJECT_VERILOG)/gl/scan_controller.v\n')
for verilog in verilog_files:
fh.write('-v $(USER_PROJECT_VERILOG)/gl/{}\n'.format(verilog))
def build_docs(self):
logging.info("building doc index")
with open("README_init.md") as fh:
readme = fh.read()
with open("README.md", 'w') as fh:
fh.write(readme)
for wokwi_id, project_url in zip(self.projects.get_wokwi_ids(), self.projects.get_project_urls()):
fh.write("* [{}]({}) {}\n".format(wokwi_id, Projects.build_wokwi_url(wokwi_id), project_url))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="TinyTapeout")
    parser.add_argument('--list', help="list projects", action='store_true')
    parser.add_argument('--update-projects', help='fetch the project data', action='store_true')
    parser.add_argument('--update-single', help='only fetch a single repo (git URL), for debugging')
    parser.add_argument('--update-caravel', help='configure caravel for build', action='store_true')
    parser.add_argument('--limit-num-projects', help='only configure for the first n projects', type=int, default=DEFAULT_NUM_PROJECTS)
    parser.add_argument('--test', help='use test projects', action='store_true')
parser.add_argument('--debug', help="debug logging", action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO)
# stuff for help with applying patches to all designs and re-hardening
    parser.add_argument('--clone-all', help="clone all projects", action='store_true')
    parser.add_argument('--apply-git-patch', help="apply the update_repo.sh patch to all cloned projects", action='store_true')
    parser.add_argument('--harden', help="re-harden the cloned projects", action='store_true')
    parser.add_argument('--formal', help="run the formal scan chain proof on each gate level netlist", action='store_true')
    parser.add_argument('--summary', help="print summary statistics for all projects", action='store_true')
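    # typical invocations (assuming this file is saved as configure.py):
    #   ./configure.py --list                # show cached projects
    #   ./configure.py --update-projects     # rebuild the project cache
    #   ./configure.py --update-caravel      # regenerate the caravel config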
args = parser.parse_args()
    # set up logging
    log_format = logging.Formatter('%(asctime)s - %(levelname)-8s - %(message)s')
    log = logging.getLogger('')
    # the level is set on the root logger so it applies to every handler
    log.setLevel(args.loglevel)
# turn off debug logging for requests
logging.getLogger("requests").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
    # log to stdout using the console format
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(log_format)
log.addHandler(ch)
projects = Projects(update_cache=args.update_projects, test=args.test, update_single=args.update_single)
projects.check_dupes()
if args.clone_all:
projects.clone_projects()
if args.apply_git_patch:
projects.apply_git_patch()
if args.harden:
projects.harden()
if args.formal:
projects.formal_scan()
if args.summary:
projects.summary_report()
if args.list:
projects.list()
caravel = CaravelConfig(projects, num_projects=args.limit_num_projects)
if args.update_caravel:
caravel.create_macro_config()
caravel.instantiate()
caravel.build_docs()