Source code for neal.sampler

# Copyright 2018 D-Wave Systems Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
# ================================================================================================
"""
A dimod :term:`sampler` that uses the simulated annealing algorithm.

"""
import math

from numbers import Integral
from numpy.random import randint
from collections import defaultdict

import dimod
import numpy as np

import neal.simulated_annealing as sa

import warnings

__all__ = ["Neal", "SimulatedAnnealingSampler", "default_beta_range"]


[docs]class SimulatedAnnealingSampler(dimod.Sampler, dimod.Initialized): """Simulated annealing sampler. Also aliased as :class:`.Neal`. Examples: This example solves a simple Ising problem. >>> import neal >>> sampler = neal.SimulatedAnnealingSampler() >>> h = {'a': 0.0, 'b': 0.0, 'c': 0.0} >>> J = {('a', 'b'): 1.0, ('b', 'c'): 1.0, ('a', 'c'): 1.0} >>> sampleset = sampler.sample_ising(h, J, num_reads=10) >>> print(sampleset.first.energy) -1.0 """ parameters = None """dict: A dict where keys are the keyword parameters accepted by the sampler methods (allowed kwargs) and values are lists of :attr:`SimulatedAnnealingSampler.properties` relevant to each parameter. See :meth:`.SimulatedAnnealingSampler.sample` for a description of the parameters. Examples: This example looks at a sampler's parameters and some of their values. >>> import neal >>> sampler = neal.SimulatedAnnealingSampler() >>> for kwarg in sorted(sampler.parameters): ... print(kwarg) beta_range beta_schedule_type initial_states initial_states_generator interrupt_function num_reads num_sweeps num_sweeps_per_beta seed >>> sampler.parameters['beta_range'] [] >>> sampler.parameters['beta_schedule_type'] ['beta_schedule_options'] """ properties = None """dict: A dict containing any additional information about the sampler. Examples: This example looks at the values set for a sampler property. >>> import neal >>> sampler = neal.SimulatedAnnealingSampler() >>> sampler.properties['beta_schedule_options'] ('linear', 'geometric', 'custom') """ def __init__(self): # create a local copy in case folks for some reason want to modify them self.parameters = {'beta_range': [], 'num_reads': [], 'num_sweeps': [], 'num_sweeps_per_beta': [], 'beta_schedule_type': ['beta_schedule_options'], 'seed': [], 'interrupt_function': [], 'initial_states': [], 'initial_states_generator': [], } self.properties = {'beta_schedule_options': ('linear', 'geometric', 'custom')}
[docs] def sample(self, bqm, beta_range=None, num_reads=None, num_sweeps = None, num_sweeps_per_beta=1, beta_schedule_type="geometric", seed=None, interrupt_function=None, beta_schedule = None, initial_states=None, initial_states_generator="random", **kwargs): """Sample from a binary quadratic model using an implemented sample method. Args: bqm (:class:`dimod.BinaryQuadraticModel`): The binary quadratic model to be sampled. beta_range (tuple or list, optional): A 2-tuple or list defining the beginning and end of the beta schedule, where beta is the inverse temperature. The schedule is interpolated within this range according to the value specified by ``beta_schedule_type``. Default range is set based on the total bias associated with each node. num_reads (int, optional, default=len(initial_states) or 1): Number of reads. Each read is generated by one run of the simulated annealing algorithm. If `num_reads` is not explicitly given, it is selected to match the number of initial states given. If initial states are not provided, only one read is performed. num_sweeps (int, optional, default=``len(beta_schedule)*num_sweeps_per_beta`` or 1000): Number of sweeps used in annealing. If no value is provided and ``beta_schedule`` is None the value is defaulted to 1000. num_sweeps_per_beta (int, optional, default=1) Number of sweeps to perform at each beta. One sweep consists of a sequential Metropolis update of all spins. beta_schedule_type (string, optional, default="geometric"): Beta schedule type, or how the beta values are interpolated between the given ``beta_range``. Supported values are: * "linear" * "geometric" * "custom" "custom" is recommended for high-performance applications, which typically require optimizing beta schedules beyond those of the "linear" and "geometric" options, with bounds beyond those provided by default. ``num_sweeps_per_beta`` and ``beta_schedule`` fully specify a custom schedule. beta_schedule (array-like, optional, default = None) Sequence of beta values swept. Format compatible with numpy.array(beta_schedule,dtype=float) required. Values should be non-negative. seed (int, optional, default = None): Seed to use for the PRNG. Specifying a particular seed with a constant set of parameters produces identical results. If not provided, a random seed is chosen. initial_states (samples-like, optional, default=None): One or more samples, each defining an initial state for all the problem variables. Initial states are given one per read, but if fewer than ``num_reads`` initial states are defined, additional values are generated as specified by ``initial_states_generator``. See func:``.as_samples`` for a description of "samples-like". initial_states_generator (str, "none"/"tile"/"random", optional, default="random"): Defines the expansion of ``initial_states`` if fewer than ``num_reads`` are specified: * "none": If the number of initial states specified is smaller than ``num_reads``, raises ValueError. * "tile": Reuses the specified initial states if fewer than ``num_reads`` or truncates if greater. * "random": Expands the specified initial states with randomly generated states if fewer than ``num_reads`` or truncates if greater. interrupt_function (function, optional): If provided, interrupt_function is called with no parameters between each sample of simulated annealing. If the function returns True, then simulated annealing will terminate and return with all of the samples and energies found so far. Returns: :class:`dimod.SampleSet` Examples: This example runs simulated annealing on a binary quadratic model with some different input parameters. >>> import dimod >>> import neal ... >>> sampler = neal.SimulatedAnnealingSampler() >>> bqm = dimod.BinaryQuadraticModel({'a': .5, 'b': -.5}, ... {('a', 'b'): -1}, 0.0, ... dimod.SPIN) >>> # Run with default parameters >>> sampleset = sampler.sample(bqm) >>> # Run with specified parameters >>> sampleset = sampler.sample(bqm, seed=1234, ... beta_range=[0.1, 4.2], ... num_sweeps=20, ... beta_schedule_type='geometric') >>> # Reuse a seed >>> a1 = next((sampler.sample(bqm, seed=88)).samples())['a'] >>> a2 = next((sampler.sample(bqm, seed=88)).samples())['a'] >>> a1 == a2 True """ # get the original vartype so we can return consistently original_vartype = bqm.vartype # convert to spin (if needed) if bqm.vartype is not dimod.SPIN: bqm = bqm.change_vartype(dimod.SPIN, inplace=False) # parse_initial_states could handle seed generation, but because we're # sharing it with the SA algo, we handle it out here if seed is None: seed = randint(2**31) elif not isinstance(seed, Integral): error_msg = ("'seed' should be None or an integer between 0 and 2^32 " "- 1: value = {}".format(seed)) raise TypeError(error_msg) elif not (0 <= seed < 2**31): error_msg = ("'seed' should be an integer between 0 and 2^32 - 1: " "value = {}".format(seed)) raise ValueError(error_msg) # parse the inputs parsed = self.parse_initial_states( bqm, num_reads=num_reads, initial_states=initial_states, initial_states_generator=initial_states_generator, seed=seed) num_reads = parsed.num_reads # read out the initial states and the variable order initial_states_array = np.ascontiguousarray( parsed.initial_states.record.sample) variable_order = parsed.initial_states.variables # read out the BQM ldata, (irow, icol, qdata), off = bqm.to_numpy_vectors( variable_order=variable_order) if interrupt_function and not callable(interrupt_function): raise TypeError("'interrupt_function' should be a callable") if not isinstance(num_sweeps_per_beta, Integral): error_msg = "'num_sweeps_per_beta' should be a positive integer: value = {}".format(num_sweeps_per_beta) raise TypeError(error_msg) if num_sweeps_per_beta < 1: error_msg = "'num_sweeps_per_beta' should be a positive integer: value = {}".format(num_sweeps_per_beta) raise ValueError(error_msg) # handle beta_schedule et al if beta_schedule_type == "custom": if beta_schedule is None: error_msg = "'beta_schedule' must be provided for beta_schedule_type = 'custom': value is None" raise ValueError(error_msg) else: try: beta_schedule = np.array(beta_schedule,dtype=float) except: raise ValueError('beta_schedule cannot be case as a numpy array of dtype=float') if num_sweeps is not None and num_sweeps!=len(beta_schedule)*num_sweeps_per_beta: error_msg = "'num_sweeps' should be set to None, or a value consistent with 'beta_schedule' and 'num_sweeps_per_beta' for 'beta_schedule_type' = 'custom': value = ".format(num_sweeps) raise ValueError(error_msg) if beta_range is not None and (beta_range[0]!=beta_schedule[0] or beta_range[-1]!=beta_schedule[-1]): error_msg = "'beta_range' should be set to None, or a value consistent with 'beta_schedule', for 'beta_schedule_type'='custom'." raise ValueError(error_msg) if np.min(beta_schedule)<0: error_msg = "'beta_schedule' cannot include negative values." raise ValueError(error_msg) else: if beta_schedule is not None: error_msg = "'beta_schedule' must be set to None for 'beta_schedule_type' not equal to 'custom'" raise ValueError(error_msg) if num_sweeps is None: num_sweeps = 1000 num_betas, rem = divmod(num_sweeps,num_sweeps_per_beta) if rem > 0 or num_betas < 1: error_msg = "'num_sweeps' must be a positive value divisible by 'num_sweeps_per_beta'." raise ValueError(error_msg) if beta_range is None: beta_range = _default_ising_beta_range(bqm.linear, bqm.quadratic) elif len(beta_range) != 2 or min(beta_range) < 0: error_msg = "'beta_range' should be a 2-tuple, or 2 element list of positive numbers. The latter value is the target value." raise ValueError(error_msg) if num_betas == 1: #One sweep in the target model beta_schedule = np.array([beta_range[-1]],dtype=float) else: if beta_schedule_type == "linear": # interpolate a linear beta schedule beta_schedule = np.linspace(*beta_range, num=num_betas) elif beta_schedule_type == "geometric": if min(beta_range) <= 0: error_msg = "'beta_range' must contain non-zero values for 'beta_schedule_type' = 'geometric'" raise ValueError(error_msg) # interpolate a geometric beta schedule beta_schedule = np.geomspace(*beta_range, num=num_betas) else: raise ValueError("Beta schedule type {} not implemented".format(beta_schedule_type)) # run the simulated annealing algorithm samples, energies = sa.simulated_annealing( num_reads, ldata, irow, icol, qdata, num_sweeps_per_beta, beta_schedule, seed, initial_states_array, interrupt_function) info = { "beta_range": beta_range, "beta_schedule_type": beta_schedule_type } response = dimod.SampleSet.from_samples( (samples, variable_order), energy=energies+bqm.offset, # add back in the offset info=info, vartype=dimod.SPIN ) response.change_vartype(original_vartype, inplace=True) return response
Neal = SimulatedAnnealingSampler def _default_ising_beta_range(h, J, max_single_qubit_excitation_rate = 0.01, scale_T_with_N = True): """Determine the starting and ending beta from h J. The assumed usage of annealing is optimization. Defaults are chosen so that at the hot temperature is fast mixing, and so that at low temperature the rate of excitations is small, exploiting a simple approximation to the ground state energy space to determine a low temperature bound and a worst case approximation to energy distribution to determine the high temperature bound. Args: h (dict): External field of Ising model (also called linear bias) J (dict): Couplings of Ising model (also called quadratic biases) max_single_qubit_excitation_rate (float, optional, default = 0.01): Targeted single qubit excitation rate at final temperature. We can set this value small to lower the probability of excitations at the end of the anneal, combined with a simple approximation to the ground-state energy structure. For standard schedule types (such as geometric), a trade-off exists between this threshold and time to solution (exploiting multiple restarts). scale_T_with_N (bool, optional, default = False): The expected number of excitations at finite temperature scales with the number of variables. For non-vanishing probability of the ground state it is necessary for T to scale to zero with N. A simplified approximation to the ground-state energy structure is used - for high precision problems T may not scale. Assume each variable in J is also in h. Generally one should optimize min_beta, max_beta and the shape of the schedule to maximize some objective such as time to solution. The approximations here are simplified, and can be far from optimal in some cases. Use custom schedules to address such cases. Approximations used are of complexity O(number of biases), O(1) approximations are also possible, as are stronger approximations, but methods are not bottlenecks in practice. We use the maximum bias per spin to give an upper bound on the gap, allowing the initial sweeps to be fast mixing. We use an approximation to the minimum bias per spin to give a lower bound on the minimum energy gap, such that for the final sweeps states are highly unlikely to excite away from a global (or local) minimum. """ if not 0 < max_single_qubit_excitation_rate < 1: raise ValueError('Targeted single qubit excitations rates must be in range (0,1)') # Approximate worst and best cases of the [non-zero] energy signal (effective field) # experienced per spin as function of neighbors: bias = h_i + sum_j Jij s_j: sum_abs_bias_dict = defaultdict(int, {k: abs(v) for k, v in h.items()}) if sum_abs_bias_dict: min_abs_bias_dict = {k: v for k, v in sum_abs_bias_dict.items() if v != 0} else: min_abs_bias_dict = {} #This loop is slow, but is not a bottleneck for practical implementations of simulated annealing: for (k1, k2), v in J.items(): for k in [k1,k2]: sum_abs_bias_dict[k] += abs(v) if v != 0: #This is slow, but this routine is not a bottleneck: if k in min_abs_bias_dict: min_abs_bias_dict[k] = min(abs(v),min_abs_bias_dict[k]) else: min_abs_bias_dict[k] = abs(v) if not min_abs_bias_dict: #If dictionary is empty, a Null problem is being presented. This is #likely a user error, or edge case test. #We should also consider warning for min(min_abs_bias_dict.values())==0. #Metropolis-Hastings is not suitable for unbiased and uncoupled #variables, sampling of equilibrium is possible, but only if a uniform #random initial condition is used (this is default, but allows changes). warn_msg = ('All bqm biases are zero (all energies are zero), this is ' 'likely a value error. Temperature range is set arbitrarily ' 'to [0.1,1]. Metropolis-Hastings update is non-ergodic.') warnings.warn(warn_msg) return([0.1,1]) # Selecting betas based on probability of flipping a qubit # Hot temp: We want to be able to quickly search the entire solution space (equilibrate). # Scale hot_beta so that all spins are able to flip with probability at least 50%, which # ensures mixing across all states is fast. # The most unlikely flip is related to the largest energy gap that must be overcome, with # Metropolis updates we require: # 0.50 = exp(-hot_beta * max_delta_energy) - (1) # This is solved as hot_beta = log(2)/max_delta_energy, max_delta energy is twice the # effective field, we take a worst case of the effective field to be conservative. # Max delta energy occurs when all biases are aligned, and contribute without frustration: max_effective_field = max(sum_abs_bias_dict.values(), default=0) if max_effective_field == 0: hot_beta = 1 else: hot_beta = np.log(2) / (2*max_effective_field) # Cold temp: Towards the end of the annealing schedule, we want to eliminate excitations assuming # an optimization application; the samples relax to a local (or ideally global) minima. # If we arrive at the ground state (or local minima) on the penultimate iteration, we can require # that we have low probability to excite on the final iteration. # This means that the probability to excite any spin must be bounded by 0.01 on the final sweep # 0.01 = sum_i exp(-cold_beta * min_delta_energy_i) - (2) # We can approximate min_delta_energy_i as the cost of frustrating the smallest J or h. This approximation # can be tight for low precision problems, but can be improved for higher precision ones. In the worst # case (high connectity and high precision) determining min_delta_energy_i is NP-hard (related to a # knapsack problem) we settle for this approximation, which yields satisfactory results in many standard # model types. # (2) is a convex optimization problem, and easy to solve with a root finder. However, for brevity we can # further simplify, by assuming only spins experiencing minimal gaps are excitable # 0.01 ~ #minimal_gaps exp(- cold_beta min_i min_delta_energy_i) - (2) # Where #minimal_gaps counts the number of cases i for which min_delta_energy_i = min_i(min_delta_energy_i) # i.e. the number of spins which are most easily excited out of the ground state. # The solution is cold_beta = log(#minimal_gaps/0.01) / min_delta_energy. # min_delta_energy is twice the smallest (non-zero) effective field, we approximate this per spin as the # smallest associated bias term (h or J). if len(min_abs_bias_dict)==0: #Trivial problem cold_beta = hot_beta else: values_array = np.array(list(min_abs_bias_dict.values()),dtype=float) min_effective_field = np.min(values_array) if scale_T_with_N: #This is effective assuming finite precision, for pathological #or real valued problems this approximation may perform poorly number_min_gaps = np.sum(min_effective_field == values_array) else: number_min_gaps = 1 cold_beta = np.log(number_min_gaps/max_single_qubit_excitation_rate) / (2*min_effective_field) return [hot_beta, cold_beta] def default_beta_range(bqm): ising = bqm.spin return _default_ising_beta_range(ising.linear, ising.quadratic)