-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpyGA.py
239 lines (208 loc) · 12.5 KB
/
pyGA.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
'''
v1.0
26/11/2019
This code was developed by Michael Abdul-Masih and Calum Hawcroft
Instituut voor Sterrenkunde, KU Leuven, Belgium
'''
import numpy as np
import math
from collections import OrderedDict
from schwimmbad import MPIPool
import sys
if sys.version_info > (3,):
long = int
def crossover_raw(pop_pre_crossover, fitness, prob_type = 'rank'):
"""
Produces a new generation of models given a population and their associated fitness to the observed spectrum.
This works by taking two chromosomes, splicing them at random points and crossing them over to generate another two chromosomes.
The choice of which chromosomes are used is somewhat randomised but weighted on fitness, ie the best fitting models have a higher probability of
being selected to crossover with another model. We also order the models by fitness ranking, not raw fitness, this ensures an
even distribution of models.
"""
'''Must provide the raw chromosomes for the population, along with their fitness to the observed spectrum.'''
#Probability of parents being selected influenced by rank.
if prob_type == 'rank':
pop_size = len(fitness)
ind = np.argsort(fitness)[::-1]
ranks = np.array([np.where(ind == i)[0][0] for i in range(len(ind))])
probabilities = 2. * (pop_size + 1) - 2*(ranks)
probabilities = probabilities / np.sum(probabilities)
elif prob_type == 'fitness':
probabilities = fitness / np.sum(fitness)
chromosome_length = len(pop_pre_crossover[0])
indicies = range(len(pop_pre_crossover))
combinations = []
#For a list of parents, create a lsit of combined parent pairs.
for i in range(int(len(pop_pre_crossover) / 2) + 1):
parent1_ind = np.random.choice(len(probabilities), 1, p = probabilities)[0]
temp_indicies = list(indicies)
temp_indicies.pop(parent1_ind)
temp_fitnesses = fitness[temp_indicies] / np.sum(fitness[temp_indicies])
parent2_ind = np.random.choice(len(probabilities), 1, p = probabilities)[0]
combinations.append([pop_pre_crossover[parent1_ind], pop_pre_crossover[parent2_ind]])
#The chance that two parents will crossover chromosomes is 85%.
crossover_chance = np.random.rand(len(combinations), 2)
crossover_location = np.random.choice(chromosome_length, (len(combinations), 2))
new_population = []
#For each combination of parent chromosomes, if they decide to crossover,
#select random points to crossover and create a combination of child chromosomes. If they do not crossover, assign them to child
#chromosomes intact.
for i in range(len(combinations)):
if crossover_chance[i][0] <= 0.85:
if crossover_chance[i][1] <= 0.85/2:
child1 = combinations[i][0][:crossover_location[i][0]] + combinations[i][1][crossover_location[i][0]:]
child2 = combinations[i][1][:crossover_location[i][0]] + combinations[i][0][crossover_location[i][0]:]
else:
loc1 = min(crossover_location[i])
loc2 = max(crossover_location[i])
child1 = combinations[i][0][:loc1] + combinations[i][1][loc1:loc2] + combinations[i][0][loc2:]
child2 = combinations[i][1][:loc1] + combinations[i][0][loc1:loc2] + combinations[i][1][loc2:]
else:
child1 = combinations[i][0]
child2 = combinations[i][1]
new_population.append(child1)
new_population.append(child2)
return new_population[:len(pop_pre_crossover)]
#Returns a new population of chromosomes after crossover.
def crossover_and_mutate_raw(population_raw, fitness, mutation_rate):
"""
Call the crossover between models in a population to generate the offspring models/next generation.
Call the mutation to the models generated by crossover.
Call the creep mutation to fill in low magnitude mutations, populating small gaps in the parameter space.
"""
'''Must provide the raw chromosomes for the population, along with their fitness to observed spectra. Also the mutation rate.'''
crossover_pop = crossover_raw(population_raw, fitness)
mutated_pop = mutation_raw_all(crossover_pop, mutation_rate)
creep_pop = creep_mutation_raw_all(mutated_pop, mutation_rate)
return creep_pop
#Returns a population of models generated by the crossing of models in the previous generation, with mutation effects included.
def crossover_and_mutate(population_raw, fitness, precision):
"""Unused."""
pop = np.round(population_raw, precision)
pop_pre_crossover = [''.join([str(i).split('.')[-1].ljust(precision, '0') for i in j]) for j in pop]
crossover_pop = crossover_raw(pop_pre_crossover, fitness)
mutated_pop = mutation_raw_all(crossover_pop)
new_population = [[float('0.'+x[i:i+precision]) for i in range(0, len(x), precision)] for x in mutated_pop]
return np.array(new_population)
def mutation_raw_all(crossover_pop, mutation_rate = 0.25):
"""
Preform random mutations on a population generated by crossover of the previous generation.
If the chromosome is a string of numbers between 0 and 9 we just select a random number in the string and replace it with a
random number between 0 and 9. The rate at which digits are randomised is set by the mutation rate.
"""
'''Must provide the chromosomes for a population generated by crossover and a mutation rate.'''
chromosome_length = len(crossover_pop[0])
mutation_chance = np.random.rand(len(crossover_pop), chromosome_length)
mutation_replacement = np.random.choice(10, (len(crossover_pop), chromosome_length))
mutation_replacement_str = np.array(mutation_replacement, dtype='str')
cross_array = [list(i) for i in crossover_pop]
cross_array = np.array(cross_array, dtype='str')
cross_array[mutation_chance < mutation_rate] = mutation_replacement_str[mutation_chance < mutation_rate]
crossover_pop = np.array([''.join(i) for i in cross_array])
return crossover_pop
#Returns the chromosomes for a population after mutation has been applied.
def creep_mutation_raw_all(mutated_pop, mutation_rate = 0.25):
"""
The creep mutation introduces small changes to random digit in a parameter to ensure we are not leaving any gaps
in the parameter space. There is also theory that this mutation enables you to converge on the actual solution, without it the
probability of changing the high precision digits in your value is low so you may get close to the solution but then mutate away
instead of honing in.
In practise this works by choosing a random digit in a parameter and adding or subtracting 1 to that digit
eg. if the 4th digit of 0.56785 was mutated the parameter would become 0.56795 or 0.56775. The probability of this happening and which digit is affected is determined by the mutation rate.
"""
'''Must provide chromosomes for a population after crossover and random mutation, as well as a mutation rate.'''
chromosome_length = len(mutated_pop[0])
creep_pop = [i for i in mutated_pop]
creep_pop = np.array(creep_pop, dtype='str')
creep_chance = np.random.rand(len(mutated_pop), chromosome_length)
creep_adjustment = np.random.choice(2, (len(mutated_pop), chromosome_length)) * 2 - 1
for individual_number in range(len(mutated_pop)):
individual_longint = long(creep_pop[individual_number])
for locus in range(chromosome_length):
if creep_chance[individual_number][locus] <= mutation_rate:
individual_longint += long(creep_adjustment[individual_number][locus] * 10 ** (chromosome_length - locus-1))
if individual_longint >= long(10 ** (chromosome_length)): individual_longint -= long(10 ** (chromosome_length))
elif individual_longint < 0: individual_longint += long(10 ** (chromosome_length))
creep_pop[individual_number] = str(individual_longint).zfill(chromosome_length)
#creep_pop[individual_number] = str(abs(individual_longint)).zfill(chromosome_length)[-1*chromosome_length:]
return creep_pop
#Returns the chromosomes for a population after the creep mutation has been applied.
def adjust_mutation_rate(mutation_rate, fitnesses, fit_metric_min=0.05, fit_metric_max=0.25, mut_rate_min = .0005,
mut_rate_max = .25, delta=1.5):
"""
The mutation rate should vary as the run progresses to maximise the effiency of the exploration. The idea being that if the
population of models is sampling a very large parameter space the mutation rate is low, and if the models all start to converge,
therefore covering a limited region of the parameter space then the mutation rate is increased.
"""
'''Must proivde a mutation rate and a number of metrics. The details of this can be found in pikaia documentation.'''
fitness_metric = abs(np.max(fitnesses) - np.median(fitnesses)) / (np.max(fitnesses) + np.median(fitnesses))
if fitness_metric <= fit_metric_min:
mutation_rate = min(mut_rate_max, mutation_rate * delta)
elif fitness_metric >= fit_metric_max:
mutation_rate = max(mut_rate_min, mutation_rate / delta)
return mutation_rate
#Returns an adjusted mutation rate.
def create_chromosome(parameters, population_size):
"""
Create a number of chromosomes to generate a population of models.
"""
'''Must provide parameters, param_set from read_ini_file function, and the size of the population.'''
keys = list(parameters.keys())
precisions = [parameters[i].precision for i in keys]
chromosome_length = np.sum(precisions)
nucleotides = np.array(np.random.choice(10, (population_size, chromosome_length)), dtype='str')
chromosomes = [''.join(i) for i in nucleotides]
return chromosomes
#Returns a list of chromosomes used to generate the input parameters for a population.
def translate_chromosome(parameters, chromosome):
"""
Translate the input chromosome for a model into useable parameter values for the model input file.
"""
'''Must provide the parameters, param_set from read_ini_file function, and chromosomes generally named with a 'raw' suffix eg population_raw'''
keys = list(parameters.keys())
values_dict = OrderedDict()
for i in keys:
precision = parameters[i].precision
param_min = parameters[i].min
param_max = parameters[i].max
param_range = param_max - param_min
value = float('0.' + chromosome[:precision]) * param_range + param_min
order = int(math.ceil(np.log10(abs(param_max))))
value = round(value, precision - order)
chromosome = chromosome[precision:]
values_dict[parameters[i].name] = value
return values_dict
#Returns a dictionary of input parameters for a FASTWIND model.
def batch_translate_chromosomes(parameters, chromosomes, generation):
"""
Translate the input set of chromosomes into useable parameter values to population model input files.
"""
'''Must provide the parameters or param_set from read_ini_file function, takes bounds and step for each parameter. Also the chromosome list for the population,
this is a string of numbers for each model which represents the input parameters for that model. Generation number is required.
'''
dicts = []
#for each chromosome call the individual translation function which will return a list of values for each parameter as model input.
#then generate the run id for the model, should be in format: 'generation number_model number' eg 0000_0001 is the first generation, second model.
for ind, chromosome in enumerate(chromosomes):
values_dict = translate_chromosome(parameters, chromosome)
values_dict['run_id'] = str(generation).zfill(4) + '_' + str(ind).zfill(4)
dicts.append(values_dict)
return dicts
#Returns a dictionary of input parameters for a population of models.
class Parameters(OrderedDict):
"""Class object for parameters, allows for easy storage/identification of a parameter along with the bounds of the parameter
range and the step for sampling the parameter range (precision)."""
'''Parameters must be given in the form of an ordered dictionary.'''
def __init__(self):
super(Parameters, self).__init__(self)
def add(self, name, lower_bound, upper_bound, precision):
self[name] = Parameter(name, lower_bound, upper_bound, precision)
class Parameter(object):
"""Used to define a single parameter and to populate the parameters class object."""
def __init__(self, name, lower_bound, upper_bound, precision):
self.name = name
self.min = lower_bound
self.max = upper_bound
self.precision = precision