# blob: dfbc6b32ba0ee19c6ff09fced49a30e1aaf3fa49 [file] [log] [blame]
from scipy.optimize import least_squares, differential_evolution
import multiprocessing
import pandas as pd
from matplotlib.mlab import angle_spectrum
import numpy as np
import inspect
import gc
import time
from multiprocessing import reduction
import math
import os
import hashlib
import pickle
class NgOptimizer():
    '''
    Implements a circuit optimizer.

    Parameters of a spice simulation are varied by a SciPy optimizer
    (``differential_evolution`` or ``least_squares``). The simulation output
    is converted to a cost using user supplied cost functions.
    '''

    def __init__(self, ngSpiceInstance, spiceReplace, cacheDir):
        '''
        :param ngSpiceInstance: Simulator wrapper. This class calls its
            ``setConstants``, ``getFileHash`` and ``run`` methods and sets
            its ``parallel`` attribute.
        :param spiceReplace: Passed unchanged as ``rename`` argument to
            ``ngSpiceInstance.run``.
        :param cacheDir: Base directory for cached simulation results.
            :py:meth:`optimize` creates one sub-directory per spice file hash.
        '''
        self.fixedParam = {}
        self.optimizeParam = {}
        # Initialised here so that optimize() can report a missing cost
        # definition clearly instead of failing with an AttributeError.
        self.cost = {}
        self.spiceReplace = spiceReplace
        self.method = 'diffevol'
        self.ngSpiceInstance = ngSpiceInstance
        if self.method == 'diffevol':
            # differential_evolution distributes the work itself (workers=-1).
            self.ngSpiceInstance.parallel = False
        # Keep the base directory separately: optimize() derives cacheDir from
        # it on every call instead of nesting one hash directory in another.
        self._cacheRoot = cacheDir
        self.cacheDir = cacheDir

    def setFixedParam(self, param, keepOld=False):
        '''
        Set the parameters which are handed to spice as constants.

        :param param: {name: value}
        :type param: dict
        :param keepOld: If True the new entries are merged into the existing
            ones. Otherwise the existing entries are discarded first.
        '''
        if not keepOld:
            self.fixedParam = {}
        self.fixedParam.update(param)

    def setOptimizerParam(self, optimizeParam, keepOld=False):
        '''
        Set the parameters to optimize.

        :param optimizeParam: {name: [initial, min, max]}. ``initial`` is
            only used as start value by the 'leastsq' method, 'diffevol'
            ignores it.
        :type optimizeParam: dict
        :param keepOld: If True the new entries are merged into the existing
            ones. Otherwise the existing entries are discarded first.
        '''
        if not keepOld:
            self.optimizeParam = {}
        self.optimizeParam.update(optimizeParam)

    def setCost(self, cost):
        '''
        Set the cost functions.

        :param cost: {result column name: source of a callable}. Each value
            is a string which is passed to ``eval`` and has to evaluate to a
            callable taking the simulation result and returning its cost.
        :type cost: dict
        '''
        self.cost = cost

    def optimize(self):
        '''
        Run the optimization using ``self.method``.

        :returns: {parameter name: optimized value} taken from the ``x``
            entry of the SciPy result.
        :raises ValueError: If ``self.method`` is unknown.
        :raises RuntimeError: If no cost function has been set.
        '''
        if self.method not in ('diffevol', 'leastsq'):
            raise ValueError(f'Unknown optimization method {self.method!r}')
        if not self.cost:
            raise RuntimeError('No cost functions defined. Call setCost() first.')

        self.ngSpiceInstance.setConstants(self.fixedParam, keepOld=False)
        self.spiceFileHash = self.ngSpiceInstance.getFileHash()
        # Always derive the directory from the base directory so repeated
        # calls of optimize() end up in the same place.
        self.cacheDir = os.path.join(self._cacheRoot, self.spiceFileHash)
        # exist_ok avoids the exists()/mkdir() race and missing parents.
        os.makedirs(self.cacheDir, exist_ok=True)

        # We need a defined sequence of parameters
        self.parSequence = list(self.optimizeParam)
        self.costSequence = list(self.cost)
        # TODO: Perform a test run with spice using the initial guess to verify we get results for all parameters needed for cost calculation.

        if self.method == 'diffevol':
            # Get boundaries for parameter space
            bounds = [(self.optimizeParam[k][1], self.optimizeParam[k][2]) for k in self.parSequence]
            self.firstRun = True
            # SciPy forces updating='deferred' when workers != 1 and warns if
            # it was not requested. Request it explicitly.
            bestResult = differential_evolution(
                self._deFunc, bounds, workers=-1, updating='deferred',
                mutation=(0.1, 1.5), seed=1)
            print(bestResult)
            resultDict = dict(zip(self.parSequence, bestResult['x']))
            print('Optimization result: ', resultDict)
            return resultDict

        bestResult = self._optimizeLeastSquares()
        return dict(zip(self.parSequence, bestResult['x']))

    def _optimizeLeastSquares(self):
        '''
        Optimize with ``least_squares``.

        ``leastsqFunc`` does not run spice. It records the requested
        parameters in ``simulationResult``. After each ``least_squares`` call
        the recorded parameters are simulated in one batch and their costs
        are filled in, then ``least_squares`` is run again with a larger
        evaluation budget.

        :returns: The result of the last ``least_squares`` call.
        '''
        # Generate a list out of the initial guess dictionary.
        initialGuess = [self.optimizeParam[k][0] for k in self.parSequence]
        # Residuals reported for parameters which have not been simulated yet.
        default = [9e10 for _ in self.costSequence]
        # NOTE(review): the [min, max] bounds of optimizeParam are not passed
        # to least_squares here - confirm this is intended.
        bestResult = None
        for _ in range(10):
            simulationResult = {}
            for iteration in range(20):
                maxfev = (iteration + 1) * multiprocessing.cpu_count() * 2
                print("maxfev: ", maxfev)
                bestResult = least_squares(
                    leastsqFunc, initialGuess, args=(simulationResult, default),
                    max_nfev=maxfev, method='dogbox')
                print('Iteration: ', iteration)
                print("Best result: ", bestResult['x'])
                print("Cost: ", bestResult['fun'])
                spiceResult = self._runSpiceMulti(simulationResult)
                simulationResult = self._calcCost(simulationResult, spiceResult)
            # NOTE(review): the nesting of the next two statements was
            # reconstructed (once per outer pass) - confirm against the
            # original file.
            initialGuess = bestResult['x']
            gc.collect()
        return bestResult

    def _deFunc(self, param):
        '''
        Objective function for ``differential_evolution``.

        :param param: Parameter values in the order of ``self.parSequence``.
        :type param: numpy.ndarray
        :returns: Sum of all costs for ``param``.
        :raises RuntimeError: If spice did not return exactly one result.
        '''
        # We use a cache to speed up multiple runs.
        # NOTE(review): the cache key only covers the parameter values (plus
        # the spice file hash through the directory). A cached cost is
        # returned even if self.cost changed in the meantime - verify this
        # is acceptable.
        paramHash = hashlib.sha1(param.tobytes()).hexdigest()
        cacheFile = os.path.join(self.cacheDir, paramHash)
        cached = self._loadCache(cacheFile)
        if cached is not None:
            result, cost = cached
        else:
            spiceResult = self._runSpiceSingle(param)
            result = self._spiceResultToDict(spiceResult, True)
            if len(result) != 1:
                raise RuntimeError(f'This is a bug. Expected 1 simulation result. Got {len(result)}')
            cost = self._calcCostSingle(result[0])
            self._storeCache(cacheFile, (result, cost))
        sumCost = np.sum(cost)
        print('\nResult: ', result[0], ' Cost: ', cost, ' Sum Cost: ', sumCost)
        return sumCost

    def _loadCache(self, cacheFile):
        '''
        Load a cache entry.

        :returns: The unpickled entry or None if the file does not exist or
            cannot be unpickled (e.g. it was truncated by an aborted run).
        '''
        # NOTE: pickle executes code on load. Only use cache directories
        # which are written by this program.
        try:
            with open(cacheFile, 'rb') as file:
                return pickle.load(file)
        except FileNotFoundError:
            return None
        except (EOFError, pickle.UnpicklingError):
            # Treat a damaged entry as a cache miss. It gets rewritten.
            return None

    def _storeCache(self, cacheFile, entry):
        '''
        Write a cache entry atomically.

        differential_evolution runs several worker processes. Writing to a
        process specific temporary file and renaming it makes sure no reader
        ever sees a partially written entry.
        '''
        tmpFile = f'{cacheFile}.{os.getpid()}.tmp'
        with open(tmpFile, 'wb') as file:
            pickle.dump(entry, file)
        os.replace(tmpFile, cacheFile)

    def _runSpice(self, rows):
        '''
        Run Ngspice for a list of parameter sets.

        :param rows: List of {parameter name: value} dictionaries.
        :returns: The return value of ``ngSpiceInstance.run``.
        '''
        # Transform the list of dicts to a dataframe
        df = pd.DataFrame.from_dict(rows)
        return self.ngSpiceInstance.run(df, delete=True, rename=self.spiceReplace)

    def _runSpiceSingle(self, param):
        '''
        Run the spice simulation for a single set of parameter values.

        :param param: Parameter values in the order of ``self.parSequence``.
        '''
        # Combine the parameter names from parSequence with the requested values.
        return self._runSpice([dict(zip(self.parSequence, param))])

    def _runSpiceMulti(self, simulationResult):
        '''
        Run the spice simulation for all entries without a result.

        :param simulationResult: {key: (parameter values, cost or None)} as
            filled by ``leastsqFunc``.
        '''
        # Only parameters for which we don't have a result are simulated.
        forSpice = [
            dict(zip(self.parSequence, simResult[0]))
            for simResult in simulationResult.values()
            if simResult[1] is None
        ]
        return self._runSpice(forSpice)

    def _spiceResultToDict(self, spiceResult, ignoreMissingResult):
        '''
        Converts the results of a series of spice simulations to dictionaries.

        :param spiceResult: Result of :py:meth:`NgSpice.run`. Each item is
            indexed with ``'results'`` to get a sequence of data frames.
        :param ignoreMissingResult: If True a missing result is replaced by
            NaN. If False a missing result raises.
        :returns: A list of dictionaries: The key in the dictionary is a key
            of self.cost. The value corresponds to the output of spice.
        :raises RuntimeError: If a result is missing (and not ignored) or if
            a result is not a single floating point value.
        '''
        # Our simulation result should match our cost functions.
        columnNames = list(self.cost)
        result = []
        for spiceR in spiceResult:
            resultDict = {}
            # Fill the dictionary. If several data frames contain the column
            # the last one wins.
            for column in columnNames:
                for res in spiceR['results']:
                    if column in res.columns:
                        # Only consider unique results.
                        resultDict[column] = res[column].unique()
            # Do sanity checks.
            for column in columnNames:
                # We want a result for each parameter for which a cost was defined.
                if column not in resultDict:
                    if not ignoreMissingResult:
                        raise RuntimeError(f'No spice simulation result for item {column}')
                    # np.nan instead of np.NaN: the alias was removed in NumPy 2.0.
                    resultDict[column] = np.nan
                # NOTE(review): the nesting of this statement was
                # reconstructed; firstRun is not read anywhere in this file.
                self.firstRun = False
                # Try to convert numpy arrays to float.
                value = resultDict[column]
                if isinstance(value, np.ndarray) and len(value) == 1:
                    value = value[0]
                    resultDict[column] = value
                # If we have a result it has to be unique, i.e. a scalar float.
                if not isinstance(value, (float, np.floating)):
                    raise RuntimeError(f"Got data type {type(value)}. I'm interpreting this as non-unique simulation result")
            # Add the dictionary to the result
            result.append(resultDict)
        return result

    def _calcCost(self, simulationResult, spiceResult):
        '''
        Fill ``simulationResult`` with the costs of the simulation.

        :param simulationResult: {key: (parameter values, cost or None)}.
            Entries with cost None are updated in place.
        :param spiceResult: Result of the spice run. Each item is indexed
            with ``'param'`` and ``'results'``.
        :returns: ``simulationResult``
        :raises RuntimeError: If no simulation result matches an entry.
        '''
        for par, simResult in simulationResult.items():
            # We only need to add a result if none is there.
            if simResult[1] is not None:
                continue
            # Reconstruct the spice input parameters
            forSpice = dict(zip(self.parSequence, simResult[0]))
            # Search the simulation result for the given set of input parameters.
            for s in spiceResult:
                if forSpice == s['param']:
                    # _spiceResultToDict works on a list of simulation
                    # results and returns a list: wrap and unwrap.
                    resultDict = self._spiceResultToDict([s], True)[0]
                    costResult = self._calcCostSingle(resultDict)
                    simulationResult[par] = (simResult[0], costResult)
            # Do some sanity checks. E.g. if we don't have a result, we should fail.
            if simulationResult[par][1] is None:
                raise RuntimeError('Failed to find simulation results for a given set of parameters. This is a Bug.')
        return simulationResult

    def _calcCostSingle(self, result):
        '''
        Apply the cost function to a single simulation result.

        :param result: {cost name: simulation result}
        :returns: List of costs in the order of ``self.costSequence``. If any
            inspected result is NaN every entry is 1E9.
        '''
        costResult = []
        for costName in self.costSequence:
            # Get the numeric result
            resultValue = result[costName]
            # Get the cost function.
            # NOTE: eval executes arbitrary code. The cost definitions must
            # come from a trusted source.
            costFunction = eval(self.cost[costName])
            # If we have a NaN return a very large cost
            if np.isnan(resultValue):
                return [1E9 for _ in self.costSequence]
            try:
                cost = costFunction(resultValue)
            except Exception:
                print("Cost calculation failed.")
                print(f"Spice simulation result: {resultValue}")
                print(f"Cost name: {costName}")
                # Print the definition string. inspect.getsource() cannot
                # find the source of an eval'ed callable and would hide the
                # original exception behind an OSError.
                print("Cost function: ", self.cost[costName])
                raise
            costResult.append(cost)
        return costResult
def differentialEvolutionFunc(a, b):
    '''
    Debugging stub: announces that it has been called.

    Both arguments are ignored and nothing is returned.
    '''
    message = "Function call"
    print(message)
def leastsqFunc(param, result, default):
    '''
    Function for least_squares. We don't run spice in here. Instead we collect input for spice and return known results.

    :param param: Parameter values requested by least_squares.
    :param result: {key: (param, cost or None)}. Unknown parameter sets are
        added with cost None so the caller can simulate them afterwards.
    :type result: dict
    :param default: Returned as long as no cost is known for ``param``.
    :returns: The stored cost for ``param`` or ``default``.
    '''
    # Use the exact values as key. str(param) is lossy: numpy prints only a
    # limited number of digits (and abbreviates long arrays), so parameter
    # sets differing by a small finite difference step would share one entry.
    key = tuple(float(p) for p in param)
    if key in result:
        res = result[key][1]
    else:
        result[key] = (param, None)
        res = default
    print(param, res)
    # A known entry may still wait for its simulation result.
    if res is None:
        res = default
    return res