Source code for cartesian.cgp

import copy
import itertools
import re
import sys
from operator import attrgetter

from dataclasses import dataclass
from sklearn.base import TransformerMixin
from sklearn.utils.validation import check_random_state

from cartesian.util import make_it


class Primitive(object):
    """Basic build block for cartesian programs."""

    def __init__(self, name, function, arity):
        """Record the three properties that define a primitive.

        :param name: for text representation
        :param function: object stored as the primitive's implementation
        :param arity: stored as the primitive's argument count
        """
        self.name, self.function, self.arity = name, function, arity
class Symbol(Primitive):
    """Base class for variables."""

    def __init__(self, name):
        """Create a zero-arity primitive that stands for a variable.

        Will always be used in the boilerplate ensuring a uniform signature,
        even if the variable is not used in the genotype.

        Args:
            name: name of the primitive
        """
        # Primitive.__init__ is not invoked here, so instances carry no
        # ``function`` attribute; only ``name`` and ``arity`` are set.
        self.name, self.arity = name, 0
class Constant(Symbol):
    """Base class for symbolic constants.

    Will be used for constant optimization.

    Boilerplate: a constant only appears in the generated signature when it
    is actually used.
    """
class Ephemeral(Primitive):
    """Base class for ERC's (ephemeral random constants)."""

    def __init__(self, name, fun):
        """Create an ERC.

        ERC's are terminals, but they are implemented as zero arity functions,
        as they do not need to appear in the argument list of the lambda
        expression.

        Note:
            Compilation behaviour: Each individual has a dict to store its
            numeric values. Each position in the code block will only execute
            the function once. Values are lost during copying.

        Args:
            name: for text representation
            fun: callback, should return a random numeric value.
        """
        # An ERC is always a terminal, hence the fixed arity of zero.
        super().__init__(name=name, function=fun, arity=0)
class Structural(Primitive):
    """Operator evaluated on the graph representation of its arguments."""

    def __init__(self, name, function, arity):
        """Create a structural constant.

        Structural constants are operators which take the graph
        representation of their arguments and convert it to a numeric value.

        Args:
            name: for text representation
            function: stored as the primitive's implementation
            arity: stored as the primitive's argument count
        """
        # Same three attributes as the base class; let it assign them.
        super().__init__(name, function, arity)
@dataclass
class PrimitiveSet:
    """A container holding the primitives and pre-compiled helper attributes.

    Args:
        operators: all non-terminal primitives (arity > 0)
        terminals: all terminals (arity == 0)
        max_arity: maximum arity of all operators (0 when there are none).
            Determines the number of links for each register
        mapping: sorted and indexed list of the primitive set
        imapping: inverse of mapping
        context: links names of operators to their functions
        symbols: all symbolic primitives (instances of ``Symbol``)
    """

    operators: list
    terminals: list
    mapping: dict
    imapping: dict
    context: dict
    symbols: list
    max_arity: int

    @classmethod
    def create(cls, primitives):
        """Create a PrimitiveSet with some attributes for quick lookups.

        Args:
            primitives: iterable of primitives; it is materialised once, so
                one-shot iterables (generators) are handled correctly.

        Returns:
            a new PrimitiveSet. Indices in ``mapping`` are assigned to the
            symbols first, then the remaining terminals, then the operators,
            each group sorted by name.
        """
        # The input is traversed several times below; a generator would be
        # exhausted after the first pass and silently produce an incomplete set.
        primitives = list(primitives)
        by_name = attrgetter("name")

        terminals = [p for p in primitives if p.arity == 0]
        symbols = [p for p in primitives if isinstance(p, Symbol)]
        non_symbols = [p for p in terminals if not isinstance(p, Symbol)]
        operators = [p for p in primitives if p.arity > 0]
        max_arity = max((p.arity for p in operators), default=0)

        ordered = (
            sorted(symbols, key=by_name)
            + sorted(non_symbols, key=by_name)
            + sorted(operators, key=by_name)
        )
        mapping = dict(enumerate(ordered))
        imapping = {prim: index for index, prim in mapping.items()}
        # Only operators are exposed by name to the compiled expression.
        context = {op.name: op.function for op in operators}

        return cls(
            operators=operators,
            terminals=terminals,
            imapping=imapping,
            max_arity=max_arity,
            mapping=mapping,
            context=context,
            symbols=symbols,
        )
