# Source code for coffe.utils

import sys
import os
import shutil
import time 
import datetime

import re
import yaml

# Constants used for formatting the subcircuit area/delay/power table.
# These denote the widths of various columns - first (FIRS), last (LAST) and the rest (MIDL).
# We could use better libraries for pretty printing tables, but currently we use a simple method.
FIRS_COL_WIDTH = 30  # width of the first column
MIDL_COL_WIDTH = 22  # width of every column between the first and the last
LAST_COL_WIDTH = 22  # width of the last column

# NOTE(review): presumably the column width used when printing VPR delay entries - confirm against callers
VPR_DEL_COL_WIDTH = 75

#  ___    _   ___     ___ ___ _  _   ___  _   ___  ___ ___ _  _  ___   _   _ _____ ___ _    ___ 
# | _ \  /_\ |   \   / __| __| \| | | _ \/_\ | _ \/ __|_ _| \| |/ __| | | | |_   _|_ _| |  / __|
# |   / / _ \| |) | | (_ | _|| .` | |  _/ _ \|   /\__ \| || .` | (_ | | |_| | | |  | || |__\__ \
# |_|_\/_/ \_\___/   \___|___|_|\_| |_|/_/ \_\_|_\|___/___|_|\_|\___|  \___/  |_| |___|____|___/
#                                                                                     
#### Parsing Utilities, repeats from RAD-Gen TODO see if they can be removed ####

def check_for_valid_path(path):
    """
    Confirm that `path` exists on disk.

    The path is converted to an absolute path before the existence check.

    Returns:
        True when the path exists.

    Raises:
        FileNotFoundError: when the path does not exist.
    """
    if not os.path.exists(os.path.abspath(path)):
        raise FileNotFoundError(f"ERROR: {path} does not exist")
    return True
def handle_error(fn, expected_vals: set=None):
    """
    Run a check callable and terminate the program when the check fails.

    `fn` is invoked exactly once. The process exits with status 1 when the
    result is falsy, or when `expected_vals` is given and the result is not
    a member of it.

    Args:
        fn: zero-argument callable performing the check.
        expected_vals: optional container of acceptable results. Anything
            that supports the `in` operator works (a dict is checked by key).

    Raises:
        SystemExit: with code 1 when the check fails.
    """
    # Evaluate the check a single time so a callable with side effects (or an
    # expensive one) is not run twice for the two conditions below.
    result = fn()
    if not result or (expected_vals is not None and result not in expected_vals):
        sys.exit(1)
def sanitize_config(config_dict) -> dict:
    """
    Expand path-like values of a parsed yaml config to absolute paths.

    Only entries whose key contains "path" or "sram_parameters" are touched.
    A string value is expanded directly, a list value is expanded element by
    element, and any other value type is left as it is.

    The dictionary is modified in place and also returned.
    """
    def _to_absolute(raw):
        return os.path.realpath(os.path.expanduser(raw))

    # Iterate over a snapshot so entries can be reassigned during the loop.
    for key, entry in list(config_dict.items()):
        if "path" not in key and "sram_parameters" not in key:
            continue
        if isinstance(entry, list):
            config_dict[key] = [_to_absolute(item) for item in entry]
        elif isinstance(entry, str):
            config_dict[key] = _to_absolute(entry)
    return config_dict
def parse_yml_config(yaml_file: str) -> dict:
    """
    Load a yaml config from a possibly unsafe path and return it sanitized.

    The path is user-expanded and resolved first, its existence is verified
    through handle_error()/check_for_valid_path(), and the parsed contents
    are passed through sanitize_config() before being returned.
    """
    resolved_yaml = os.path.realpath(os.path.expanduser(yaml_file))

    def _yaml_path_exists():
        return check_for_valid_path(resolved_yaml)

    handle_error(_yaml_path_exists, {True : None})

    with open(resolved_yaml, 'r') as yaml_fd:
        loaded = yaml.safe_load(yaml_fd)
    return sanitize_config(loaded)
#### end of duplicates
def compare_tfall_trise(tfall, trise):
    """
    Pick the delay out of a fall time and a rise time.

    Returns the larger of the two values when it is positive.
    Returns -1.0 when either value is exactly 0.0 (the value could not be
    found in the output, e.g. because SPICE failed), when the larger value
    is not positive, or when the values cannot be ordered.
    """
    invalid = -1.0

    # A zero means one of the measurements is missing from the output.
    if tfall == 0.0 or trise == 0.0:
        return invalid

    if tfall > trise:
        largest = tfall
    elif trise >= tfall:
        largest = trise
    else:
        # Neither comparison holds, nothing sensible to report.
        return invalid

    # A non-positive "largest" value means both measurements are negative.
    return largest if largest > 0.0 else invalid
