# Copyright 2022 GlobalFoundries PDK Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run GlobalFoundries 180nm Variants DRC Regression.
Usage:
run_regression.py (--help| -h)
run_regression.py (--variant=<variant_name>) [--mp=<num>] [--run_name=<run_name>] [--table_name=<table_name>]
Options:
--help -h Print this help message.
--variant=<variant_name> Variant name to run on.
--mp=<num> The number of threads used in run.
--run_name=<run_name> Select your run name.
--table_name=<table_name> Target specific table.
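Example:
    A typical invocation (the run name defaults to a timestamped folder when omitted):
    run_regression.py --variant=IC --mp=4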
"""
from subprocess import check_call
from subprocess import Popen, PIPE
import concurrent.futures
import traceback
import yaml
from docopt import docopt
import os
from datetime import datetime
import xml.etree.ElementTree as ET
import time
import pandas as pd
import logging
import glob
from pathlib import Path
from tqdm import tqdm
import re
import gdstk
import errno
import numpy as np
from collections import defaultdict
SUPPORTED_TC_EXT = "gds"
SUPPORTED_SW_EXT = "yaml"
RULE_LAY_NUM = 10000
PATH_WIDTH = 0.01
RULE_STR_SEP = "--"
ANALYSIS_RULES = [
"pass_patterns",
"fail_patterns",
"false_negative",
"false_positive",
"not_tested",
]
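# Note: per-testcase counts are keyed as "<rule_name>--<analysis_type>", where the
# analysis type is one of ANALYSIS_RULES and "--" is RULE_STR_SEP. For a hypothetical
# rule "M1.W" this yields keys such as "M1.W--pass_patterns" or "M1.W--false_negative";
# run_all_test_cases() later splits these keys on RULE_STR_SEP to pivot them into
# per-rule columns.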
def get_unit_test_coverage(gds_file):
"""
    This function extracts all rule names covered inside a single testcase GDS file.
Parameters
----------
gds_file : str
        Path string to the testcase GDS file.
Returns
-------
list
A list of unique rules found.
"""
# Get rules from gds
rules = []
# layer num of rule text
lay_num = 11
# layer data type of rule text
lay_dt = 222
# Getting all rules names from testcase
library = gdstk.read_gds(gds_file)
top_cells = library.top_level() # Get top cells
for cell in top_cells:
flatten_cell = cell.flatten()
# Get all text labels for each cell
labels = flatten_cell.get_labels(
apply_repetitions=True, depth=None, layer=lay_num, texttype=lay_dt
)
# Get label value
for label in labels:
rule = label.text
if rule not in rules:
rules.append(rule)
return rules
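# Illustrative usage of get_unit_test_coverage() (the path below is hypothetical):
#   rules = get_unit_test_coverage("testing/testcases/unit/<table>/<rule>/pass/sample.gds")
#   # -> e.g. ["RULE_A", "RULE_B"], one entry per unique rule label found on layer 11/222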
def check_klayout_version():
"""
    check_klayout_version checks the installed KLayout version and makes sure it is compatible with the DRC runsets.
"""
# ======= Checking Klayout version =======
klayout_v_ = os.popen("klayout -b -v").read()
klayout_v_ = klayout_v_.split("\n")[0]
klayout_v_list = []
if klayout_v_ == "":
logging.error("Klayout is not found. Please make sure klayout is installed.")
exit(1)
else:
klayout_v_list = [int(v) for v in klayout_v_.split(" ")[-1].split(".")]
logging.info(f"Your Klayout version is: {klayout_v_}")
if len(klayout_v_list) < 1 or len(klayout_v_list) > 3:
logging.error("Was not able to get klayout version properly.")
exit(1)
    elif len(klayout_v_list) >= 2 and len(klayout_v_list) <= 3:
        if klayout_v_list[1] < 28:
            logging.error("Prerequisites at a minimum: KLayout 0.28.0")
            logging.error(
                "Using this KLayout version has not been assessed in this development. Limits are unknown."
            )
            exit(1)
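# Note: check_klayout_version() expects `klayout -b -v` to print a line similar to
# "KLayout 0.28.12" (exact wording may vary per build). The code above keeps the last
# space-separated token of the first line and splits it on "." into version fields,
# e.g. "KLayout 0.28.12" -> [0, 28, 12], then requires the second field to be >= 28.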
def get_switches(yaml_file, rule_name):
"""Parse yaml file and extract switches data
Parameters
----------
    yaml_file : str
        yaml config file path given by the user.
    rule_name : str
        Key in the YAML file (the testcase basename) whose switches should be extracted.
Returns
-------
    list
        A list of "<param>=<value>" switch strings for the given testcase.
"""
# load yaml config data
with open(yaml_file, "r") as stream:
try:
yaml_dic = yaml.safe_load(stream)
except yaml.YAMLError as exc:
            logging.error(exc)
            raise
return [f"{param}={value}" for param, value in yaml_dic[rule_name].items()]
def parse_results_db(results_database):
"""
This function will parse Klayout database for analysis.
Parameters
----------
results_database : string or Path object
Path string to the results file
Returns
-------
    dict
        A dictionary mapping every rule found in the database to its violation count (zero if clean).
"""
mytree = ET.parse(results_database)
myroot = mytree.getroot()
# Initial values for counter
rule_counts = defaultdict(int)
    # Get the list of all rules that ran, regardless of whether they generated output or not
for z in myroot[5]:
rule_name = f"{z[0].text}"
rule_counts[rule_name] = 0
# Count rules with violations.
for z in myroot[7]:
rule_name = f"{z[1].text}".replace("'", "")
rule_counts[rule_name] += 1
return rule_counts
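# Note: the index-based access above assumes the usual KLayout report database (lyrdb)
# layout, where myroot[5] is the <categories> element (one entry per rule that ran)
# and myroot[7] is the <items> element (one entry per reported violation). If the
# lyrdb schema changes, these indices would need to be revisited.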
def parse_results_db_splitted(results_database):
"""
This function will parse Klayout database for analysis.
Parameters
----------
results_database : string or Path object
Path string to the results file
Returns
-------
set
A set that contains all rules in the database with violations
"""
mytree = ET.parse(results_database)
myroot = mytree.getroot()
all_violating_rules = set()
    for z in myroot[7]:  # myroot[7] : List of rules with violations
all_violating_rules.add(f"{z[1].text}".replace("'", ""))
return all_violating_rules
def analyze_splitted_results(layout_path, pattern_results, test_criteria):
"""
    This function analyzes the DRC results of split pass/fail test patterns for a single rule.
    Parameters
    ----------
    layout_path : string or Path object
        Path string to the layout of the test pattern we want to test.
    pattern_results : list
        List of result database files generated by the DRC run.
    test_criteria : string
        Test criteria of the analyzed patterns, either "pass" or "fail".
Returns
-------
dict
A dict with all rule counts
"""
# Initial value for counters
rule_counts = defaultdict(int)
# Get test rule name
test_rule_path = Path(layout_path).parents[1]
test_rule = os.path.basename(test_rule_path)
# Get pass/fail patterns counts
pass_patterns = os.path.join(test_rule_path, "pass", "*.gds")
fail_patterns = os.path.join(test_rule_path, "fail", "*.gds")
pass_count = len(glob.glob(str(pass_patterns)))
fail_count = len(glob.glob(str(fail_patterns)))
if len(pattern_results) > 0:
violated_rules = set()
for p in pattern_results:
rules_with_violations = parse_results_db_splitted(p)
violated_rules.update(rules_with_violations)
if test_criteria == "pass":
if test_rule in violated_rules:
# false positive
return {f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[0]}': pass_count, f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[1]}': fail_count,
f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[3]}': 1, f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[2]}': 0,
f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[4]}': 0}
else:
                # true negative: pass pattern and no violation was flagged
return {f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[0]}': pass_count, f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[1]}': fail_count,
f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[3]}': 0, f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[2]}': 0,
f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[4]}': 0}
else:
if test_rule in violated_rules:
                # true positive: fail pattern and the violation was flagged
return {f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[0]}': pass_count, f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[1]}': fail_count,
f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[3]}': 0, f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[2]}': 0,
f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[4]}': 0}
else:
# false negative
return {f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[0]}': pass_count, f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[1]}': fail_count,
f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[3]}': 0, f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[2]}': 1,
f'{test_rule}{RULE_STR_SEP}{ANALYSIS_RULES[4]}': 0}
else:
return rule_counts
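# Note on analyze_splitted_results(): split testcases are expected to live under
# <rule_name>/pass/*.gds and <rule_name>/fail/*.gds (the rule name is taken from the
# grandparent directory of the layout). A "pass" pattern that gets flagged counts as a
# false positive, while a "fail" pattern that is not flagged counts as a false negative.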
def run_test_case(
drc_dir,
layout_path,
run_dir,
testcase_basename,
table_name,
test_criteria,
):
"""
    This function runs a single test case using the correct DRC file.
Parameters
----------
drc_dir : string or Path
Path to the location where all runsets exist.
    layout_path : string or Path object
        Path string to the layout of the test pattern we want to test.
    run_dir : string or Path object
        Path to the location where the regression run is done.
testcase_basename : string
Testcase name that we are running on.
table_name : string
Table name that we are running on.
test_criteria : string
Type of test that we are running on.
Returns
-------
dict
A dict with all rule counts
"""
# Initial value for counters
rule_counts = defaultdict(int)
# Get switches used for each run
sw_file = os.path.join(
Path(layout_path.parent).absolute(), f"{testcase_basename}.{SUPPORTED_SW_EXT}"
)
if os.path.exists(sw_file):
switches = " ".join(get_switches(sw_file, testcase_basename))
else:
switches = "--variant=C" # default switch
# Adding switches for specific runsets
if "antenna" in str(layout_path):
switches += " --antenna_only"
elif "density" in str(layout_path):
switches += " --density_only"
# Creating run folder structure
pattern_clean = ".".join(os.path.basename(layout_path).split(".")[:-1])
output_loc = f"{run_dir}/{table_name}"
pattern_log = f"{output_loc}/{pattern_clean}_drc.log"
# command to run drc
call_str = f"python3 {drc_dir}/run_drc.py --path={layout_path} {switches} --table={table_name} --run_dir={output_loc} --run_mode=flat --thr=1 > {pattern_log} 2>&1"
# Starting klayout run
os.makedirs(output_loc, exist_ok=True)
try:
check_call(call_str, shell=True)
except Exception as e:
pattern_results = glob.glob(os.path.join(output_loc, f"{pattern_clean}*.lyrdb"))
if len(pattern_results) < 1:
logging.error("%s generated an exception: %s" % (pattern_clean, e))
traceback.print_exc()
raise Exception("Failed DRC run.")
# dumping log into output to make CI have the log
if os.path.isfile(pattern_log):
logging.info("# Dumping drc run output log:")
with open(pattern_log, "r") as f:
for line in f:
line = line.strip()
logging.info(f"{line}")
# Checking if run is completed or failed
pattern_results = glob.glob(os.path.join(output_loc, f"{pattern_clean}*.lyrdb"))
# Analysis of splitted testcases into patterns
    if test_criteria in ["pass", "fail"]:
return analyze_splitted_results(
layout_path, pattern_results, test_criteria
)
else:
# Get list of rules covered in the test case
rules_tested = get_unit_test_coverage(layout_path)
if len(pattern_results) > 0:
# db to gds conversion
marker_output, runset_analysis = convert_results_db_to_gds(
pattern_results[0], rules_tested
)
# Generating merged testcase for violated rules
merged_output = generate_merged_testcase(layout_path, marker_output)
# Generating final db file
if os.path.exists(merged_output):
final_report = f'{merged_output.split(".")[0]}_final.lyrdb'
analysis_log = f'{merged_output.split(".")[0]}_analysis.log'
call_str = f"klayout -b -r {runset_analysis} -rd input={merged_output} -rd report={final_report} > {analysis_log} 2>&1"
failed_analysis_step = False
try:
check_call(call_str, shell=True)
except Exception as e:
failed_analysis_step = True
logging.error("%s generated an exception: %s" % (pattern_clean, e))
traceback.print_exc()
# dumping log into output to make CI have the log
if os.path.isfile(analysis_log):
logging.info("# Dumping analysis run output log:")
with open(analysis_log, "r") as f:
for line in f:
line = line.strip()
logging.info(f"{line}")
if failed_analysis_step:
raise Exception("Failed DRC analysis run.")
if os.path.exists(final_report):
rule_counts = parse_results_db(final_report)
return rule_counts
else:
return rule_counts
else:
return rule_counts
else:
return rule_counts
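# Note on run_test_case(): for "pass"/"fail" split patterns the raw DRC results are
# classified directly by analyze_splitted_results(); for full-table testcases the
# violation markers are first converted to GDS, merged back into the original layout,
# and re-checked with a generated analysis runset before the final counts are parsed.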
def run_all_test_cases(tc_df, drc_dir, run_dir, num_workers):
"""
    This function runs all test cases from the input dataframe.
Parameters
----------
tc_df : pd.DataFrame
DataFrame that holds all the test cases information for running.
drc_dir : string or Path
Path string to the location of the drc runsets.
run_dir : string or Path
Path string to the location of the testing code and output.
num_workers : int
Number of workers to use for running the regression.
Returns
-------
pd.DataFrame
A pandas DataFrame with all test cases information post running.
"""
results_df_list = []
tc_df["run_status"] = "no status"
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
future_to_run_id = dict()
for i, row in tc_df.iterrows():
future_to_run_id[
executor.submit(
run_test_case,
drc_dir,
row["test_path"],
run_dir,
row["testcase_basename"],
row["table_name"],
row["test_criteria"],
)
] = row["run_id"]
for future in concurrent.futures.as_completed(future_to_run_id):
run_id = future_to_run_id[future]
try:
rule_counts = future.result()
if rule_counts:
rule_counts_df = pd.DataFrame(
{
"analysis_rule": rule_counts.keys(),
"count": rule_counts.values(),
}
)
rule_counts_df["rule_name"] = (
rule_counts_df["analysis_rule"].str.split(RULE_STR_SEP).str[0]
)
rule_counts_df["type"] = (
rule_counts_df["analysis_rule"].str.split(RULE_STR_SEP).str[1]
)
rule_counts_df.drop(columns=["analysis_rule"], inplace=True)
rule_counts_df["count"] = rule_counts_df["count"].astype(int)
rule_counts_df = rule_counts_df.pivot(
index="rule_name", columns="type", values="count"
)
rule_counts_df = rule_counts_df.fillna(0)
rule_counts_df = rule_counts_df.reset_index(drop=False)
rule_counts_df = rule_counts_df.rename(
columns={"index": "rule_name"}
)
rule_counts_df["table_name"] = tc_df.loc[
tc_df["run_id"] == run_id, "table_name"
].iloc[0]
for c in ANALYSIS_RULES:
if c not in rule_counts_df.columns:
rule_counts_df[c] = 0
rule_counts_df[ANALYSIS_RULES] = rule_counts_df[
ANALYSIS_RULES
].astype(int)
rule_counts_df = rule_counts_df[
["table_name", "rule_name"] + ANALYSIS_RULES
]
results_df_list.append(rule_counts_df)
tc_df.loc[tc_df["run_id"] == run_id, "run_status"] = "completed"
else:
tc_df.loc[tc_df["run_id"] == run_id, "run_status"] = "no output"
except Exception as exc:
logging.error("%d generated an exception: %s" % (run_id, exc))
traceback.print_exc()
tc_df.loc[tc_df["run_id"] == run_id, "run_status"] = "exception"
if len(results_df_list) > 0:
results_df = pd.concat(results_df_list)
else:
results_df = pd.DataFrame()
return results_df, tc_df
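# Illustrative shape of the results_df returned by run_all_test_cases() (table and
# rule names plus counts below are hypothetical):
#
#   table_name  rule_name  pass_patterns  fail_patterns  false_negative  false_positive  not_tested
#   MT          MT.1       2              2              0               0               0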
def parse_existing_rules(rule_deck_path, output_path, target_table=None):
"""
This function collects the rule names from the existing drc rule decks.
Parameters
----------
rule_deck_path : string or Path object
Path string to the DRC directory where all the DRC files are located.
output_path : string or Path
Path of the run location to store the output analysis file.
    target_table : string, optional
        Name of the table to be tested.
Returns
-------
pd.DataFrame
A pandas DataFrame with the rule and rule deck used.
"""
if target_table is None:
drc_files = glob.glob(os.path.join(rule_deck_path, "rule_decks", "*.drc"))
else:
table_rule_file = os.path.join(
rule_deck_path, "rule_decks", f"{target_table}.drc"
)
if not os.path.isfile(table_rule_file):
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), table_rule_file
)
drc_files = [table_rule_file]
rules_data = list()
for runset in drc_files:
with open(runset, "r") as f:
for line in f:
if ".output" in line:
line_list = line.split("'")
rule_info = dict()
rule_info["table_name"] = os.path.basename(runset).replace(
".drc", ""
)
rule_info["rule_name"] = line_list[1]
rule_info["in_rule_deck"] = 1
rules_data.append(rule_info)
df = pd.DataFrame(rules_data)
df.drop_duplicates(inplace=True)
df.to_csv(os.path.join(output_path, "rule_deck_rules.csv"), index=False)
return df
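# Note on parse_existing_rules(): rule names are scraped from lines containing
# ".output" in each rule deck, taking the first single-quoted token, e.g. a
# hypothetical line
#   some_layer.output('RULE.NAME', 'RULE.NAME : description')
# contributes rule_name = "RULE.NAME" for the table named after the .drc file.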
def generate_merged_testcase(orignal_testcase, marker_testcase):
"""
    This function merges the original GDS testcase with the generated
    markers GDS file.
    Parameters
    ----------
    orignal_testcase : string or Path object
        Path string to the original testcase.
marker_testcase : string or Path
Path of the output marker gds file generated from db file.
Returns
-------
merged_gds_path : string or Path
Path of the final merged gds file generated.
"""
new_lib = gdstk.Library()
lib_org = gdstk.read_gds(orignal_testcase)
lib_marker = gdstk.read_gds(marker_testcase)
# Getting flattened top cells
top_cell_org = lib_org.top_level()[0].flatten(apply_repetitions=True)
top_cell_marker = lib_marker.top_level()[0].flatten(apply_repetitions=True)
marker_polygons = top_cell_marker.get_polygons(
apply_repetitions=True, include_paths=True, depth=None
)
# Merging all polygons of markers with original testcase
for marker_polygon in marker_polygons:
top_cell_org.add(marker_polygon)
# Adding flattened merged cell
new_lib.add(top_cell_org.flatten(apply_repetitions=True))
# Writing final merged gds file
merged_gds_path = f'{marker_testcase.replace(".gds", "")}_merged.gds'
new_lib.write_gds(merged_gds_path)
return merged_gds_path
def draw_polygons(polygon_data, cell, lay_num, lay_dt, path_width):
"""
    This function is used for drawing a GDS cell that contains all violation marker shapes.
Parameters
----------
    polygon_data : str
        Contains the data points of a single violation marker (polygon, edge, or edge-pair).
    cell: gdstk.Cell
        Top cell that will contain all generated polygons.
    lay_num: int
        Layer number used to draw violated polygons.
    lay_dt : int
        Layer data type used to draw violated polygons.
    path_width : float
        Width used to draw edges as paths.
Returns
-------
None
"""
# Cleaning data points
polygon_data = re.sub(r"\s+", "", polygon_data)
polygon_data = re.sub(r"[()]", "", polygon_data)
tag_split = polygon_data.split(":")
tag = tag_split[0]
poly_txt = tag_split[1]
polygons = re.split(r"[/|]", poly_txt)
# Select shape type to be drawn
if tag == "polygon":
for poly in polygons:
points = [
(float(p.split(",")[0]), float(p.split(",")[1]))
for p in poly.split(";")
]
cell.add(gdstk.Polygon(points, lay_num, lay_dt))
elif tag == "edge-pair":
for poly in polygons:
points = [
[float(p.split(",")[0]), float(p.split(",")[1])]
for p in poly.split(";")
]
dist = np.sqrt(
((points[0][0]) - (points[1][0])) ** 2
+ ((points[0][1]) - (points[1][1])) ** 2
)
# Adding condition for extremely small edge length
## to generate a path to be drawn
if dist < path_width:
points[1][0] = points[0][0] + 2 * path_width
cell.add(gdstk.FlexPath(points, path_width, layer=lay_num, datatype=lay_dt))
elif tag == "edge":
for poly in polygons:
points = [
[float(p.split(",")[0]), float(p.split(",")[1])]
for p in poly.split(";")
]
dist = np.sqrt(
((points[0][0]) - (points[1][0])) ** 2
+ ((points[0][1]) - (points[1][1])) ** 2
)
# Adding condition for extremely small edge length
## to generate a path to be drawn
if dist < path_width:
points[1][0] = points[0][0] + 2 * path_width
cell.add(gdstk.FlexPath(points, path_width, layer=lay_num, datatype=lay_dt))
elif "float" or "text" in tag :
# Known antenna values for antenna ratios
pass
else:
logging.error(f"## Unknown type: {tag} ignored")
def convert_results_db_to_gds(results_database: str, rules_tested: list):
"""
This function will parse Klayout database for analysis.
It converts the lyrdb klayout database file to GDSII file
Parameters
----------
results_database : string or Path object
Path string to the results file
rules_tested : list
List of strings that holds the rule names that are covered by the test case.
Returns
-------
output_gds_path : string or Path
Path of the output marker gds file generated from db file.
output_runset_path : string or Path
Path of the output drc runset used for analysis.
"""
# Writing analysis rule deck
pass_marker = "input(2, 222)"
fail_marker = "input(3, 222)"
fail_marker2 = "input(6, 222)"
text_marker = "input(11, 222)"
output_runset_path = f'{results_database.replace(".lyrdb", "")}_analysis.drc'
analysis_rules = []
runset_analysis_setup = f"""
source($input)
report("DRC analysis run report at", $report)
pass_marker = {pass_marker}
fail_marker = {fail_marker}
fail_marker2 = {fail_marker2}
text_marker = {text_marker}
full_chip = extent.sized(0.0)
"""
analysis_rules.append(runset_analysis_setup)
    # Generating violated rules and their violation shapes
cell_name = ""
lib = None
cell = None
in_item = False
rule_data_type_map = list()
for ev, elem in tqdm(ET.iterparse(results_database, events=("start", "end"))):
if elem.tag != "item" and not in_item:
elem.clear()
continue
if elem.tag != "item" and in_item:
continue
if elem.tag == "item" and ev == "start":
in_item = True
continue
rules = elem.findall("category")
values = elem.findall("values")
if len(values) > 0:
polygons = values[0].findall("value")
else:
polygons = []
if cell_name == "":
all_cells = elem.findall("cell")
if len(all_cells) > 0:
cell_name = all_cells[0].text
if cell_name is None:
elem.clear()
continue
lib = gdstk.Library(f"{cell_name}_markers")
cell = lib.new_cell(f"{cell_name}_markers")
if len(rules) > 0:
rule_name = rules[0].text.replace("'", "")
if rule_name is None:
elem.clear()
continue
else:
elem.clear()
continue
if rule_name not in rule_data_type_map:
rule_data_type_map.append(rule_name)
## Drawing polygons here.
rule_lay_dt = rule_data_type_map.index(rule_name) + 1
if cell is not None:
for p in polygons:
draw_polygons(p.text, cell, RULE_LAY_NUM, rule_lay_dt, PATH_WIDTH)
        ## Clearing memory
in_item = False
elem.clear()
# Writing final marker gds file
if lib is not None:
output_gds_path = f'{results_database.replace(".lyrdb", "")}_markers.gds'
lib.write_gds(output_gds_path)
else:
logging.error("Failed to get any results in the lyrdb database.")
exit(1)
# Saving analysis rule deck.
for r in rule_data_type_map:
rule_lay_dt = rule_data_type_map.index(r) + 1
rule_layer_name = f'rule_{r.replace(".", "_")}'
rule_layer = f"{rule_layer_name} = input({RULE_LAY_NUM}, {rule_lay_dt})"
pass_patterns_rule = f"""
pass_marker.interacting( text_marker.texts("{r}") ).output("{r}{RULE_STR_SEP}pass_patterns", "{r}{RULE_STR_SEP}pass_patterns polygons")
"""
fail_patterns_rule = f"""
fail_marker2.interacting(fail_marker.interacting(text_marker.texts("{r}")) ).or( fail_marker.interacting(text_marker.texts("{r}")).not_interacting(fail_marker2) ).output("{r}{RULE_STR_SEP}fail_patterns", "{r}{RULE_STR_SEP}fail_patterns polygons")
"""
false_pos_rule = f"""
pass_marker.interacting(text_marker.texts("{r}")).interacting({rule_layer_name}).output("{r}{RULE_STR_SEP}false_positive", "{r}{RULE_STR_SEP}false_positive occurred")
"""
false_neg_rule = f"""
((fail_marker2.interacting(fail_marker.interacting(text_marker.texts("{r}")))).or((fail_marker.interacting(input(11, 222).texts("{r}")).not_interacting(fail_marker2)))).not_interacting({rule_layer_name}).output("{r}{RULE_STR_SEP}false_negative", "{r}{RULE_STR_SEP}false_negative occurred")
"""
rule_not_tested = f"""
full_chip.not_interacting({rule_layer_name}).output("{r}{RULE_STR_SEP}not_tested", "{r}{RULE_STR_SEP}not_tested occurred")
"""
analysis_rules.append(rule_layer)
analysis_rules.append(pass_patterns_rule)
analysis_rules.append(fail_patterns_rule)
analysis_rules.append(false_pos_rule)
analysis_rules.append(false_neg_rule)
analysis_rules.append(rule_not_tested)
rule_lay_dt = len(rule_data_type_map) + 1
for r in rules_tested:
if r in rule_data_type_map:
continue
rule_layer_name = f'rule_{r.replace(".", "_")}'
rule_layer = f"{rule_layer_name} = input({RULE_LAY_NUM}, {rule_lay_dt})"
pass_patterns_rule = f"""
pass_marker.interacting( text_marker.texts("{r}") ).output("{r}{RULE_STR_SEP}pass_patterns", "{r}{RULE_STR_SEP}pass_patterns polygons")
"""
fail_patterns_rule = f"""
fail_marker2.interacting(fail_marker.interacting(text_marker.texts("{r}")) ).or( fail_marker.interacting(text_marker.texts("{r}")).not_interacting(fail_marker2) ).output("{r}{RULE_STR_SEP}fail_patterns", "{r}{RULE_STR_SEP}fail_patterns polygons")
"""
false_pos_rule = f"""
pass_marker.interacting(text_marker.texts("{r}")).interacting({rule_layer_name}).output("{r}{RULE_STR_SEP}false_positive", "{r}{RULE_STR_SEP}false_positive occurred")
"""
false_neg_rule = f"""
((fail_marker2.interacting(fail_marker.interacting(text_marker.texts("{r}")))).or((fail_marker.interacting(input(11, 222).texts("{r}")).not_interacting(fail_marker2)))).not_interacting({rule_layer_name}).output("{r}{RULE_STR_SEP}false_negative", "{r}{RULE_STR_SEP}false_negative occurred")
"""
rule_not_tested = f"""
full_chip.not_interacting({rule_layer_name}).output("{r}{RULE_STR_SEP}not_tested", "{r}{RULE_STR_SEP}not_tested occurred")
"""
analysis_rules.append(rule_layer)
analysis_rules.append(pass_patterns_rule)
analysis_rules.append(fail_patterns_rule)
analysis_rules.append(false_pos_rule)
analysis_rules.append(false_neg_rule)
analysis_rules.append(rule_not_tested)
rule_lay_dt += 1
with open(output_runset_path, "w") as runset_analysis:
runset_analysis.write("".join(analysis_rules))
return output_gds_path, output_runset_path
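# Note on convert_results_db_to_gds(): violation markers are drawn on layer
# RULE_LAY_NUM (10000) with a distinct datatype per rule (1, 2, 3, ...), and the
# generated *_analysis.drc runset re-reads those layers to classify every rule into
# the pass_patterns / fail_patterns / false_positive / false_negative / not_tested
# buckets used by the regression report.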
def build_tests_dataframe(unit_test_cases_dir, target_table):
"""
    This function is used for getting all test cases available in a formatted dataframe before running.
Parameters
----------
unit_test_cases_dir : str
        Path string to the directory that contains the unit test cases.
target_table : str or None
Name of table that we want to run regression for. If None, run all found.
Returns
-------
pd.DataFrame
        A DataFrame that has all the targeted test cases that we need to run.
"""
all_unit_test_cases = sorted(
Path(unit_test_cases_dir).rglob("*.{}".format(SUPPORTED_TC_EXT))
)
logging.info(
"## Total number of test cases found: {}".format(len(all_unit_test_cases))
)
# Get test cases df from test cases
tc_df = pd.DataFrame({"test_path": all_unit_test_cases})
tc_df["testcase_basename"] = tc_df["test_path"].apply(lambda x: x.name.replace(".gds", ""))
tc_df["table_name"] = tc_df["testcase_basename"].apply(lambda x: x.split("-")[0])
tc_df["test_criteria"] = tc_df["test_path"].apply(lambda x: x.parent.name)
if target_table is not None:
tc_df = tc_df[tc_df["table_name"] == target_table]
if len(tc_df) < 1:
logging.error("No test cases remaining after filtering.")
exit(1)
tc_df["run_id"] = range(len(tc_df))
return tc_df
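# Illustrative testcase naming assumed by build_tests_dataframe() (file name below is
# hypothetical): a file "MT-sample.gds" inside a "pass" directory yields
# testcase_basename="MT-sample", table_name="MT" (text before the first "-") and
# test_criteria="pass" (the parent directory name).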
def aggregate_results(
tc_df: pd.DataFrame, results_df: pd.DataFrame, rules_df: pd.DataFrame
):
"""
    aggregate_results aggregates the results of all runs.
Parameters
----------
tc_df : pd.DataFrame
Dataframe that holds the information about the test cases.
results_df : pd.DataFrame
        Dataframe that holds the results of the unit test runs.
rules_df : pd.DataFrame
Dataframe that holds the information about all the rules implemented in the rule deck.
Returns
-------
pd.DataFrame
A DataFrame that has all data analysis aggregated into one.
"""
if len(rules_df) < 1 and len(results_df) < 1:
logging.error("## There are no rules for analysis or run.")
exit(1)
    elif len(rules_df) < 1 and len(results_df) > 0:
        df = results_df
        # No rule-deck info was parsed, so mark all rules as not implemented.
        df["in_rule_deck"] = 0
elif len(rules_df) > 0 and len(results_df) < 1:
df = rules_df
for c in ANALYSIS_RULES:
df[c] = 0
else:
df = results_df.merge(rules_df, how="outer", on=["table_name", "rule_name"])
df[ANALYSIS_RULES] = df[ANALYSIS_RULES].fillna(0)
df["in_rule_deck"] = df["in_rule_deck"].fillna(0)
df = df.merge(tc_df[["table_name", "run_status"]], how="left", on="table_name")
df["rule_status"] = "Unknown"
df.loc[(df["false_negative"] > 0), "rule_status"] = "Rule Failed"
df.loc[(df["false_positive"] > 0), "rule_status"] = "Rule Failed"
df.loc[
(df["not_tested"] > 0) | (df["pass_patterns"] < 1), "rule_status"
] = "Rule Not Tested"
df.loc[
(df["fail_patterns"] < 1) | (df["pass_patterns"] < 1), "rule_status"
] = "Rule Not Tested"
df.loc[(df["in_rule_deck"] < 1), "rule_status"] = "Rule Not Implemented"
df.loc[
~(df["run_status"].isin(["completed"])), "rule_status"
] = "Test Case Run Failed"
df.loc[
(df["not_tested"] > 0) | (df["pass_patterns"] < 1), "rule_status"
] = "Rule Not Tested"
rule_exp_cond = (
(df["fail_patterns"] > 0) & (df["false_negative"] > 0) & (df["not_tested"] > 0)
)
df.loc[rule_exp_cond, "rule_status"] = "Rule Syntax Exception"
pass_cond = (
(df["pass_patterns"] > 0)
& (df["fail_patterns"] > 0)
& (df["false_negative"] < 1)
& (df["false_positive"] < 1)
& (df["in_rule_deck"] > 0)
)
df.loc[pass_cond, "rule_status"] = "Passed"
return df
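# Note on aggregate_results(): status assignments are applied in order, so later
# conditions override earlier ones. Roughly: rules with false positives/negatives are
# "Rule Failed", untested or pattern-less rules are "Rule Not Tested", rules missing
# from the deck are "Rule Not Implemented", failed runs are "Test Case Run Failed",
# and only rules with pass and fail patterns, no mismatches, and a rule-deck entry
# end up as "Passed".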
def run_regression(drc_dir, output_path, target_table, cpu_count):
"""
Running Regression Procedure.
This function runs the full regression on all test cases.
Parameters
----------
drc_dir : string
Path string to the DRC directory where all the DRC files are located.
output_path : str
Path string to the location of the output results of the run.
target_table : string or None
Name of table that we want to run regression for. If None, run all found.
cpu_count : int
Number of cpus to use in running testcases.
Returns
-------
bool
If all regression passed, it returns true. If any of the rules failed it returns false.
"""
## Parse Existing Rules
rules_df = parse_existing_rules(drc_dir, output_path, target_table)
logging.info(
"## Total number of rules found in rule decks: {}".format(len(rules_df))
)
logging.info("## Parsed Rules: \n" + str(rules_df))
## Get all test cases available in the repo.
test_cases_path = os.path.join(drc_dir, "testing/testcases")
unit_test_cases_path = os.path.join(test_cases_path, "unit")
tc_df = build_tests_dataframe(unit_test_cases_path, target_table)
logging.info("## Total table gds files found: {}".format(len(tc_df)))
logging.info("## Found testcases: \n" + str(tc_df))
## Run all test cases.
results_df, tc_df = run_all_test_cases(tc_df, drc_dir, output_path, cpu_count)
logging.info("## Testcases found results: \n" + str(results_df))
logging.info("## Updated testcases: \n" + str(tc_df))
## Aggregate all dataframes into one
df = aggregate_results(tc_df, results_df, rules_df)
df.drop_duplicates(inplace=True)
logging.info("## Final analysis table: \n" + str(df))
## Generate error if there are any missing info or fails.
df.to_csv(os.path.join(output_path, "all_test_cases_results.csv"), index=False)
## Check if there any rules that generated false positive or false negative
failing_results = df[~df["rule_status"].isin(["Passed"])]
logging.info("## Failing test cases: \n" + str(failing_results))
if len(failing_results) > 0:
logging.error("## Some test cases failed .....")
return False
else:
logging.info("## All testcases passed.")
return True
def main(drc_dir: str, output_path: str, target_table: str):
"""
Main Procedure.
This function is the main execution procedure
Parameters
----------
drc_dir : str
Path string to the DRC directory where all the DRC files are located.
output_path : str
Path string to the location of the output results of the run.
target_table : str or None
Name of table that we want to run regression for. If None, run all found.
Returns
-------
bool
If all regression passed, it returns true. If any of the rules failed it returns false.
"""
# No. of threads
cpu_count = os.cpu_count() if args["--mp"] is None else int(args["--mp"])
# Pandas printing setup
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("max_colwidth", None)
pd.set_option("display.width", 1000)
# info logs for args
logging.info("## Run folder is: {}".format(run_name))
logging.info("## Target Table is: {}".format(target_table))
# Start of execution time
t0 = time.time()
## Check Klayout version
check_klayout_version()
# Calling regression function
run_status = run_regression(drc_dir, output_path, target_table, cpu_count)
# End of execution time
logging.info("Total execution time {}s".format(time.time() - t0))
if run_status:
logging.info("Test completed successfully.")
return True
else:
logging.error("Test failed.")
return False
# ================================================================
# -------------------------- MAIN --------------------------------
# ================================================================
if __name__ == "__main__":
# arguments
args = docopt(__doc__, version="DRC Regression: 1.0")
# Pandas printing setup
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("max_colwidth", None)
pd.set_option("display.width", 1000)
variant_name = args["--variant"]
run_name = args["--run_name"]
target_table = args["--table_name"]
if variant_name not in ["BCDLite", "IC", "ULL"]:
logging.error("Variant name must be in one of those: BCDLite, IC, ULL")
exit(1)
if run_name is None:
run_name = datetime.utcnow().strftime(
f"unit_tests_{variant_name}_%Y_%m_%d_%H_%M_%S"
)
# Paths of regression dirs
main_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
drc_dir = os.path.join(main_dir, variant_name, "klayout", "drc")
output_path = os.path.join(main_dir, run_name)
# Creating output dir
os.makedirs(output_path, exist_ok=True)
# logs format
logging.basicConfig(
level=logging.DEBUG,
handlers=[
logging.FileHandler(os.path.join(output_path, f"{run_name}.log")),
logging.StreamHandler(),
],
format="%(asctime)s | %(levelname)-7s | %(message)s",
datefmt="%d-%b-%Y %H:%M:%S",
)
logging.info(f"## Run folder is: {run_name}")
logging.info(f"## Target Table is: {target_table}")
logging.info(f"## Output directory: {output_path}")
## Check Klayout version
check_klayout_version()
# Start of execution time
t0 = time.time()
# Calling main function
run_status = main(drc_dir, output_path, target_table)
# End of execution time
logging.info(f"Total execution time {time.time() - t0}s")
if run_status:
logging.info("Unit test cases passed regression")
else:
logging.error("Unit test cases failed regression")
exit(1)