def _make_map(*lists): i = 0 for c, l in enumerate(lists): for r, el in enumerate(l): yield i, el, c, r, l i += 1 def _code_index(n_in, n_row, c, r): return n_in + c * n_row + r def _out_index(n_rows, n_columns, n_in, o): return n_rows * n_columns + n_in + o def _get_valid_inputs(n_rows, n_columns, n_back, n_inputs, n_out): """dict of valid input genes for each gene in genom. Genes in the same column are not allowed to be connected to each other. """ inputs = {i: [] for i in range(n_inputs)} for c in range(n_columns): first_index_this_column = c * n_rows + n_inputs min_ = max(min(first_index_this_column - n_rows * n_back, n_inputs), 0) max_ = first_index_this_column for r in range(n_rows): i = _code_index(n_inputs, n_rows, c, r) inputs[i] = list(range(min_, max_)) + list(range(n_inputs)) min_ = min(max(0, (n_columns - n_back - 1) * n_rows), n_inputs) max_ = max(inputs) for o in range(n_out): inputs[o + max_ + 1] = list(range(min_, max_ + 1)) + list(range(n_inputs)) for k, v in inputs.items(): inputs[k] = list(set(v)) return inputs
class Base(TransformerMixin):
    """Common implementation of a cartesian individual.

    The class-level attributes ``pset``, ``n_columns``, ``n_rows``, ``n_out``
    and ``_valid_inputs`` are read here but not defined here; the
    ``Cartesian`` metaclass in this module places them on generated
    subclasses.
    """

    def __init__(self, code, outputs):
        """Store the genotype.

        Args:
            code: list of columns; each column is a list of genes
            outputs: list holding, per output, the index of the node it reads
        """
        self.n_inputs = len(self.pset.terminals)
        self.inputs = list(range(self.n_inputs))
        self.symbols = self.inputs[: len(self.pset.symbols)]
        self.code = code
        self.outputs = outputs
        # Primitives are allowed to write their name values for storage
        self.memory = {}
        self.__out_idx = None

    @property
    def mapping(self):
        """Helper dictionary to index the cartesian registers.

        Maps a flat index to ``(element, list_number, position, owning_list)``.
        The dictionary is rebuilt on every access.
        """
        return {i: (el, c, r, l) for i, el, c, r, l in _make_map(self.inputs, *self.code, self.outputs)}

    @property
    def _out_idx(self):
        """Returns the indices of the output genes in self (computed once)."""
        if self.__out_idx is None:
            self.__out_idx = [
                _out_index(self.n_rows, self.n_columns, self.n_inputs, o) for o in range(self.n_out)
            ]
        return self.__out_idx

    def _decode_code_gene(self, idx):
        """Return the primitive of gene ``idx`` and an iterator over the rest of the gene."""
        gene = make_it(self[idx])
        primitive = self.pset.mapping[next(gene)]
        return primitive, gene

    def _iter_subgraph(self, idx):
        """Yield ``(index, primitive)`` for every node reachable from ``idx``.

        Output genes are yielded with ``None`` as primitive.
        """
        if idx in self._out_idx:
            yield idx, None
            yield from self._iter_subgraph(self[idx])
        else:
            primitive, args = self._decode_code_gene(idx)
            yield idx, primitive
            if args:
                # only the first ``arity`` links of a gene are followed
                for a, _ in zip(args, range(primitive.arity)):
                    yield from self._iter_subgraph(a)

    def _map_reduce_subgraph(self, idx, f=lambda *x: x, agg=list):
        """Apply ``f`` to every ``(index, primitive)`` of the subgraph and aggregate with ``agg``."""
        return agg((f(*ip) for ip in self._iter_subgraph(idx)))

    def len_subgraph(self, idx):
        """Compute the length of the subgraph with head-node at idx."""
        return self._map_reduce_subgraph(idx, agg=lambda x: len(list(x)))

    @property
    def active_genes(self):
        """Computes the set of active genes in an individual.

        Input genes are excluded. An individual without outputs has no
        active genes.
        """
        reachable = (
            self._map_reduce_subgraph(o, f=lambda i, p: i, agg=set) for o in self._out_idx
        )
        # ``set().union(*...)`` also works when there are no outputs, whereas
        # the unbound ``set.union(*[])`` raises TypeError.
        return set().union(*reachable) - set(self.inputs)

    def __getitem__(self, index):
        return self.mapping[index][0]

    def __setitem__(self, index, item):
        # element, column, row, gene
        *_, row, gene = self.mapping[index]
        gene[row] = item

    def __len__(self):
        """Returns the number of genes in self."""
        return self.n_columns * self.n_rows + self.n_out + self.n_inputs

    def __str__(self):
        return "\n".join(to_polish(self, return_args=False))

    def __repr__(self):  # pragma: no cover
        return f"{type(self)}\n\tInputs: {self.inputs}\n\tOutputs: {self.outputs}\n\tCode: {self.code}"

    def __getstate__(self):
        """Return the instance state without the compiled ``_transform``."""
        # for compatibility with vanilla pickle protocol
        state = dict(self.__dict__)
        state.pop("_transform", None)
        return state

    def __copy__(self):
        """Safe copy, discard memory to refresh random constants.

        Columns *and* the genes inside them are copied, so changing a gene of
        the copy in place can never alter the original.
        """
        code = [[gene[:] for gene in column] for column in self.code]
        return type(self)(code, self.outputs[:])

    def clone(self):
        """Safe copy, discard memory to refresh random constants"""
        return copy.copy(self)

    @staticmethod
    def format(x):  # pragma: no cover
        return "{}".format(x)

    def fit(self, x, y=None, **fit_params):
        """Compile the individual and remember ``fit_params``; ``x`` and ``y`` are not used."""
        # ``compile`` is the module-level function of this file, not the builtin.
        self._transform = compile(self)
        self.fit_params = fit_params
        return self

    def transform(self, x, y=None):
        """Call the compiled expression with the unpacked transpose of ``x``.

        Requires a previous call to ``fit``.
        """
        return self._transform(*x.T)

    @classmethod
    def create(cls, random_state=None):
        """Creates a new individual.

        Each gene is picked with a uniform distribution from all allowed inputs or functions.

        Args:
            random_state: an instance of np.random.RandomState, a seed integer or None

        Returns:
            a new (random) individual
        """
        random_state = check_random_state(random_state)
        n_in = len(cls.pset.terminals)
        operator_keys = list(range(n_in, max(cls.pset.mapping) + 1))
        code = []
        for i in range(cls.n_columns):
            column = []
            for j in range(cls.n_rows):
                index = _code_index(n_in, cls.n_rows, i, j)
                in_ = cls._valid_inputs[index]
                gene = [random_state.choice(operator_keys)] + [
                    random_state.choice(in_) for _ in range(cls.pset.max_arity)
                ]
                column.append(gene)
            code.append(column)
        outputs = [
            random_state.choice(cls._valid_inputs[_out_index(cls.n_rows, cls.n_columns, n_in, o)])
            for o in range(cls.n_out)
        ]
        return cls(code, outputs)