def sanatize_str_input_to_list(value):
    """
    Turn a space separated string of values into a list of strings.

    Double quotes surrounding the whole string are stripped first, then the
    remainder is split on single space characters.
    """
    unquoted = value.strip('"')
    return unquoted.split(" ")
def check_hard_params(hard_params, run_options):
    """
    Check that the hardblock/process parameters have all been read in.

    Right now this only checks basic things: a parameter is treated as unset
    when its value equals "", -1, -1.0 or []. An unset parameter that is not
    optional for the selected run options prints a message and exits with
    status 1, and so does a "pnr_tool" other than "encounter" or "innovus".

    Args:
        hard_params: dict of hardblock/process parameters.
        run_options: object exposing the parallel_hb_flow, parse_pll_hb_flow
            and gen_hb_scripts flags.
    """
    # Parameters that may stay unset regardless of the run options.
    optional_params = ["process_params_file", "mode_signal", "condensed_results_folder"]

    # The comparison is kept as `== False` on purpose, only a value equal to
    # False relaxes the partition related parameters.
    if hard_params["partition_flag"] == False:
        # The ungrouping regex is only required to partition the design.
        optional_params.extend(["ptn_settings_file", "ungroup_regex"])

    any_parallel_stage = (
        run_options.parallel_hb_flow
        or run_options.parse_pll_hb_flow
        or run_options.gen_hb_scripts
    )
    if not any_parallel_stage:
        optional_params.append("parallel_hardblock_folder")

    if not run_options.parallel_hb_flow:
        optional_params.extend(["mp_num_cores", "run_settings_file"])

    # if(not run_options.parse_pll_hb_flow):
    #     optional_params.append("coffe_repo_path")
    # TODO make this sort of a documentation for each parameter

    unset_markers = ("", -1, -1.0, [])
    for key, val in hard_params.items():
        looks_unset = any(val == marker for marker in unset_markers)
        # An unset value is only acceptable for parameters that are optional
        # for this run type.
        if looks_unset and key not in optional_params:
            print(("param \"%s\" is unset, please go to your hardblock/process params file and set it" % (key)))
            sys.exit(1)
        elif key == "pnr_tool" and val not in ("encounter", "innovus"):
            print("ERROR: pnr_tool must be set as either \"encounter\" or \"innovus\" ")
            sys.exit(1)

    if hard_params["pnr_tool"] == "innovus" and hard_params["process_size"] == "":
        print("param process_size is unset, please go to your hardblock/process params file and set it")
        sys.exit(1)
