Source code for cartesian.sklearn_api

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.base import RegressorMixin
from sklearn.metrics import mean_squared_error
from sklearn.utils.validation import check_array
from sklearn.utils.validation import check_random_state

from .algorithm import oneplus
from .algorithm import optimize
from .cgp import Cartesian
from .cgp import compile
from .cgp import Constant
from .cgp import Primitive
from .cgp import PrimitiveSet
from .cgp import Symbol
from .util import replace_nan

DEFAULT_PRIMITIVES = [Primitive("add", np.add, 2), Primitive("mul", np.multiply, 2)]

def _ensure_1d(yhat, shape):
        return yhat

    except (AttributeError, TypeError, IndexError):
        return np.ones(shape) * yhat

class _Evaluate:  # ugly construct you can pickle it and use joblib
    def __init__(self, x, y, metric):
        """Wraps metric for optimization"""
        self.n_samples, *n_out = y.shape
        self.multi_output = False
        if n_out and n_out[0] > 1:
            self.multi_output = True
        self.x = x
        self.y = y
        self.metric = metric

    def error(self, f, consts=()):
        if self.multi_output:
            yhat = np.array([_ensure_1d(i, self.n_samples) for i in f(*self.x.T, *consts)]).T
            yhat = _ensure_1d(f(*self.x.T, *consts), self.n_samples)
        yhat = replace_nan(yhat)
        return self.metric(self.y, yhat)

    def __call__(self, individual):
        return optimize(self.error, individual)

[docs]class Symbolic(BaseEstimator, RegressorMixin): def __init__( self, operators=None, n_const=0, n_rows=1, n_columns=3, n_back=1, n_mutations=3, mutation_method="active", maxiter=1000, maxfev=10000, lambda_=4, f_tol=0, seeded_individual=None, random_state=None, n_jobs=1, metric=None, callback=None, ): """Wraps the 1 + lambda algorithm in sklearn api. Note: n_costs provides a convenience method to create Symbols. All constants can be directly passed via the operators. Args: operators: list of primitives n_const: number of symbolic constants n_rows: number of rows in the code block n_columns: number of columns in the code block n_back: number of rows to look back for connections n_mutations: number of mutations per offspring mutation_method: specific mutation method maxiter: maximum number of generations maxfev: maximum number of function evaluations. Important, if fun is another optimizer lambda_: number of offspring per generation f_tol: Absolute error in metric(ind) between iterations that is acceptable for convergence seeded_individual: an individual used to hot-start the optimization random_state: an instance of np.random.RandomState, an integer used as seed, or None n_jobs: number of jobs for joblib embarrassingly easy parallel metric: callable(individual), function to be optimized callback: callable(OptimizeResult), can be optionally used to monitor progress """ self.operators = operators if operators is not None else DEFAULT_PRIMITIVES self.constants = [Constant("c_{}".format(i)) for i in range(n_const)] self.n_rows = n_rows self.n_back = n_back self.n_columns = n_columns self.n_out = None self.pset = None self.res = None self.model = None # parameters for algorithm self.maxfev = maxfev self.maxiter = maxiter self.lambda_ = lambda_ self.f_tol = f_tol self.metric = metric if metric is not None else mean_squared_error self.random_state = check_random_state(random_state) self.n_jobs = n_jobs self.n_mutations = n_mutations self.mutation_method = mutation_method self.seeded_individual = seeded_individual self.callback = callback
[docs] def fit(self, x, y): """Trains the model given the regression task. Args: x (np.ndarray): input data matrix for fitting of size (number_of_input_points, number_of_features) y (np.ndarray): target data vector for fitting of size (number_of_input_points) Returns: self """ x = check_array(x) _, self.n_out = y.reshape(y.shape[0], -1).shape _, n_features = x.shape terminals = [Symbol("x_{}".format(i)) for i in range(n_features)] self.pset = PrimitiveSet.create(self.operators + terminals + self.constants) cls = Cartesian( str(hash(self)), self.pset, n_rows=self.n_rows, n_columns=self.n_columns, n_out=self.n_out, n_back=self.n_back, ) self.res = oneplus( _Evaluate(x, y, self.metric), random_state=self.random_state, cls=cls, lambda_=self.lambda_, maxiter=self.maxiter, maxfev=self.maxfev, n_mutations=self.n_mutations, mutation_method=self.mutation_method, f_tol=self.f_tol, n_jobs=self.n_jobs, seed=self.seeded_individual, callback=self.callback, ) self.model = compile(self.res.ind) return self
[docs] def predict(self, x): """Use the fitted model f to make a prediction. Args: x: input data matrix for scoring Returns: predicted target data vector """ if self.n_out > 1: yhat = np.array([_ensure_1d(i, x.shape[0]) for i in self.model(*x.T, *self.res.x)]).T else: yhat = _ensure_1d(self.model(*x.T, *self.res.x), x.shape[0]) return yhat