class Cartesian(type):
    """Meta class to set class parameters and primitive set.

    Calling ``Cartesian(name, primitive_set, ...)`` creates a subclass of
    ``Base`` carrying ``pset``, ``n_columns``, ``n_rows``, ``n_back``,
    ``n_out`` and ``_valid_inputs`` as class attributes.
    """

    def __new__(mcs, name, primitive_set, n_columns=3, n_rows=1, n_back=1, n_out=1):
        """Build the class and register it in this module under ``name``.

        Args:
            name: name of the generated class
            primitive_set: stored as ``pset``; its ``terminals`` are counted
            n_columns: number of columns of the code block
            n_rows: number of rows of the code block
            n_back: forwarded to ``_get_valid_inputs``
            n_out: number of output genes
        """
        valid_inputs = _get_valid_inputs(n_rows, n_columns, n_back, len(primitive_set.terminals), n_out)
        namespace = dict(
            pset=primitive_set,
            n_columns=n_columns,
            n_rows=n_rows,
            n_back=n_back,
            n_out=n_out,
            _valid_inputs=valid_inputs,
        )
        cls = super().__new__(mcs, name, (Base,), namespace)
        # Make the generated class reachable as a module attribute
        # (presumably so it can be pickled by reference -- TODO confirm).
        setattr(sys.modules[__name__], name, cls)
        return cls

    def __init__(cls, name, primitive_set, n_columns=3, n_rows=1, n_back=1, n_out=1):
        """Accept the same arguments as ``__new__``.

        The class was fully populated in ``__new__``. ``type.__init__`` does
        not use the namespace it is given, so nothing is recomputed here.
        """
        super().__init__(name, (Base,), {})
def _point_mutation(individual, idx, random_state): """Apply a point mutation on individual at position idx Args: individual: instance of Base idx: index random_state: an instance of np.random.RandomState Returns: mutated individual """ el, c, r, lst = individual.mapping[idx] gene = lst[r] if isinstance(gene, list): new_gene = gene[:] j = random_state.randint(0, len(gene)) if j == 0: # function choices = individual.pset.operators[:] if len(choices) > 1: # don't allow the same function again prim = individual.pset.mapping[gene[j]] choices.pop(choices.index(prim)) new_j = individual.pset.imapping[random_state.choice(choices)] else: # input valid_inputs = individual._valid_inputs[idx][:] if len(valid_inputs) > 1: # don't allow the same inputs again valid_inputs.pop(valid_inputs.index(gene[j])) new_j = random_state.choice(valid_inputs) new_gene[j] = new_j else: # output gene new_gene = random_state.randint(0, len(individual) - individual.n_out - 1) new_individual = individual.clone() new_individual[idx] = new_gene return new_individual
def mutate(individual, n_mutations=1, method="active", random_state=None):
    """Create offsprings by mutating an individual.

    Args:
        individual: instance of Base
        n_mutations: number of mutations applied one after another
        method: ``"active"`` selects active-gene mutation; any other value
            selects plain point mutation
        random_state: an instance of np.random.RandomState, a seed integer or None

    Returns:
        mutated individual
    """
    rng = check_random_state(random_state)
    if method == "active":
        operator = active_gene_mutation
    else:
        operator = point_mutation

    child = individual.clone()
    for _step in range(n_mutations):
        # every step mutates the result of the previous one
        child = operator(child, random_state=rng)
    return child