def load_run_params(filename):
    """
    Parse a run settings file into a nested dictionary.

    The file is organised in "<stage> begin" ... "end" blocks where <stage>
    is one of "synth", "pnr" or "sta". A stage block may contain a nested
    "param_filters begin" ... "end" block. Lines starting with "#" and empty
    lines are ignored, every other line is parsed with
    parse_ptn_param_line().

    Args:
        filename: path of the run settings file.

    Returns:
        dict mapping each stage name to a dict of its parameters. Values
        inside a "param_filters" block are always stored as lists.
    """
    run_flow_stages = ["synth", "pnr", "sta"]
    per_flow_blocks = ["param_filters"]
    param_block_begin_str = "begin"
    begin_res = [
        re.compile(".*" + flow_stage + r"\s+" + param_block_begin_str + ".*")
        for flow_stage in run_flow_stages
    ]
    sub_begin_res = [
        re.compile(".*" + per_flow_block + r"\s+" + param_block_begin_str + ".*")
        for per_flow_block in per_flow_blocks
    ]
    # NOTE(review): this matches any line containing "end", including a
    # parameter line that merely contains that substring - confirm intended.
    end_re = re.compile(".*end.*")

    # Read the whole file up front; the context manager guarantees the handle
    # is released even if parsing below raises.
    with open(filename, "r") as fd:
        run_params_text = fd.read()

    # List of all lines without comments and empty lines.
    run_params_clean = [
        line for line in run_params_text.split("\n")
        if not (line.startswith("#") or line == "")
    ]

    run_params_dict = {}
    flow_str = ""
    per_flow_block_str = ""
    param_nest_lvl = 0
    for line in run_params_clean:
        skip_parse = 0

        # Start of a flow stage block.
        for idx, begin_re in enumerate(begin_res):
            if begin_re.search(line):
                flow_str = run_flow_stages[idx]
                flow_tmp_dict = {flow_str: {}}
                param_nest_lvl += 1
                skip_parse = 1
                break

        # Start of a nested block inside the current flow stage.
        for idx, sub_begin_re in enumerate(sub_begin_res):
            if sub_begin_re.search(line):
                per_flow_block_str = per_flow_blocks[idx]
                flow_tmp_dict[flow_str][per_flow_block_str] = {}
                param_nest_lvl += 1
                skip_parse = 1
                break

        # End of whichever block is currently innermost.
        if end_re.search(line):
            param_nest_lvl -= 1
            if param_nest_lvl == 0:
                # The stage block is complete, commit it to the result.
                run_params_dict[flow_str] = flow_tmp_dict[flow_str]
                flow_str = ""
            elif param_nest_lvl == 1:
                per_flow_block_str = ""
            skip_parse = 1

        if skip_parse:
            continue

        parsed_line = parse_ptn_param_line(line)
        if flow_str != "" and per_flow_block_str != "":
            # All elements of a nested block are stored as lists.
            if not isinstance(parsed_line[1], list):
                flow_tmp_dict[flow_str][per_flow_block_str][parsed_line[0]] = [parsed_line[1]]
            else:
                flow_tmp_dict[flow_str][per_flow_block_str][parsed_line[0]] = parsed_line[1]
        elif flow_str != "":
            flow_tmp_dict[flow_str][parsed_line[0]] = parsed_line[1]

    return run_params_dict
def parse_ptn_param_line(line):
    """
    Parse one "key : value" style line of a partition/run settings file.

    The line is split on ":" and, for every resulting field, only the
    characters enclosed in double quotes or in square brackets are kept.
    Double quotes are then removed from the kept text. A field containing a
    comma is additionally stripped of brackets and whitespace and split on
    "," into a list.

    Returns:
        list with one entry per ":" separated field, each entry being either
        a string or a list of strings.
    """
    # This is a poor parser, it just needs to work.
    # Opening delimiter -> closing delimiter that legally bound a parameter.
    closing_for = {"\"": "\"", "[": "]"}

    extracted_fields = []
    for field in line.split(":"):
        kept_chars = []
        closer = None  # closing delimiter of the region being read, if any
        for ch in field:
            if closer is None:
                # Outside a bounded region only an opening delimiter matters.
                closer = closing_for.get(ch)
            elif ch == closer:
                closer = None
            else:
                kept_chars.append(ch)
        extracted_fields.append("".join(kept_chars))

    # All characters matching the regexes below are removed from a field.
    list_clean_re = re.compile(r"\[|\]|\s")
    gen_clean_re = re.compile("\"")

    result = []
    for raw_field in extracted_fields:
        cleaned = gen_clean_re.sub("", raw_field)
        # A comma in the field means it holds a list of values.
        if "," in raw_field:
            cleaned = list_clean_re.sub("", cleaned).split(",")
        result.append(cleaned)
    return result
