blob: dfbc6b32ba0ee19c6ff09fced49a30e1aaf3fa49 [file] [log] [blame]
Simon Waid69f0da62022-03-10 17:05:48 +01001from scipy.optimize import least_squares, differential_evolution
2import multiprocessing
3import pandas as pd
4from matplotlib.mlab import angle_spectrum
5import numpy as np
6import inspect
7import gc
8import time
9from multiprocessing import reduction
10import math
11import os
12import hashlib
13import pickle
14
15class NgOptimizer():
16 '''
17 Implements a circuit optimizer
18
19 '''
20 def __init__(self, ngSpiceInstance, spiceReplace, cacheDir):
21 '''
22
23 :param ngSpiceInstance:
24 :param spiceReplace:
25
26 '''
27 self.fixedParam={}
28 self.optimizeParam={}
29 self.spiceReplace=spiceReplace
30 self.method='diffevol'
31 self.ngSpiceInstance=ngSpiceInstance
32 if self.method=='diffevol':
33 self.ngSpiceInstance.parallel=False
34 self.cacheDir=cacheDir
35
36
37 def setFixedParam(self, param, keepOld=False):
38 '''
39
40 '''
41 if not keepOld: self.fixedParam={}
42
43 for k, v in param.items():
44 self.fixedParam[k] = v
45
46 def setOptimizerParam(self, optimizeParam, keepOld=False):
47 '''
48
49 :param initialGuess: {value: [ignored, min, max] }
50 :tpye initialGuess: dict
51 '''
52 #key: value: list; [0]: ignored [1]: min [2]: max
53 if not keepOld: self.optimizeParam={}
54
55 for k, v in optimizeParam.items():
56 self.optimizeParam[k] = v
57
58
59
60 def setCost(self, cost):
61 '''
62 '''
63 self.cost=cost
64
65
66 def optimize(self):
67 '''
68
69 '''
70 self.ngSpiceInstance.setConstants(self.fixedParam, keepOld=False)
71 self.spiceFileHash=self.ngSpiceInstance.getFileHash()
72 self.cacheDir=os.path.join(self.cacheDir, self.spiceFileHash)
73
74 if not os.path.exists(self.cacheDir):
75 os.mkdir(self.cacheDir)
76
77 # We need a defined sequence of parameters
78 self.parSequence=list(self.optimizeParam.keys())
79 self.costSequence=list(self.cost.keys())
80 # Generate a list out of the initial guess dictionary.
81 initialGuess=[self.optimizeParam[k][0] for k in self.parSequence]
82 # Generate a dummy result
83
84 # TODO: Perform a test run with spice using the initial guess to verify we get results for all parameters needed for cost calculation.
85
86
87 # Get boudaries for parameter space
88 bounds=[(self.optimizeParam[k][1],self.optimizeParam[k][2]) for k in self.parSequence]
89 if self.method == 'diffevol':
90 self.firstRun=True
91 default=9e10
92# bestResult=differential_evolution(differentialEvolutionFunc, bounds, args=(default), maxiter=1)#, workers=-1)#, bounds, args=default, )
93
94 bestResult=differential_evolution(self._deFunc, bounds, workers=-1, mutation=(0.1,1.5), seed=1)#, )#, bounds, args=default, )
95 print(bestResult)
96 resultDict={k:v for k,v in zip(self.parSequence, bestResult['x'])}
97 print('Optimization result: ', resultDict)
98
99 elif self.method == 'leastsq':
100 default=[9e10 for k in self.cost.keys()]
101
102 # Iterate
103 for iter in range(10):
104 simulationResult={}
105 for iteration in range(20):
106 maxfev=(iteration+1)*multiprocessing.cpu_count()*2
107
108 print("maxfev: ", maxfev)
109 bestResult=least_squares(leastsqFunc, initialGuess, args=(simulationResult, default), max_nfev=maxfev, method='dogbox')
110 print('Iteration: ', iteration)
111 print("Best result: ", bestResult['x'])
112 print("Cost: ", bestResult['fun'])
113 spiceResult=self._runSpiceMulti(simulationResult)
114 simulationResult=self._calcCost(simulationResult, spiceResult)
115 #print(simulationResult)
116
117 initialGuess=bestResult['x']
118 gc.collect()
119
120 #elif self.method == 'leastsq'
121
122
123 def _deFunc(self, param):
124 '''
125
126 '''
127
128 # We use a cache to speed up multiple runs.
129 paramHash=hashlib.sha1(param.tobytes()).hexdigest()
130
131 cacheFile=os.path.join(self.cacheDir, paramHash)
132
133
134 if os.path.exists(cacheFile):
135 # If we have a cache hit simply load the result from the file.
136 with open(cacheFile, 'rb') as file:
137 result, cost=pickle.load(file)
138 else:
139 spiceResult=self._runSpiceSingle(param)
140 result=self._spiceResultToDict(spiceResult, True)
141 if len(result) != 1:
142 raise(RuntimeError(f'This is a bug. Expected 1 simulation result. Got {len(result)}'))
143 cost=self._calcCostSingle(result[0])
144 with open(cacheFile, 'wb') as file:
145 pickle.dump((result, cost), file)
146 sumCost=np.sum(cost)
147 print('\nResult: ', result[0], ' Cost: ', cost, ' Sum Cost: ', sumCost)
148 return sumCost
149
150 def _runSpiceSingle(self, param):
151 forSpice=[]
152 # Generate a dataframe we can provide to out NgSpice class as input.
153 # Combine the parameter names from parSequence with the values requested by least_squares.
154 forSpice.append({k:p for k,p in zip(self.parSequence, param)})
155 # Transform the dict to a dataframe
156 df=pd.DataFrame.from_dict(forSpice)
157 # Run Ngspice
158 spiceResult=self.ngSpiceInstance.run(df, delete=True, rename=self.spiceReplace)
159
160 return spiceResult
161
162 def _runSpiceMulti(self, simulationResult):
163 '''
164 Run the spice simulation
165 '''
166
167 forSpice=[]
168 # Generate a dataframe we can provide to out NgSpice class as input.
169 for par, simResult in simulationResult.items():
170 # If for given parameters we don't have a simulation result create an input for Spice
171 if simResult[1] is None:
172 # Combine the parameter names from parSequence with the values requested by least_squares.
173 forSpice.append({k:p for k,p in zip(self.parSequence, simResult[0])})
174
175 # Transform the dict to a dataframe
176 df=pd.DataFrame.from_dict(forSpice)
177
178 # Run Ngspice
179 spiceResult=self.ngSpiceInstance.run(df, delete=True, rename=self.spiceReplace)
180 return spiceResult
181
182 def _spiceResultToDict(self, spiceResult, ignoreMissingResult):
183 '''
184 Converts the results of a series of spice simulations to dictionaries.
185
186 :param spiceResult: Result of :py:meth:ǸgSpice.run.
187
188 :returns: A list of dictionaries: The key in the dictionary is part of self.costSequence. The value corresponds to the output of spice.
189 '''
190 # Our simulation result should match our cost functions.
191 # Flatten the result and convert the pandas data frames to a dictionary.
192 # The column names of interest are the key of the initial guess.
193 columnNames=self.cost.keys()
194
195 result=[]
196
197 for spiceR in spiceResult:
198
199 resultDict={}
200 # Fill the dictionary.
201 for column in columnNames:
202 for res in spiceR['results']:
203 if column in res.columns:
204 # Only consider unique results.
205 resultDict[column] = res[column].unique()
206
207 # Do sanity checks.
208 for column in columnNames:
209
210 # First of all we want a result for each parameter for which a cost was defined.
211 if not column in resultDict:
212 # Handle the situation in which we don't have a result.
213 if ignoreMissingResult:
214 resultDict[column] = np.NaN
215 else:
216 raise(RuntimeError(f'No spice simulation result for item {column}'))
217 self.firstRun=False
218 # Try to convert numpy arrays to float.
219 if type(resultDict[column]) == np.ndarray:
220 if len(resultDict[column]) == 1:
221 resultDict[column]=resultDict[column][0]
222 # If the have a result it has to be unique.
223 datatype= type(resultDict[column])
224 if not datatype == float and not datatype == np.float64:
225 raise(RuntimeError(f"Got data type {datatype}. I'm interpreting this as non-unique simulation result"))
226 # Add the dictionary to the result
227 result.append(resultDict)
228
229 return result
230
231 def _calcCost(self, simulationResult, spiceResult):
232
233 # Fill simulationResult with the results of the simulation.
234 for par, simResult in simulationResult.items():
235 # We only need to add a result if none is there.
236 if simResult[1] is None:
237 # Reconstruct the spice input parameters
238 forSpice={k:p for k,p in zip(self.parSequence, simResult[0])}
239 # Search the simulation result for the given simulation set of input parameters.
240 for s in spiceResult:
241 if forSpice == s['param']:
242 # Apply cost funtion
243 #result=s['results']
244 resultDict=self._spiceResultToDict(s, True)
245
246 costResult=self._calcCostSingle(resultDict)
247
248 simulationResult[par] = (simResult[0], costResult)
249 # Do some sanity checks. E.g. if we don't have a result, we should fail.
250 if simulationResult[par][1] is None:
251 raise(RuntimeError('Failed to find simulation results for a given set of parameters. This is a Bug.'))
252 return simulationResult
253
254 def _calcCostSingle(self, result):
255 '''
256 Apply the cost function to a single simulation result.
257 '''
258 costResult=[]
259
260 for costName in self.costSequence:
261 # Get the numeric result
262 resultValue=result[costName]
263 # Get the cost function
264 costFunction=eval(self.cost[costName])
265 # Apply the const function
266 # If we have a NaN reutrn a very large cost
267 if np.isnan(resultValue):
268 costResult=[1E9 for x in self.costSequence]
269 #if sumCost:
270 return costResult
271 else:
272 try:
273 cost=costFunction(resultValue)
274 except:
275 print("Cost calculation failed.")
276 print(f"Spice simulation result: {resultValue}")
277 print(f"Cost name: {costName}")
278 print(f"Cost function: ", inspect.getsource(costFunction))
279 raise
280 costResult.append(cost)
281 # Write the result of the cost calculation to the simulation result.
282# if sumCost:
283 # costResult=np.sum(costResult)
284
285 return costResult
286
287def differentialEvolutionFunc(a, b):
288 '''
289
290 '''
291 print("Function call")
292
293
294def leastsqFunc(param, result, default):
295 '''
296 Funtion for least_squares. We don't run spice in here. Instead we collect input for spice and return know spice parameters.
297 '''
298
299 hash=str(param)
300
301 if hash in result:
302 res= result[hash][1]
303 else:
304 result[hash] = (param, None)
305 res=default
306 print(param,res)
307 if res is None:
308 res = default
309 return res
310