def active_gene_mutation(individual, random_state=None):
    """Picks an active gene and applies a point mutation to it.

    Args:
        individual: instance of Base
        random_state: an instance of np.random.RandomState, a seed integer or None

    Returns:
        mutated individual
    """
    rng = check_random_state(random_state)
    candidates = list(individual.active_genes)
    # draw a sample of size one and take its only element
    (picked,) = rng.choice(candidates, 1)
    return _point_mutation(individual, picked, rng)
def point_mutation(individual, random_state=None):
    """Picks a gene at random and mutates it.

    The mutation is either rewiring, i.e. changing the inputs, or changing the operator (head of gene).
    Input genes are never selected; every code gene and every output gene is
    eligible.

    Args:
        individual: instance of Base
        random_state: an instance of np.random.RandomState, a seed integer or None

    Returns:
        mutated individual
    """
    random_state = check_random_state(random_state)
    n_terminals = len(individual.pset.terminals)
    # RandomState.randint excludes the upper bound: using ``len(individual)``
    # (not ``len(individual) - 1``) keeps the last output gene selectable.
    i = random_state.randint(n_terminals, len(individual))
    return _point_mutation(individual, i, random_state)
def to_polish(c, return_args=True):
    """Generates the polish notation of expression encoded by c.

    Resolves the outputs recursively.

    Note:
        Function has side-effects on the individual c: values drawn for
        ephemeral constants are stored in ``c.memory``.

    Args:
        c: instance of base
        return_args: optionally return the used arguments too

    Returns:
        polish notation of expression encoded by c; when ``return_args`` is
        true, a tuple of that list and the set of symbols that were used
    """
    lookup = c.pset.mapping
    used_arguments = set()

    def render(node):
        """Return the text for the sub-expression rooted at ``node``."""
        gene = make_it(c[node])
        primitive = lookup[next(gene)]
        arity = primitive.arity

        if arity == 0:
            if isinstance(primitive, Symbol):
                used_arguments.add(primitive)
                return primitive.name
            if isinstance(primitive, Ephemeral):
                # the callback runs once per position; later calls reuse the value
                if node not in c.memory:
                    c.memory[node] = c.format(primitive.function())
                return c.memory[node]
            return "{}()".format(primitive.name)

        # only the first ``arity`` links of the gene are operands
        operands = (link for link, _ in zip(gene, range(arity)))
        if isinstance(primitive, Structural):
            sizes = [c.len_subgraph(link) for link in operands]
            return c.format(primitive.function(*sizes))
        return "{}({})".format(primitive.name, ", ".join(render(link) for link in operands))

    polish = [render(out) for out in c.outputs]
    return (polish, used_arguments) if return_args else polish
def _boilerplate(c, used_arguments=()):
    """Generates the overhead needed to compile the polish notation.

    If used_arguments are provided, only those primitives are considered;
    otherwise all inputs of ``c`` are. In the resulting signature the
    non-constant symbols come first, followed by the constants.

    Args:
        c: instance of Base
        used_arguments: list of terminals actually used in c.

    Returns:
        overhead needed to compile the polish notation.
    """
    table = c.pset.mapping
    if used_arguments:
        selected = [table[key] for key in sorted(table) if table[key] in used_arguments]
    else:
        selected = [table[key] for key in c.inputs]

    variables, constants = [], []
    for arg in selected:
        if isinstance(arg, Constant):
            constants.append(arg)
        elif isinstance(arg, Symbol):
            variables.append(arg)

    signature = ", ".join(arg.name for arg in variables + constants)
    return "lambda {}:".format(signature)
def compile(c):
    """Transform an individual into a lambda function

    Every non-constant symbol of the primitive set becomes a parameter of the
    lambda, whether or not the expression uses it; constants only when used.

    Args:
        c: instance of Base

    Returns:
        lambda function
    """
    expressions, arguments = to_polish(c, return_args=True)
    arguments.update(sym for sym in c.pset.symbols if not isinstance(sym, Constant))
    header = _boilerplate(c, used_arguments=arguments)

    if len(expressions) > 1:
        body = "({})".format(", ".join(expressions))
    else:
        body = expressions[0]

    # NOTE(review): ``eval`` executes text assembled from primitive names and
    # formatted constants; primitive sets must come from trusted code only.
    return eval(header + body, c.pset.context)