def load_ptn_params(filename):
    """
    Parse the user defined partition settings file.

    The file contains "ptn begin" ... "end" blocks, one per partition, and a
    "top_settings begin" ... "end" block with settings that apply to all
    partitions or to the design floorplan. Lines starting with "#" and empty
    lines are ignored.

    Args:
        filename: path of the partition settings file ("~" is expanded).

    Returns:
        dict holding the top level settings plus "ptn_list", a list with one
        dict per partition. A partition dict is emitted every time as many
        parameters as there are allowed partition keys have been read.

    Exits with status 1 when a partition block contains a parameter other
    than "inst_name", "mod_name" or "fp_coords".
    """
    ptn_params = {
        "ptn_list": [],  # list of all partitions in design
        "scaling_array": [],  # list of floorplan scales to be swept across
        "fp_init_dims": [],  # NOTE(review): original comment is truncated ("two element list of") - confirm contents
        "fp_pin_spacing": "",
    }
    # Template listing the keys a partition block may define.
    ptn_dict = {
        "inst_name": "",
        "mod_name": "",
        "fp_coords": [],
    }
    top_settings_re = re.compile(r".*top_settings\s+begin.*")
    ptn_begin_re = re.compile(r".*ptn\s+begin.*")
    # NOTE(review): matches any line containing "end" - confirm intended.
    ptn_end_re = re.compile(r".*end.*")

    # The context manager releases the handle even when an invalid parameter
    # terminates the program further down.
    with open(os.path.expanduser(filename), "r") as fd:
        ptn_params_text = fd.read()

    ptn_params_clean = [
        line for line in ptn_params_text.split("\n")
        if not (line.startswith("#") or line == "")
    ]

    read_ptn_flag = 0
    read_ptn_top_setting_flag = 0
    parsed_lines = []
    for line in ptn_params_clean:
        # Raise/lower the flags when a block boundary is found.
        if ptn_begin_re.search(line):
            read_ptn_flag = 1
            continue
        elif ptn_end_re.search(line):
            read_ptn_flag = 0
            read_ptn_top_setting_flag = 0
            continue
        elif top_settings_re.search(line):
            read_ptn_top_setting_flag = 1
            continue

        if read_ptn_flag or read_ptn_top_setting_flag:
            parsed_line = parse_ptn_param_line(line)
            if read_ptn_flag:
                parsed_lines.append(parsed_line)
            # Top level settings (ie those applied to all ptns or the design fp).
            elif read_ptn_top_setting_flag:
                if len(parsed_line) > 1:
                    ptn_params[parsed_line[0]] = parsed_line[1]

    # Group the flat list of partition parameters into one dict per partition.
    key_cnt = 0
    tmp_dict = {}
    ptn_list = []
    for parsed in parsed_lines:
        # First element is always the dict key.
        key = parsed[0]
        val = parsed[1]
        # Don't continue if the key is not a known partition parameter.
        if key not in ptn_dict:
            print(("ERROR: Found invalid partition parameter (" + key + ") in " + filename))
            sys.exit(1)
        tmp_dict[key] = val
        key_cnt += 1
        # If anything ever needs to happen after all params of a partition
        # have been read into a dict, do it in the if statement below.
        if key_cnt % len(ptn_dict) == 0:
            ptn_list.append(tmp_dict)
            tmp_dict = {}

    ptn_params["ptn_list"] = ptn_list
    return ptn_params
