Source code for pylmnn.lmnn

# coding: utf-8
"""
Large Margin Nearest Neighbor Classification
"""

# Author: John Chiotellis <johnyc.code@gmail.com>
# License: BSD 3 clause

from __future__ import print_function
from warnings import warn

import sys
import time
import numpy as np
from scipy.optimize import minimize
from scipy.sparse import csr_matrix, csc_matrix, coo_matrix

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.neighbors import NearestNeighbors, KNeighborsClassifier
from sklearn.decomposition import PCA
from sklearn.utils import gen_batches
from sklearn.utils.extmath import row_norms, safe_sparse_dot
from sklearn.utils.random import check_random_state
from sklearn.utils.multiclass import check_classification_targets
from sklearn.utils.validation import check_is_fitted, check_array, check_X_y
from sklearn.externals.six import integer_types, string_types
from sklearn.exceptions import ConvergenceWarning

from .utils import _euclidean_distances_without_checks


class LargeMarginNearestNeighbor(BaseEstimator, TransformerMixin):
    """Distance metric learning for large margin classification.

    Parameters
    ----------
    n_neighbors : int, optional (default=3)
        Number of neighbors to use as target neighbors for each sample.

    n_components : int, optional (default=None)
        Preferred dimensionality of the embedding.
        If None it is inferred from ``init``.

    init : string or numpy array, optional (default='pca')
        Initialization of the linear transformation. Possible options are
        'pca', 'identity' and a numpy array of shape (n_features_a,
        n_features_b).

        pca:
            ``n_components`` many principal components of the inputs passed
            to :meth:`fit` will be used to initialize the transformation.

        identity:
            If ``n_components`` is strictly smaller than the dimensionality
            of the inputs passed to :meth:`fit`, the identity matrix will be
            truncated to the first ``n_components`` rows.

        numpy array:
            n_features_b must match the dimensionality of the inputs passed
            to :meth:`fit` and n_features_a must be less than or equal to
            that. If ``n_components`` is not None, n_features_a must match
            it.

    warm_start : bool, optional, (default=False)
        If True and :meth:`fit` has been called before, the solution of the
        previous call to :meth:`fit` is used as the initial linear
        transformation (``n_components`` and ``init`` will be ignored).

    max_impostors : int, optional (default=500000)
        Maximum number of impostors to consider per iteration. In the worst
        case this will allow ``max_impostors * n_neighbors`` constraints to
        be active.

    neighbors_params : dict, optional (default=None)
        Parameters to pass to a :class:`neighbors.NearestNeighbors` instance -
        apart from ``n_neighbors`` - that will be used to select the target
        neighbors.

    weight_push_loss : float, optional (default=0.5)
        A float in (0, 1], weighting the push loss. This is parameter ``μ``
        in the journal paper (See references below). In practice, the
        objective function will be normalized so that the push loss has
        weight 1 and hence the pull loss has weight ``(1 - μ)/μ``.

    impostor_store : str ['auto'|'list'|'sparse'], optional
        list :
            Three lists will be used to store the indices of reference
            samples, the indices of their impostors and the (squared)
            distances between the (sample, impostor) pairs.

        sparse :
            A sparse indicator matrix will be used to store the (sample,
            impostor) pairs. The (squared) distances to the impostors will
            be computed twice (once to determine the impostors and once to
            be stored), but this option tends to be faster than 'list' as
            the size of the data set increases.

        auto :
            Will attempt to decide the most appropriate choice of data
            structure based on the values passed to :meth:`fit`.

    max_iter : int, optional (default=50)
        Maximum number of iterations in the optimization.

    tol : float, optional (default=1e-5)
        Convergence tolerance for the optimization.

    callback : callable, optional (default=None)
        If not None, this function is called after every iteration of the
        optimizer, taking as arguments the current solution (transformation)
        and the number of iterations. This might be useful in case one wants
        to examine or store the transformation found after each iteration.

    store_opt_result : bool, optional (default=False)
        If True, the :class:`scipy.optimize.OptimizeResult` object returned
        by :meth:`minimize` of `scipy.optimize` will be stored as attribute
        ``opt_result_``.

    verbose : int, optional (default=0)
        If 0, no progress messages will be printed.
        If 1, progress messages will be printed to stdout.
        If > 1, progress messages will be printed and the ``iprint``
        parameter of :meth:`_minimize_lbfgsb` of `scipy.optimize` will be
        set to ``verbose - 2``.

    random_state : int or numpy.RandomState or None, optional (default=None)
        A pseudo random number generator object or a seed for it if int.

    n_jobs : int, optional (default=1)
        The number of parallel jobs to run for neighbors search.
        If ``-1``, then the number of jobs is set to the number of CPU cores.
        Doesn't affect :meth:`fit` method.

    Attributes
    ----------
    components_ : array, shape (n_components, n_features)
        The linear transformation learned during fitting.

    n_neighbors_ : int
        The provided ``n_neighbors`` is decreased if it is greater than or
        equal to min(number of elements in each class).

    n_iter_ : int
        Counts the number of iterations performed by the optimizer.

    opt_result_ : scipy.optimize.OptimizeResult (optional)
        A dictionary of information representing the optimization result.
        This is stored only if ``store_opt_result`` is True. It contains the
        following attributes:

        x : ndarray
            The solution of the optimization.
        success : bool
            Whether or not the optimizer exited successfully.
        status : int
            Termination status of the optimizer.
        message : str
            Description of the cause of the termination.
        fun, jac : ndarray
            Values of objective function and its Jacobian.
        hess_inv : scipy.sparse.linalg.LinearOperator
            The product of a vector with the approximate inverse of the
            Hessian of the objective function.
        nfev : int
            Number of evaluations of the objective function.
        nit : int
            Number of iterations performed by the optimizer.

    Examples
    --------
    >>> from pylmnn import LargeMarginNearestNeighbor
    >>> from sklearn.neighbors import KNeighborsClassifier
    >>> from sklearn.datasets import load_iris
    >>> from sklearn.model_selection import train_test_split
    >>> X, y = load_iris(return_X_y=True)
    >>> X_train, X_test, y_train, y_test = train_test_split(X, y,
    ... stratify=y, test_size=0.7, random_state=42)
    >>> lmnn = LargeMarginNearestNeighbor(n_neighbors=3, random_state=42)
    >>> lmnn.fit(X_train, y_train) # doctest: +ELLIPSIS
    LargeMarginNearestNeighbor(...)
    >>> # Fit and evaluate a simple nearest neighbor classifier for comparison
    >>> knn = KNeighborsClassifier(n_neighbors=3)
    >>> knn.fit(X_train, y_train) # doctest: +ELLIPSIS
    KNeighborsClassifier(...)
    >>> print(knn.score(X_test, y_test))
    0.933333333333
    >>> # Now fit on the data transformed by the learned transformation
    >>> knn.fit(lmnn.transform(X_train), y_train) # doctest: +ELLIPSIS
    KNeighborsClassifier(...)
    >>> print(knn.score(lmnn.transform(X_test), y_test))
    0.971428571429

    .. warning::

        Exact floating-point reproducibility is generally not guaranteed
        (unless special care is taken with library and compiler options). As
        a consequence, the transformations computed in 2 identical runs of
        LargeMarginNearestNeighbor can differ from each other. This can
        happen even before the optimizer is called if initialization with
        PCA is used (init='pca').

    References
    ----------
    .. [1] Weinberger, Kilian Q., and Lawrence K. Saul.
           "Distance Metric Learning for Large Margin Nearest Neighbor
           Classification."
           Journal of Machine Learning Research, Vol. 10, Feb. 2009,
           pp. 207-244.
           http://jmlr.csail.mit.edu/papers/volume10/weinberger09a/weinberger09a.pdf
    .. [2] Wikipedia entry on Large Margin Nearest Neighbor
           https://en.wikipedia.org/wiki/Large_margin_nearest_neighbor

    """

    def __init__(self, n_neighbors=3, n_components=None, init='pca',
                 warm_start=False, max_impostors=500000,
                 neighbors_params=None, weight_push_loss=0.5,
                 impostor_store='auto', max_iter=50, tol=1e-5, callback=None,
                 store_opt_result=False, verbose=0, random_state=None,
                 n_jobs=1):

        # Parameters
        self.n_neighbors = n_neighbors
        self.n_components = n_components
        self.init = init
        self.warm_start = warm_start
        self.max_impostors = max_impostors
        self.neighbors_params = neighbors_params
        self.weight_push_loss = weight_push_loss
        self.impostor_store = impostor_store
        self.max_iter = max_iter
        self.tol = tol
        self.callback = callback
        self.store_opt_result = store_opt_result
        self.verbose = verbose
        self.random_state = random_state
        self.n_jobs = n_jobs
    def fit(self, X, y):
        """Fit the model according to the given training data.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            The training samples.

        y : array-like, shape (n_samples,)
            The corresponding training labels.

        Returns
        -------
        self : object
            Returns a trained LargeMarginNearestNeighbor model.
        """

        # Validate the inputs
        X, y = check_X_y(X, y, ensure_min_samples=2)
        check_classification_targets(y)

        # Check that the inputs are consistent with the parameters
        X_valid, y_valid, classes, init = self._validate_params(X, y)

        # Initialize the random generator
        self.random_state_ = check_random_state(self.random_state)

        # Measure the total training time
        t_train = time.time()

        # Initialize the linear transformation
        transformation = self._initialize(X_valid, init)

        # Find the target neighbors
        target_neighbors = self._select_target_neighbors_wrapper(
            X_valid, y_valid, classes)

        # Compute the gradient part contributed by the target neighbors
        grad_static = self._compute_grad_static(X_valid, target_neighbors)

        # Compute the pull loss coefficient
        pull_loss_coef = (1. - self.weight_push_loss) / self.weight_push_loss
        grad_static *= pull_loss_coef

        # Decide how to store the impostors
        if self.impostor_store == 'sparse':
            use_sparse = True
        elif self.impostor_store == 'list':
            use_sparse = False
        else:
            # auto: Use a heuristic based on the data set size
            use_sparse = X_valid.shape[0] > 6500

        # Create a dictionary of parameters to be passed to the optimizer
        disp = self.verbose - 2 if self.verbose > 1 else -1
        optimizer_params = {'method': 'L-BFGS-B',
                            'fun': self._loss_grad_lbfgs,
                            'jac': True,
                            'args': (X_valid, y_valid, classes,
                                     target_neighbors, grad_static,
                                     use_sparse),
                            'x0': transformation,
                            'tol': self.tol,
                            'options': dict(maxiter=self.max_iter, disp=disp),
                            'callback': self._callback
                            }

        # Call the optimizer
        self.n_iter_ = 0
        opt_result = minimize(**optimizer_params)

        # Reshape the solution found by the optimizer
        self.components_ = opt_result.x.reshape(-1, X_valid.shape[1])

        # Stop timer
        t_train = time.time() - t_train
        if self.verbose:
            cls_name = self.__class__.__name__

            # Warn the user if the algorithm did not converge
            if not opt_result.success:
                warn('[{}] LMNN did not converge: {}'.format(
                    cls_name, opt_result.message), ConvergenceWarning)

            print('[{}] Training took {:8.2f}s.'.format(cls_name, t_train))

        # Optionally store information returned by the optimizer
        if self.store_opt_result:
            self.opt_result_ = opt_result

        return self
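
    # Illustrative note (not part of the original source): with
    # ``warm_start=True`` a second call to :meth:`fit` starts the optimizer
    # from the previously learned ``components_`` instead of re-initializing
    # (see ``_initialize`` below). A hypothetical usage sketch, assuming two
    # batches of data with the same number of features:
    #
    #     lmnn = LargeMarginNearestNeighbor(warm_start=True, max_iter=20)
    #     lmnn.fit(X_batch_1, y_batch_1)
    #     lmnn.fit(X_batch_2, y_batch_2)  # resumes from the previous solution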

    def transform(self, X):
        """Applies the learned transformation to the given data.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Data samples.

        Returns
        -------
        X_embedded: array, shape (n_samples, n_components)
            The data samples transformed.

        Raises
        ------
        NotFittedError
            If :meth:`fit` has not been called before.
        """

        check_is_fitted(self, ['components_'])
        X = check_array(X)

        return np.dot(X, self.components_.T)
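
    # Note (added for clarity, not in the original source): the learned
    # transformation ``L = self.components_`` induces a Mahalanobis
    # (pseudo-)metric on the input space,
    #     d(x, x')**2 = (x - x').T @ (L.T @ L) @ (x - x'),
    # which equals the squared Euclidean distance between ``transform(x)``
    # and ``transform(x')``.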

    def _transform_without_checks(self, X):
        """Same as transform but without validating the inputs.

        Parameters
        ----------
        X : array, shape (n_samples, n_features)
            Data samples.

        Returns
        -------
        X_embedded: array, shape (n_samples, n_components)
            The data samples transformed.
        """
        return np.dot(X, self.components_.T)

    def _validate_params(self, X, y):
        """Validate parameters as soon as :meth:`fit` is called.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            The training samples.

        y : array-like, shape (n_samples,)
            The corresponding training labels.

        Returns
        -------
        X : array, shape (n_samples, n_features)
            The validated training samples.

        y_inverse : array, shape (n_samples,)
            The validated training labels, encoded to be integers in the
            range [0, n_classes).

        classes_inverse_non_singleton : array, shape (n_classes_non_singleton,)
            The non-singleton classes, encoded as integers in [0, n_classes).

        init : string or numpy array of shape (n_features_a, n_features_b)
            The validated initialization of the linear transformation.

        Raises
        ------
        TypeError
            If a parameter is not an instance of the desired type.

        ValueError
            If a parameter's value violates its legal value range or if the
            combination of two or more given parameters is incompatible.
        """

        # Find the appearing classes and the class index for each sample
        classes, y_inverse = np.unique(y, return_inverse=True)
        classes_inverse = np.arange(len(classes))

        # Ignore classes that have less than 2 samples (singleton classes)
        class_sizes = np.bincount(y_inverse)
        mask_singleton_class = class_sizes == 1
        singleton_classes, = np.where(mask_singleton_class)
        if len(singleton_classes):
            warn('There are {} singleton classes that will be ignored during '
                 'training. A copy of the inputs `X` and `y` will be made.'
                 .format(len(singleton_classes)))
            mask_singleton_sample = np.asarray([yi in singleton_classes for
                                                yi in y_inverse])
            X = X[~mask_singleton_sample].copy()
            y_inverse = y_inverse[~mask_singleton_sample].copy()

        # Check that there are at least 2 non-singleton classes
        n_classes_non_singleton = len(classes) - len(singleton_classes)
        if n_classes_non_singleton < 2:
            raise ValueError('LargeMarginNearestNeighbor needs at least 2 '
                             'non-singleton classes, got {}.'
                             .format(n_classes_non_singleton))

        classes_inverse_non_singleton = classes_inverse[~mask_singleton_class]

        # Check the preferred embedding dimensionality
        if self.n_components is not None:
            _check_scalar(self.n_components, 'n_components', integer_types, 1)

            if self.n_components > X.shape[1]:
                raise ValueError('The preferred embedding dimensionality '
                                 '`n_components` ({}) cannot be greater '
                                 'than the given data dimensionality ({})!'
                                 .format(self.n_components, X.shape[1]))

        # If warm_start is enabled, check that the inputs are consistent
        _check_scalar(self.warm_start, 'warm_start', bool)
        if self.warm_start and hasattr(self, 'components_'):
            if self.components_.shape[1] != X.shape[1]:
                raise ValueError('The new inputs dimensionality ({}) does '
                                 'not match the input dimensionality of the '
                                 'previously learned transformation ({}).'
                                 .format(X.shape[1],
                                         self.components_.shape[1]))

        _check_scalar(self.n_neighbors, 'n_neighbors', integer_types, 1,
                      X.shape[0] - 1)
        _check_scalar(self.max_iter, 'max_iter', integer_types, 1)
        _check_scalar(self.tol, 'tol', float, 0.)
        _check_scalar(self.weight_push_loss, 'weight_push_loss', float, 0.,
                      1.)
        if self.weight_push_loss == 0:
            raise ValueError('`weight_push_loss` cannot be zero.')

        _check_scalar(self.max_impostors, 'max_impostors', integer_types, 1)
        _check_scalar(self.impostor_store, 'impostor_store', string_types)
        _check_scalar(self.n_jobs, 'n_jobs', integer_types)
        _check_scalar(self.verbose, 'verbose', integer_types, 0)

        if self.impostor_store not in ['auto', 'sparse', 'list']:
            raise ValueError("`impostor_store` must be 'auto', 'sparse' or "
                             "'list'.")

        if self.callback is not None:
            if not callable(self.callback):
                raise ValueError('`callback` is not callable.')

        # Check how the linear transformation should be initialized
        init = self.init

        if isinstance(init, np.ndarray):
            init = check_array(init)

            # Assert that init.shape[1] = X.shape[1]
            if init.shape[1] != X.shape[1]:
                raise ValueError('The input dimensionality ({}) of the given '
                                 'linear transformation `init` must match '
                                 'the dimensionality of the given inputs '
                                 '`X` ({}).'
                                 .format(init.shape[1], X.shape[1]))

            # Assert that init.shape[0] <= init.shape[1]
            if init.shape[0] > init.shape[1]:
                raise ValueError('The output dimensionality ({}) of the '
                                 'given linear transformation `init` cannot '
                                 'be greater than its input dimensionality '
                                 '({}).'
                                 .format(init.shape[0], init.shape[1]))

            if self.n_components is not None:
                # Assert that self.n_components = init.shape[0]
                if self.n_components != init.shape[0]:
                    raise ValueError('The preferred embedding dimensionality '
                                     '`n_components` ({}) does not match '
                                     'the output dimensionality of the given '
                                     'linear transformation `init` ({})!'
                                     .format(self.n_components,
                                             init.shape[0]))
        elif init in ['pca', 'identity']:
            pass
        else:
            raise ValueError("`init` must be 'pca', 'identity', or a numpy "
                             "array of shape (n_components, n_features).")

        # Check the preferred number of neighbors
        min_non_singleton_size = class_sizes[~mask_singleton_class].min()
        if self.n_neighbors >= min_non_singleton_size:
            warn('`n_neighbors` (={}) is not less than the number of '
                 'samples in the smallest non-singleton class (={}). '
                 '`n_neighbors_` will be set to {} for estimation.'
                 .format(self.n_neighbors, min_non_singleton_size,
                         min_non_singleton_size - 1))

        self.n_neighbors_ = min(self.n_neighbors, min_non_singleton_size - 1)

        neighbors_params = self.neighbors_params
        if neighbors_params is not None:
            _check_scalar(neighbors_params, 'neighbors_params', dict)
            neighbors_params.setdefault('n_jobs', self.n_jobs)
            # Attempt to instantiate a NearestNeighbors instance here to
            # raise any errors before actually fitting
            NearestNeighbors(n_neighbors=self.n_neighbors_,
                             **neighbors_params)

        return X, y_inverse, classes_inverse_non_singleton, init

    def _initialize(self, X, init):
        """

        Parameters
        ----------
        X : array, shape (n_samples, n_features)
            The training samples.

        init : string or numpy array of shape (n_features_a, n_features)
            The initialization of the linear transformation.

        Returns
        -------
        transformation : array, shape (n_components, n_features)
            The initialized linear transformation.
""" transformation = init if self.warm_start and hasattr(self, 'components_'): transformation = self.components_ elif isinstance(init, np.ndarray): pass elif init == 'pca': pca = PCA(n_components=self.n_components, random_state=self.random_state_) t_pca = time.time() if self.verbose: print('[{}] Finding principal components...'.format( self.__class__.__name__)) sys.stdout.flush() pca.fit(X) if self.verbose: t_pca = time.time() - t_pca print('[{}] Found principal components in {:5.2f}s.'.format( self.__class__.__name__, t_pca)) transformation = pca.components_ elif init == 'identity': if self.n_components is None: transformation = np.eye(X.shape[1]) else: transformation = np.eye(self.n_components, X.shape[1]) return transformation def _select_target_neighbors_wrapper(self, X, y, classes=None): """Find the target neighbors of each data sample. Parameters ---------- X : array, shape (n_samples, n_features) The training samples. y : array, shape (n_samples,) The corresponding training labels indices. classes : array, shape (n_classes,), optional (default=None) The non-singleton classes, encoded as integers in [0, n_classes). If None (default), they will be inferred from ``y``. Returns ------- target_neighbors: array, shape (n_samples, n_neighbors) An array of neighbors indices for each sample. """ t_start = time.time() if self.verbose: print('[{}] Finding the target neighbors...'.format( self.__class__.__name__)) sys.stdout.flush() neighbors_params = self.neighbors_params if neighbors_params is None: neighbors_params = {} neighbors_params.setdefault('n_jobs', self.n_jobs) target_neighbors = _select_target_neighbors( X, y, self.n_neighbors_, classes=classes, **neighbors_params) if self.verbose: print('[{}] Found the target neighbors in {:5.2f}s.'.format( self.__class__.__name__, time.time() - t_start)) return target_neighbors def _compute_grad_static(self, X, target_neighbors): """Compute the gradient contributed by the target neighbors. Parameters ---------- X : array, shape (n_samples, n_features) The training samples. target_neighbors : array, shape (n_samples, n_neighbors) The k nearest neighbors of each sample from the same class. Returns ------- grad_target_neighbors, shape (n_features, n_features) An array with the sum of all outer products of (sample, target_neighbor) pairs. """ t_grad_static = time.time() if self.verbose: print('[{}] Computing static part of the gradient...'.format( self.__class__.__name__)) n_samples, n_neighbors = target_neighbors.shape row = np.repeat(range(n_samples), n_neighbors) col = target_neighbors.ravel() tn_graph = csr_matrix((np.ones(target_neighbors.size), (row, col)), shape=(n_samples, n_samples)) grad_target_neighbors = _sum_weighted_outer_differences(X, tn_graph) if self.verbose: t_grad_static = time.time() - t_grad_static print('[{}] Computed static part of the gradient in {:5.2f}s.' .format(self.__class__.__name__, t_grad_static)) return grad_target_neighbors def _callback(self, transformation): """Called after each iteration of the optimizer. Parameters ---------- transformation : array, shape(n_components, n_features) The solution computed by the optimizer in this iteration. """ if self.callback is not None: self.callback(transformation, self.n_iter_) self.n_iter_ += 1 def _loss_grad_lbfgs(self, transformation, X, y, classes, target_neighbors, grad_static, use_sparse): """Compute the loss and the loss gradient w.r.t. ``transformation``. 
        Parameters
        ----------
        transformation : array, shape (n_components * n_features,)
            The current (flattened) linear transformation.

        X : array, shape (n_samples, n_features)
            The training samples.

        y : array, shape (n_samples,)
            The corresponding training labels.

        classes : array, shape (n_classes,)
            The non-singleton classes, encoded as integers in [0, n_classes).

        target_neighbors : array, shape (n_samples, n_neighbors)
            The target neighbors of each sample.

        grad_static : array, shape (n_features, n_features)
            The (weighted) gradient component caused by target neighbors,
            that stays fixed throughout the algorithm.

        use_sparse : bool
            Whether to use a sparse matrix to store the impostors.

        Returns
        -------
        loss: float
            The loss based on the given transformation.

        grad: array, shape (n_components * n_features,)
            The new (flattened) gradient of the loss.
        """

        n_samples, n_features = X.shape
        transformation = transformation.reshape(-1, n_features)
        self.components_ = transformation

        if self.n_iter_ == 0:
            self.n_iter_ += 1
            if self.verbose:
                header_fields = ['Iteration', 'Objective Value',
                                 '#Active Triplets', 'Time(s)']
                header_fmt = '{:>10} {:>20} {:>20} {:>10}'
                header = header_fmt.format(*header_fields)
                cls_name = self.__class__.__name__
                print('[{}]'.format(cls_name))
                print('[{}] {}\n[{}] {}'.format(cls_name, header,
                                                cls_name, '-' * len(header)))

        t_funcall = time.time()

        X_embedded = self._transform_without_checks(X)

        # Compute (squared) distances to the target neighbors
        n_neighbors = target_neighbors.shape[1]
        dist_tn = np.zeros((n_samples, n_neighbors))
        for k in range(n_neighbors):
            dist_tn[:, k] = row_norms(X_embedded -
                                      X_embedded[target_neighbors[:, k]],
                                      squared=True)

        # Add the margin to all (squared) distances to target neighbors
        dist_tn += 1

        # Find the impostors and compute (squared) distances to them
        impostors_graph = self._find_impostors(
            X_embedded, y, classes, dist_tn[:, -1], use_sparse)

        # Compute the push loss and its gradient
        loss, grad_new, n_active_triplets = \
            _compute_push_loss(X, target_neighbors, dist_tn, impostors_graph)

        # Compute the total gradient
        grad = np.dot(transformation, grad_static + grad_new)
        grad *= 2

        # Add the (weighted) pull loss to the total loss
        metric = np.dot(transformation.T, transformation)
        loss += np.dot(grad_static.ravel(), metric.ravel())

        if self.verbose:
            t_funcall = time.time() - t_funcall
            values_fmt = '[{}] {:>10} {:>20.6e} {:>20,} {:>10.2f}'
            print(values_fmt.format(self.__class__.__name__, self.n_iter_,
                                    loss, n_active_triplets, t_funcall))
            sys.stdout.flush()

        return loss, grad.ravel()

    def _find_impostors(self, X_embedded, y, classes, margin_radii,
                        use_sparse=True):
        """Compute the (sample, impostor) pairs exactly.

        Parameters
        ----------
        X_embedded : array, shape (n_samples, n_components)
            An array of transformed samples.

        y : array, shape (n_samples,)
            The corresponding (possibly encoded) class labels.

        classes : array, shape (n_classes,)
            The non-singleton classes, encoded as integers in [0, n_classes).

        margin_radii : array, shape (n_samples,)
            (Squared) distances of samples to their farthest target
            neighbors plus margin.

        use_sparse : bool, optional (default=True)
            Whether to use a sparse matrix to store the (sample, impostor)
            pairs.

        Returns
        -------
        impostors_graph : coo_matrix, shape (n_samples, n_samples)
            Element (i, j) is the distance between samples i and j if j is
            an impostor to i, otherwise zero.
""" n_samples = X_embedded.shape[0] if use_sparse: # Initialize a sparse (indicator) matrix for impostors storage impostors_sp = csr_matrix((n_samples, n_samples), dtype=np.int8) for class_id in classes[:-1]: ind_in, = np.where(y == class_id) ind_out, = np.where(y > class_id) # Split ind_out x ind_in into chunks of a size that fits # in memory imp_ind = _find_impostors_blockwise( X_embedded[ind_out], X_embedded[ind_in], margin_radii[ind_out], margin_radii[ind_in]) if len(imp_ind): # sample impostors if they are too many if len(imp_ind) > self.max_impostors: imp_ind = self.random_state_.choice( imp_ind, self.max_impostors, replace=False) dims = (len(ind_out), len(ind_in)) ii, jj = np.unravel_index(imp_ind, dims=dims) # Convert indices to refer to the original data matrix imp_row = ind_out[ii] imp_col = ind_in[jj] new_imp = csr_matrix((np.ones(len(imp_row), dtype=np.int8), (imp_row, imp_col)), dtype=np.int8, shape=(n_samples, n_samples)) impostors_sp = impostors_sp + new_imp impostors_sp = impostors_sp.tocoo(copy=False) imp_row = impostors_sp.row imp_col = impostors_sp.col # Make sure we do not exceed max_impostors n_impostors = len(imp_row) if n_impostors > self.max_impostors: ind_sampled = self.random_state_.choice( n_impostors, self.max_impostors, replace=False) imp_row = imp_row[ind_sampled] imp_col = imp_col[ind_sampled] imp_dist = _paired_distances_blockwise(X_embedded, imp_row, imp_col) else: # Initialize lists for impostors storage imp_row, imp_col, imp_dist = [], [], [] for class_id in classes[:-1]: ind_in, = np.where(y == class_id) ind_out, = np.where(y > class_id) # Split ind_out x ind_in into chunks of a size that fits in # memory imp_ind, dist_batch = _find_impostors_blockwise( X_embedded[ind_out], X_embedded[ind_in], margin_radii[ind_out], margin_radii[ind_in], return_distance=True) if len(imp_ind): # sample impostors if they are too many if len(imp_ind) > self.max_impostors: ind_sampled = self.random_state_.choice( len(imp_ind), self.max_impostors, replace=False) imp_ind = imp_ind[ind_sampled] dist_batch = dist_batch[ind_sampled] dims = (len(ind_out), len(ind_in)) ii, jj = np.unravel_index(imp_ind, dims=dims) # Convert indices to refer to the original data matrix imp_row.extend(ind_out[ii]) imp_col.extend(ind_in[jj]) imp_dist.extend(dist_batch) imp_row = np.asarray(imp_row, dtype=np.intp) imp_col = np.asarray(imp_col, dtype=np.intp) imp_dist = np.asarray(imp_dist) # Make sure we do not exceed max_impostors n_impostors = len(imp_row) if n_impostors > self.max_impostors: ind_sampled = self.random_state_.choice( n_impostors, self.max_impostors, replace=False) imp_row = imp_row[ind_sampled] imp_col = imp_col[ind_sampled] imp_dist = imp_dist[ind_sampled] impostors_graph = coo_matrix((imp_dist, (imp_row, imp_col)), shape=(n_samples, n_samples)) return impostors_graph

########################
# Some core functions  #
########################

def _select_target_neighbors(X, y, n_neighbors, classes=None, **nn_kwargs):
    """Find the target neighbors of each data sample.

    Parameters
    ----------
    X : array, shape (n_samples, n_features)
        The training samples.

    y : array, shape (n_samples,)
        The corresponding (encoded) training labels.

    n_neighbors : int
        The number of target neighbors to select for each sample in X.

    classes : array, shape (n_classes,), optional (default=None)
        The non-singleton classes, encoded as integers in [0, n_classes).
        If None (default), they will be inferred from ``y``.

    **nn_kwargs : keyword arguments
        Parameters to be passed to a :class:`neighbors.NearestNeighbors`
        instance apart from ``n_neighbors``.

    Returns
    -------
    target_neighbors: array, shape (n_samples, n_neighbors)
        The indices of the target neighbors of each sample.
    """

    target_neighbors = np.zeros((X.shape[0], n_neighbors), dtype=np.intp)

    nn = NearestNeighbors(n_neighbors=n_neighbors, **nn_kwargs)

    if classes is None:
        classes = np.unique(y)

    for class_id in classes:
        ind_class, = np.where(y == class_id)
        nn.fit(X[ind_class])
        neigh_ind = nn.kneighbors(return_distance=False)
        target_neighbors[ind_class] = ind_class[neigh_ind]

    return target_neighbors


def _find_impostors_blockwise(X_a, X_b, radii_a, radii_b,
                              return_distance=False, block_size=8):
    """Find (sample, impostor) pairs in blocks to avoid large memory usage.

    Parameters
    ----------
    X_a : array, shape (n_samples_a, n_components)
        Transformed data samples from class A.

    X_b : array, shape (n_samples_b, n_components)
        Transformed data samples from class B.

    radii_a : array, shape (n_samples_a,)
        Squared distances of the samples in ``X_a`` to their margins.

    radii_b : array, shape (n_samples_b,)
        Squared distances of the samples in ``X_b`` to their margins.

    block_size : int, optional (default=8)
        The maximum number of mebibytes (MiB) of memory to use at a time for
        calculating paired squared distances.

    return_distance : bool, optional (default=False)
        Whether to return the squared distances to the impostors.

    Returns
    -------
    imp_indices : array, shape (n_impostors,)
        Unraveled indices of (sample, impostor) pairs referring to a matrix
        of shape (n_samples_a, n_samples_b).

    imp_distances : array, shape (n_impostors,), optional
        imp_distances[i] is the squared distance between samples imp_row[i]
        and imp_col[i], where
        imp_row, imp_col = np.unravel_index(imp_indices,
                                            dims=(n_samples_a, n_samples_b))
    """

    n_samples_a = X_a.shape[0]
    bytes_per_row = X_b.shape[0] * X_b.itemsize
    block_n_rows = int(block_size*1024*1024 // bytes_per_row)

    imp_indices, imp_distances = [], []

    # X_b squared norm stays constant, so pre-compute it to get a speed-up
    X_b_norm_squared = row_norms(X_b, squared=True)[np.newaxis, :]
    for chunk in gen_batches(n_samples_a, block_n_rows):
        # The function `sklearn.metrics.pairwise.euclidean_distances` would
        # add an extra ~8% time of computation due to input validation on
        # every chunk and another ~8% due to clipping of negative values.
        distances_ab = _euclidean_distances_without_checks(
            X_a[chunk], X_b, squared=True, Y_norm_squared=X_b_norm_squared,
            clip=False)

        ind_b, = np.where((distances_ab < radii_a[chunk, None]).ravel())
        ind_a, = np.where((distances_ab < radii_b[None, :]).ravel())
        ind = np.unique(np.concatenate((ind_a, ind_b)))

        if len(ind):
            ind_plus_offset = ind + chunk.start * X_b.shape[0]
            imp_indices.extend(ind_plus_offset)

            if return_distance:
                # We only need to do clipping if we return the distances.
                distances_chunk = distances_ab.ravel()[ind]
                # Clip only the indexed (unique) distances
                np.maximum(distances_chunk, 0, out=distances_chunk)
                imp_distances.extend(distances_chunk)

    imp_indices = np.asarray(imp_indices)

    if return_distance:
        return imp_indices, np.asarray(imp_distances)
    else:
        return imp_indices


def _compute_push_loss(X, target_neighbors, dist_tn, impostors_graph):
    """

    Parameters
    ----------
    X : array, shape (n_samples, n_features)
        The training input samples.

    target_neighbors : array, shape (n_samples, n_neighbors)
        Indices of target neighbors of each sample.

    dist_tn : array, shape (n_samples, n_neighbors)
        (Squared) distances of samples to their target neighbors.

    impostors_graph : coo_matrix, shape (n_samples, n_samples)
        Element (i, j) is the distance between sample i and j if j is an
        impostor to i, otherwise zero.

    Returns
    -------
    loss : float
        The push loss caused by the given target neighbors and impostors.

    grad : array, shape (n_features, n_features)
        The gradient of the push loss.

    n_active_triplets : int
        The number of active triplet constraints.
    """

    n_samples, n_neighbors = dist_tn.shape
    imp_row = impostors_graph.row
    imp_col = impostors_graph.col
    dist_impostors = impostors_graph.data

    loss = 0
    shape = (n_samples, n_samples)
    A0 = csr_matrix(shape)
    sample_range = range(n_samples)
    n_active_triplets = 0
    for k in range(n_neighbors - 1, -1, -1):
        loss1 = np.maximum(dist_tn[imp_row, k] - dist_impostors, 0)
        ac, = np.where(loss1 > 0)
        n_active_triplets += len(ac)
        A1 = csr_matrix((2 * loss1[ac], (imp_row[ac], imp_col[ac])), shape)

        loss2 = np.maximum(dist_tn[imp_col, k] - dist_impostors, 0)
        ac, = np.where(loss2 > 0)
        n_active_triplets += len(ac)
        A2 = csc_matrix((2 * loss2[ac], (imp_row[ac], imp_col[ac])), shape)

        val = (A1.sum(1).ravel() + A2.sum(0)).getA1()
        A3 = csr_matrix((val, (sample_range, target_neighbors[:, k])), shape)
        A0 = A0 - A1 - A2 + A3
        loss += np.dot(loss1, loss1) + np.dot(loss2, loss2)

    grad = _sum_weighted_outer_differences(X, A0)

    return loss, grad, n_active_triplets


##########################
# Some helper functions  #
##########################

def _paired_distances_blockwise(X, ind_a, ind_b, squared=True, block_size=8):
    """Equivalent to row_norms(X[ind_a] - X[ind_b], squared=squared).

    Parameters
    ----------
    X : array, shape (n_samples, n_features)
        An array of data samples.

    ind_a : array, shape (n_indices,)
        An array of sample indices.

    ind_b : array, shape (n_indices,)
        Another array of sample indices.

    squared : bool (default=True)
        Whether to return the squared distances.

    block_size : int, optional (default=8)
        The maximum number of mebibytes (MiB) of memory to use at a time for
        calculating paired (squared) distances.

    Returns
    -------
    distances: array, shape (n_indices,)
        An array of pairwise, optionally squared, distances.
    """

    bytes_per_row = X.shape[1] * X.itemsize
    batch_size = int(block_size*1024*1024 // bytes_per_row)

    n_pairs = len(ind_a)
    distances = np.zeros(n_pairs)
    for chunk in gen_batches(n_pairs, batch_size):
        distances[chunk] = row_norms(X[ind_a[chunk]] - X[ind_b[chunk]], True)

    return distances if squared else np.sqrt(distances, out=distances)


def _sum_weighted_outer_differences(X, weights):
    """Compute the sum of weighted outer pairwise differences.

    Parameters
    ----------
    X : array, shape (n_samples, n_features)
        An array of data samples.

    weights : csr_matrix, shape (n_samples, n_samples)
        A sparse weights matrix.

    Returns
    -------
    sum_weighted_outer_diffs : array, shape (n_features, n_features)
        The sum of all outer weighted differences.
""" weights_sym = weights + weights.T diagonal = weights_sym.sum(1).getA() laplacian_dot_X = diagonal * X - safe_sparse_dot(weights_sym, X, dense_output=True) result = np.dot(X.T, laplacian_dot_X) return result def _check_scalar(x, name, target_type, min_val=None, max_val=None): """Validate scalar parameters type and value. Parameters ---------- x : object The scalar parameter to validate. name : str The name of the parameter to be printed in error messages. target_type : type or tuple Acceptable data types for the parameter. min_val : float or int, optional (default=None) The minimum value value the parameter can take. If None (default) it is implied that the parameter does not have a lower bound. max_val: float or int, optional (default=None) The maximum valid value the parameter can take. If None (default) it is implied that the parameter does not have an upper bound. Raises ------- TypeError If the parameter's type does not match the desired type. ValueError If the parameter's value violates the given bounds. """ if not isinstance(x, target_type): raise TypeError('`{}` must be an instance of {}, not {}.' .format(name, target_type, type(x))) if min_val is not None and x < min_val: raise ValueError('`{}`= {}, must be >= {}.'.format(name, x, min_val)) if max_val is not None and x > max_val: raise ValueError('`{}`= {}, must be <= {}.'.format(name, x, max_val)) ##################################################################### # Convenience function to construct the trivial LMNN - KNN pipeline # #####################################################################
def make_lmnn_pipeline(
        n_neighbors=3, n_components=None, init='pca', warm_start=False,
        max_impostors=500000, neighbors_params=None, weight_push_loss=0.5,
        impostor_store='auto', max_iter=50, tol=1e-5, callback=None,
        store_opt_result=False, verbose=0, random_state=None, n_jobs=1,
        n_neighbors_predict=None, weights='uniform', algorithm='auto',
        leaf_size=30, n_jobs_predict=None, **kwargs):
    """Constructs a LargeMarginNearestNeighbor - KNeighborsClassifier pipeline.

    See LargeMarginNearestNeighbor module documentation for details.

    Parameters
    ----------
    n_neighbors_predict : int, optional (default=None)
        The number of neighbors to use during prediction. If None (default)
        the value of ``n_neighbors`` used to train the model is used.

    weights : str or callable, optional (default = 'uniform')
        Weight function used in prediction. Possible values:

        - 'uniform' : uniform weights. All points in each neighborhood
          are weighted equally.
        - 'distance' : weight points by the inverse of their distance.
          In this case, closer neighbors of a query point will have a
          greater influence than neighbors which are further away.
        - [callable] : a user-defined function which accepts an
          array of distances, and returns an array of the same shape
          containing the weights.

    algorithm : {'auto', 'ball_tree', 'kd_tree', 'brute'}, optional
        Algorithm used to compute the nearest neighbors:

        - 'ball_tree' will use :class:`BallTree`
        - 'kd_tree' will use :class:`KDTree`
        - 'brute' will use a brute-force search.
        - 'auto' will attempt to decide the most appropriate algorithm
          based on the values passed to :meth:`fit` method.

        Note: fitting on sparse input will override the setting of
        this parameter, using brute force.

    leaf_size : int, optional (default = 30)
        Leaf size passed to BallTree or KDTree. This can affect the
        speed of the construction and query, as well as the memory
        required to store the tree. The optimal value depends on the
        nature of the problem.

    n_jobs_predict : int, optional (default=None)
        The number of parallel jobs to run for neighbors search during
        prediction. If None (default), then the value of ``n_jobs`` is used.

    memory : None, str or object with the joblib.Memory interface, optional
        Used to cache the fitted transformers of the pipeline. By default,
        no caching is performed. If a string is given, it is the path to
        the caching directory. Enabling caching triggers a clone of the
        transformers before fitting. Therefore, the transformer instance
        given to the pipeline cannot be inspected directly. Use the
        attribute ``named_steps`` or ``steps`` to inspect estimators within
        the pipeline. Caching the transformers is advantageous when fitting
        is time consuming.

    Returns
    -------
    lmnn_pipe : Pipeline
        A Pipeline instance with two steps: a ``LargeMarginNearestNeighbor``
        instance that is used to fit the model and a ``KNeighborsClassifier``
        instance that is used for prediction.

    Examples
    --------
    >>> from pylmnn import make_lmnn_pipeline
    >>> from sklearn.datasets import load_iris
    >>> from sklearn.model_selection import train_test_split
    >>> X, y = load_iris(return_X_y=True)
    >>> X_train, X_test, y_train, y_test = train_test_split(X, y,
    ... stratify=y, test_size=0.7, random_state=42)
    >>> lmnn_pipe = make_lmnn_pipeline(n_neighbors=3, n_neighbors_predict=3,
    ... random_state=42)
    >>> lmnn_pipe.fit(X_train, y_train) # doctest: +ELLIPSIS
    Pipeline(...)
    >>> print(lmnn_pipe.score(X_test, y_test))
    0.971428571429
    """

    memory = kwargs.pop('memory', None)
    if kwargs:
        raise TypeError('Unknown keyword arguments: "{}"'
                        .format(list(kwargs.keys())[0]))

    lmnn = LargeMarginNearestNeighbor(
        n_neighbors=n_neighbors, n_components=n_components, init=init,
        warm_start=warm_start, max_impostors=max_impostors,
        neighbors_params=neighbors_params, weight_push_loss=weight_push_loss,
        impostor_store=impostor_store, max_iter=max_iter, tol=tol,
        callback=callback, store_opt_result=store_opt_result,
        verbose=verbose, random_state=random_state, n_jobs=n_jobs)

    if n_neighbors_predict is None:
        n_neighbors_predict = n_neighbors

    if n_jobs_predict is None:
        n_jobs_predict = n_jobs

    knn = KNeighborsClassifier(
        n_neighbors=n_neighbors_predict, weights=weights,
        algorithm=algorithm, leaf_size=leaf_size, n_jobs=n_jobs_predict)

    return Pipeline([('lmnn', lmnn), ('knn', knn)], memory=memory)
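

# Usage sketch (illustrative, not part of the original source): because the
# returned Pipeline names its steps 'lmnn' and 'knn', their parameters can
# be tuned jointly with scikit-learn's GridSearchCV using the
# step__parameter convention, e.g.:
#
#     from sklearn.model_selection import GridSearchCV
#
#     pipe = make_lmnn_pipeline(max_iter=30, random_state=42)
#     param_grid = {'lmnn__n_neighbors': [1, 3, 5],
#                   'knn__n_neighbors': [1, 3, 5]}
#     search = GridSearchCV(pipe, param_grid, cv=3).fit(X_train, y_train)
#
# Here ``X_train`` and ``y_train`` stand for any training set, as in the
# doctest above.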