def check_arch_params(arch_params, filename):
    """
    Check that the architecture parameters are compatible with COFFE.

    Right now this only checks really basic things, like negative values
    where there shouldn't be any, or whether the LUT size is supported. In
    the future the checker could be made more intelligent, for example by
    rejecting Fc values that are so small that no wire can be connected.

    Each violation is reported by calling print_error() with the offending
    value, the parameter name and `filename`; incompatible feature
    combinations are reported through print_error_not_compatable().

    Args:
        arch_params: dict of architecture parameters.
        filename: name of the file the parameters were read from, only used
            in the error reports.
    """
    # TODO: Make these error messages more descriptive of what the problem is.

    # Parameters that must be strictly positive.
    for name in ("W", "L", "Fs", "N"):
        if arch_params[name] <= 0:
            print_error(str(arch_params[name]), name, filename)

    # We only support 4-LUT, 5-LUT or 6-LUT
    if arch_params['K'] < 4 or arch_params['K'] > 6:
        print_error(str(arch_params['K']), "K", filename)
    if arch_params['I'] <= 0:
        print_error(str(arch_params['I']), "I", filename)

    # Connection block flexibilities must lie in (0, 1].
    for name in ("Fcin", "Fcout"):
        if arch_params[name] <= 0.0 or arch_params[name] > 1.0:
            print_error(str(arch_params[name]), name, filename)

    if arch_params['Or'] <= 0:
        print_error(str(arch_params['Or']), "Or", filename)

    # We currently only support architectures that have local feedback routing.
    # It might be a good idea to change COFFE such that you can specify an
    # architecture with no local feedback routing.
    if arch_params['Ofb'] <= 0:
        print_error(str(arch_params['Ofb']), "Ofb", filename)
    if arch_params['Fclocal'] <= 0.0 or arch_params['Fclocal'] > 1.0:
        print_error(str(arch_params['Fclocal']), "Fclocal", filename)

    # Rsel can be 'a' up to the last LUT input. For example for a 6-LUT Rsel
    # can be 'a' to 'f', or 'z' which means no Rsel.
    # chr(K + 96) is the letter of the last LUT input ('a' is chr(97)).
    if arch_params['Rsel'] != 'z' and (arch_params['Rsel'] < 'a' or arch_params['Rsel'] > chr(arch_params['K']+96)):
        # Report the parameter that actually failed (was reported as "Fclocal").
        print_error(arch_params['Rsel'], "Rsel", filename)

    # Rfb can be 'z' which means no Rfb. If not 'z', Rfb is a string of the
    # letters of all the LUT inputs that are Rfb.
    if arch_params['Rfb'] == 'z':
        pass
    elif len(arch_params['Rfb']) > arch_params['K']:
        print_error(arch_params['Rfb'], "Rfb", filename, "(you specified more Rfb LUT inputs than there are LUT inputs)")
    else:
        # Now, let's make sure all these characters are valid characters
        for character in arch_params['Rfb']:
            # The character has to be a valid LUT input
            if (character < 'a' or character > chr(arch_params['K']+96)):
                print_error(arch_params['Rfb'], "Rfb", filename, " (" + character + " is not a valid LUT input)")
            # The character should not appear twice
            elif arch_params['Rfb'].count(character) > 1:
                print_error(arch_params['Rfb'], "Rfb", filename, " (" + character + " appears more than once)")

    # only one or two FAs are allowed per ble
    if arch_params['FAs_per_flut'] > 2:
        print_error(str(arch_params['FAs_per_flut']), "FAs_per_flut", filename, "(number of FA per ble should be 2 or less)")

    # Currently, the circuit is only generated assuming a fracturable lut.
    # It can easily be changed to support nonfracturable luts as well.
    if arch_params['enable_carry_chain'] == 1 and arch_params['use_fluts'] == False:
        print_error_not_compatable("carry chains", "non fracturable lut")

    # Check process technology parameters
    if arch_params['transistor_type'] != 'bulk' and arch_params['transistor_type'] != 'finfet':
        print_error(arch_params['transistor_type'], "transistor_type", filename)
    if arch_params['switch_type'] != 'pass_transistor' and arch_params['switch_type'] != 'transmission_gate':
        print_error(arch_params['switch_type'], "switch_type", filename)

    # Process parameters that must be strictly positive.
    for name in (
        "vdd",
        "gate_length",
        "rest_length_factor",
        "min_tran_width",
        "min_width_tran_area",
        "sram_cell_area",
        "trans_diffusion_length",
    ):
        if arch_params[name] <= 0:
            print_error(str(arch_params[name]), name, filename)

    if arch_params['enable_bram_module'] == 1 and arch_params['use_finfet'] == True:
        print_error_not_compatable("finfet", "BRAM")

    # if arch_params['use_finfet'] == True and arch_params['use_fluts'] == True:
    #     print_error_not_compatable("finfet", "flut")
    # if arch_params['coffe_repo_path'].split("/")[-1] != "COFFE" or os.path.isdir(arch_params['coffe_repo_path']):
    #     print_error (arch_params['coffe_repo_path'],"coffe_repo_path",filename)
def extract_initial_tran_size(filename, use_tgate):
    """
    Parse the initial sizes file and load the values into a dictionary.

    Every non-comment, non-empty line is expected to look like
    "<transistor name> = <size>". Line feeds, carriage returns, tabs and
    spaces are removed before the line is split at "=".

    Args:
        filename: path of the initial transistor sizes file.
        use_tgate: not used by this function.

    Returns:
        dict mapping transistor names to their size as a float.

    Raises:
        IndexError: when a line contains no "=".
        ValueError: when a size cannot be converted to a float.
    """
    # Deletes exactly the characters the parser treats as insignificant.
    strip_table = str.maketrans("", "", "\n\r\t ")

    transistor_sizes = {}
    # The context manager closes the file even when a malformed line raises.
    with open(filename, 'r') as sizes_file:
        for line in sizes_file:
            # Ignore comment lines
            if line.startswith('#'):
                continue
            # Remove line feeds and spaces
            line = line.translate(strip_table)
            # Ignore empty lines
            if line == "":
                continue
            # Split lines at '='
            words = line.split('=')
            trans = words[0]
            size = words[1]
            transistor_sizes[trans] = float(size)

    return transistor_sizes
def use_initial_tran_size(initial_sizes, fpga_inst, tran_sizing, use_tgate):
    """
    Apply the transistor sizes stored in a file to an FPGA instance.

    The sizes are read with extract_initial_tran_size(), handed to
    tran_sizing.override_transistor_sizes() and copied into
    fpga_inst.transistor_sizes. Afterwards the area, the wire lengths and
    the wire resistance/capacitance of the FPGA instance are updated, in
    that order. Progress messages are printed along the way.
    """
    print("Extracting initial transistor sizes from: " + initial_sizes)
    loaded_sizes = extract_initial_tran_size(initial_sizes, use_tgate)

    print("Setting transistor sizes to extracted values")
    tran_sizing.override_transistor_sizes(fpga_inst, loaded_sizes)
    for tran_name in loaded_sizes:
        fpga_inst.transistor_sizes[tran_name] = loaded_sizes[tran_name]

    # Each message is printed right before the matching update is run.
    refresh_steps = (
        ("Re-calculating area...", fpga_inst.update_area),
        ("Re-calculating wire lengths...", fpga_inst.update_wires),
        ("Re-calculating resistance and capacitance...", fpga_inst.update_wire_rc),
    )
    for message, refresh in refresh_steps:
        print(message)
        refresh()
    print("")
def check_for_time():
    """
    Block while the local time is between 2:30 a.m. and 3:30 a.m.

    This function should be used before each call to HSPICE. During that
    window the license was found not to work on the original author's
    machine, so execution is paused instead of letting the program
    terminate. If you are using COFFE on a machine that doesn't have this
    problem you can comment out the calls to this function.

    While inside the window the function sleeps in 60 second steps. A banner
    is printed on entry and on exit whenever the current hour is 2 or 3.
    """
    separator = "-----------------------------------------------------------------"

    def _in_blackout(moment):
        # True from 2:30 (inclusive) up to 3:30 (exclusive).
        after_start = moment.hour == 2 and moment.minute >= 30
        before_end = moment.hour == 3 and moment.minute < 30
        return after_start or before_end

    def _banner(prefix, moment):
        print(separator)
        print(prefix + str(moment.hour) + ":" + str(moment.minute) + ":" + str(moment.second))
        print(separator)
        print("")

    now = datetime.datetime.now()
    if now.hour in (2, 3):
        _banner(" Entered the check for time function @ ", now)

    while _in_blackout(now):
        print("\tI'm sleeping")
        time.sleep(60)
        now = datetime.datetime.now()
        if not _in_blackout(now):
            print("\tExecution is resumed")

    now = datetime.datetime.now()
    if now.hour in (2, 3):
        _banner(" Exited the check for time function @ ", now)
def create_output_dir(arch_file_name, arch_out_folder):
    """
    Create the architecture folder and return its name.

    If `arch_out_folder` is specified (neither "" nor "None") it is used as
    the architecture folder after "~" expansion. Otherwise the folder is
    derived from the architecture file: it lives next to the file and is
    named after the file name up to its first dot.

    If the folder already exists, its sub-directories are deleted to avoid
    problems when COFFE is run several times on the same architecture file.
    Plain files directly inside the folder are kept.

    Args:
        arch_file_name: path of the architecture parameters file.
        arch_out_folder: explicit output folder, or "" / "None".

    Returns:
        The path of the architecture folder.
    """
    if arch_out_folder == "" or arch_out_folder == "None":
        # Only the file name is cut at its first dot. Cutting the whole path
        # would truncate at dots in directory names or in a leading "./" or
        # "../" and yield the wrong (possibly empty) folder.
        parent_dir, file_name = os.path.split(arch_file_name)
        # NOTE(review): a file name starting with "." gives an empty stem, so
        # the containing directory itself becomes the folder - confirm callers
        # never pass such a name.
        arch_folder = os.path.join(parent_dir, file_name.split('.')[0])
    else:
        arch_folder = os.path.expanduser(arch_out_folder)

    if not os.path.exists(arch_folder):
        os.makedirs(arch_folder)
    else:
        # Delete contents of sub-directories
        # COFFE generates several 'intermediate results' files during sizing
        # so we delete them to avoid from having them pile up if we run COFFE
        # more than once.
        for content in os.listdir(arch_folder):
            content_path = os.path.join(arch_folder, content)
            if os.path.isdir(content_path):
                shutil.rmtree(content_path)

    return arch_folder