Module ainshamsflow.optimizers
Optimizers Module.
In this Module, we include our optimization algorithms such as Stochastic Gradient Descent (SGD).
Expand source code
"""Optimizers Module.
In this Module, we include our optimization algorithms such as
Stochastic Gradient Descent (SGD).
"""
import numpy as np
import time
from ainshamsflow.metrics import Metric
from ainshamsflow.utils.asf_errors import BaseClassError, NameNotFoundError, UnsupportedShapeError
from ainshamsflow.utils.history import History
from ainshamsflow.utils.utils import time_elapsed
def get(opt_name):
"""Get any Optimizer in this Module by name."""
opts = [SGD, Momentum, AdaGrad, AdaDelta, RMSProp, Adam]
for opt in opts:
if opt.__name__.lower() == opt_name.lower():
return opt()
else:
raise NameNotFoundError(opt_name, __name__)
def _check_dims(weights, dw):
if weights.shape != dw.shape:
raise UnsupportedShapeError(dw.shape, weights.shape)
class Optimizer:
"""Optimiser Base Class.
Used to generalize learning using Gradient Descent.
Creating a new optimizer is as easy as creating a new class that
inherits from this class.
You then have to add any extra parameters in your constructor
(while still calling this class' constructor) and redefine the
_update() method.
"""
def __init__(self, lr=0.01):
"""Initialize the Learning Rate."""
self.lr = lr
def __call__(self, ds_train, ds_valid, epochs, train_batch_size, valid_batch_size, layers, loss, metrics,
regularizer, training=True, verbose=True, live_plot=False):
"""Run optimization using Gradient Descent. Return History Object for training session."""
m = ds_train.cardinality()
if train_batch_size is not None:
ds_train.batch(train_batch_size)
if ds_valid is not None:
if valid_batch_size is not None:
ds_valid.batch(valid_batch_size)
history = History(loss, metrics, ds_valid is not None)
loss_values = []
metrics_values = []
val_loss_values = []
val_metrics_values = []
if verbose:
if training:
print('Training Model for {} epochs:'.format(epochs))
else:
print('Evaluating Model:')
t1 = time.time()
for i, (batch_x, batch_y) in enumerate(ds_train):
loss_value, metric_values = self._single_iteration(batch_x, batch_y, m, layers,
loss, metrics, regularizer, False, i)
loss_values.append(loss_value)
metrics_values.append(metric_values)
loss_values = np.array(loss_values).mean()
metrics_values = np.array(metrics_values).mean(axis=0)
if ds_valid is not None:
for i, (batch_x, batch_y) in enumerate(ds_valid):
val_loss_value, val_metric_values = self._single_iteration(batch_x, batch_y, m, layers,
loss, metrics, regularizer, False, i)
val_loss_values.append(val_loss_value)
val_metrics_values.append(val_metric_values)
val_loss_values = np.array(val_loss_values).mean()
val_metrics_values = np.array(val_metrics_values).mean(axis=0)
if verbose:
t2 = time.time()
if training:
print('Epoch #{:4d}:'.format(0), end=' ')
print(
't={},'.format(time_elapsed(t2-t1)),
'{}={:8.4f},'.format('loss', loss_values),
*['{}={:8.4f},'.format(metric.__name__, metrics_values[j])
for j, metric in enumerate(metrics)],
end=' '
)
if ds_valid is not None:
print(
'{}={:8.4f},'.format('val_loss', val_loss_values),
*['{}={:8.4f},'.format('val_' + metric.__name__, val_metrics_values[j])
for j, metric in enumerate(metrics)],
end=' '
)
print()
if training:
history.add(loss_values, metrics_values, val_loss_values, val_metrics_values)
if not training:
if verbose:
return
else:
return loss_values, metrics_values
for epoch_num in range(epochs):
loss_values = []
metrics_values = []
val_loss_values = []
val_metrics_values = []
t1 = time.time()
for i, (batch_x, batch_y) in enumerate(ds_train):
loss_value, metric_values = self._single_iteration(batch_x, batch_y, m, layers,
loss, metrics, regularizer, training, i)
loss_values.append(loss_value)
metrics_values.append(metric_values)
loss_values = np.array(loss_values).mean()
metrics_values = np.array(metrics_values).mean(axis=0)
if ds_valid is not None:
for i, (batch_x, batch_y) in enumerate(ds_valid):
val_loss_value, val_metric_values = self._single_iteration(batch_x, batch_y, m, layers,
loss, metrics, regularizer, False, i)
val_loss_values.append(val_loss_value)
val_metrics_values.append(val_metric_values)
val_loss_values = np.array(val_loss_values).mean()
val_metrics_values = np.array(val_metrics_values).mean(axis=0)
history.add(loss_values, metrics_values, val_loss_values, val_metrics_values)
if verbose:
t2 = time.time()
print(
'Epoch #{:4d}: t={},'.format(epoch_num+1, time_elapsed(t2-t1)),
'{}={:8.4f},'.format('loss', loss_values),
*['{}={:8.4f},'.format(metric.__name__, metrics_values[j]) for j, metric in enumerate(metrics)],
end=' '
)
if ds_valid is not None:
print(
'{}={:8.4f},'.format('val_loss', val_loss_values),
*['{}={:8.4f},'.format('val_' + metric.__name__, val_metrics_values[j])
for j, metric in enumerate(metrics)],
end=' '
)
print()
if live_plot:
history.show()
return history
def _single_iteration(self, batch_x, batch_y, m, layers, loss, metrics, regularizer, training, i):
"""Run optimization for a single batch. Return loss value and metric values for iteration."""
weights_list = [layer.get_weights()[0] for layer in layers]
# Forward Pass
batch_a = batch_x
for j, layer in enumerate(layers):
batch_a = layer(batch_a, training)
regularization_term = 0 if regularizer is None else regularizer(weights_list, m)
loss_value = loss(batch_a, batch_y) + regularization_term
metric_values = []
if metrics:
Metric.calc_confusion_matrix(batch_a, batch_y)
for metric in metrics:
metric_values.append(metric())
# Backward Pass
regularization_diff = None if regularizer is None else regularizer.diff(weights_list, m)
da = loss.diff(batch_a, batch_y)
if training:
for j in reversed(range(len(layers))):
da, dw, db = layers[j].diff(da)
if layers[j].trainable:
if regularization_diff is not None:
dw = self._add_reg_diff(dw, regularization_diff[j])
weights, biases = layers[j].get_weights()
updated_weights = self._update(i, weights, dw, layer_num=j, is_weight=True)
updated_biases = self._update(i, biases, db, layer_num=j, is_weight=False)
layers[j].set_weights(updated_weights, updated_biases)
return loss_value, metric_values
def _update(self, i, weights, dw, layer_num, is_weight):
raise BaseClassError
def _add_reg_diff(self, dw, reg_diff):
if isinstance(dw, list) and isinstance(reg_diff, list):
dw_new = []
for single_dw, single_reg_diff in zip(dw, reg_diff):
single_dw = self._add_reg_diff(single_dw, single_reg_diff)
dw_new.append(single_dw)
else:
dw_new = dw + reg_diff
return dw_new
class SGD(Optimizer):
"""Stochastic Gradient Descent Algorithm."""
__name__ = 'SGD'
def _update(self, i, weights, dw, layer_num, is_weight):
"""Update step for SGD."""
if isinstance(weights, list) and isinstance(dw, list):
ans = []
for j, (weight, d) in enumerate(zip(weights, dw)):
l_num = '{}.{}'.format(layer_num, j)
ans.append(self._update(i, weight, d, l_num, is_weight))
return ans
else:
_check_dims(weights, dw)
return weights - self.lr * dw
class Momentum(Optimizer):
"""SGD with Momentum"""
__name__ = 'Momentum'
def __init__(self, lr=0.01, beta=0.9):
super().__init__(lr)
self.beta = beta
self.momentum = {}
def _update(self, i, weights, dw, layer_num, is_weight):
if isinstance(weights, list) and isinstance(dw, list):
ans = []
for j, (weight, d) in enumerate(zip(weights, dw)):
l_num = '{}.{}'.format(layer_num, j)
ans.append(self._update(i, weight, d, l_num, is_weight))
return ans
else:
_check_dims(weights, dw)
# check for first time:
layer_id = "{}{}".format(layer_num, 'w' if is_weight else 'b')
if layer_id not in self.momentum.keys():
self.momentum[layer_id] = np.zeros(weights.shape)
# update step
self.momentum[layer_id] = self.beta * self.momentum[layer_id] + (1 - self.beta) * dw
return weights - self.lr * self.momentum[layer_id]
class AdaGrad(Optimizer):
"""AdaGrad"""
__name__ = 'AdaGrad'
def __init__(self, lr=0.01, beta=0.9):
super().__init__(lr)
self.beta = beta
self.RMS = {}
def _update(self, i, weights, dw, layer_num, is_weight):
if isinstance(weights, list) and isinstance(dw, list):
ans = []
for j, (weight, d) in enumerate(zip(weights, dw)):
l_num = '{}.{}'.format(layer_num, j)
ans.append(self._update(i, weight, d, l_num, is_weight))
return ans
else:
_check_dims(weights, dw)
# check for first time:
layer_id = "{}{}".format(layer_num, 'w' if is_weight else 'b')
if layer_id not in self.RMS.keys():
self.RMS[layer_id] = np.zeros(weights.shape)
# update step
self.RMS[layer_id] = self.RMS[layer_id] + np.square(dw)
return weights - self.lr * dw / (np.sqrt(self.RMS[layer_id]) + 1e-8)
class AdaDelta(Optimizer):
"""AdaDelta"""
__name__ = 'AdaDelta'
def __init__(self, lr=0.01, beta=0.9):
super().__init__(lr)
self.beta = beta
self.delta = {}
def _update(self, i, weights, dw, layer_num, is_weight):
if isinstance(weights, list) and isinstance(dw, list):
ans = []
for j, (weight, d) in enumerate(zip(weights, dw)):
l_num = '{}.{}'.format(layer_num, j)
ans.append(self._update(i, weight, d, l_num, is_weight))
return ans
else:
_check_dims(weights, dw)
# check for first time:
layer_id = "{}{}".format(layer_num, 'w' if is_weight else 'b')
if layer_id not in self.delta.keys():
self.delta[layer_id] = np.zeros(weights.shape)
# update step
self.delta[layer_id] = self.beta * self.delta[layer_id] + (1 - self.beta) * np.square(dw)
return weights - self.lr * dw / (np.sqrt(self.delta[layer_id]) + 1e-8)
class RMSProp(Optimizer):
"""RMSProp"""
__name__ = 'RMSProp'
def __init__(self, lr=0.01, beta=0.9):
super().__init__(lr)
self.beta = beta
self.stMoment = {}
self.ndMoment = {}
self.RMS = {}
def _update(self, i, weights, dw, layer_num, is_weight):
if isinstance(weights, list) and isinstance(dw, list):
ans = []
for j, (weight, d) in enumerate(zip(weights, dw)):
l_num = '{}.{}'.format(layer_num, j)
ans.append(self._update(i, weight, d, l_num, is_weight))
return ans
else:
_check_dims(weights, dw)
# check for first time:
layer_id = "{}{}".format(layer_num, 'w' if is_weight else 'b')
if layer_id not in self.stMoment.keys():
self.stMoment[layer_id] = np.zeros(weights.shape)
if layer_id not in self.ndMoment.keys():
self.ndMoment[layer_id] = np.zeros(weights.shape)
if layer_id not in self.RMS.keys():
self.RMS[layer_id] = np.zeros(weights.shape)
# update step
self.stMoment[layer_id] = self.beta * self.stMoment[layer_id] + (1 - self.beta) * dw
self.ndMoment[layer_id] = self.beta * self.ndMoment[layer_id] + (1 - self.beta) * np.square(dw)
self.RMS[layer_id] = self.beta * self.RMS[layer_id] + self.lr * dw / (
np.sqrt(self.ndMoment[layer_id] - np.square(self.stMoment[layer_id]) + 1e-8))
return weights - self.RMS[layer_id]
class Adam(Optimizer):
"""Adam Optimizer"""
__name__ = 'Adam'
def __init__(self, lr=0.01, beta1=0.9, beta2=0.999):
super().__init__(lr)
self.beta1 = beta1
self.beta2 = beta2
self.momentum = {}
self.secMoment = {}
def _update(self, i, weights, dw, layer_num, is_weight):
if isinstance(weights, list) and isinstance(dw, list):
ans = []
for j, (weight, d) in enumerate(zip(weights, dw)):
l_num = '{}.{}'.format(layer_num, j)
ans.append(self._update(i, weight, d, l_num, is_weight))
return ans
else:
_check_dims(weights, dw)
# check for first time:
layer_id = "{}{}".format(layer_num, 'w' if is_weight else 'b')
if layer_id not in self.secMoment.keys():
self.secMoment[layer_id] = np.zeros(weights.shape)
if layer_id not in self.momentum.keys():
self.momentum[layer_id] = np.zeros(weights.shape)
# update step
self.secMoment[layer_id] = (self.beta2 * self.secMoment[layer_id] + (1 - self.beta2) * np.square(dw)) / (
1 - np.power(self.beta2, i + 1))
self.momentum[layer_id] = (self.beta1 * self.momentum[layer_id] + (1 - self.beta1) * dw) / (
1 - np.power(self.beta1, i + 1))
return weights - self.lr * self.momentum[layer_id] / (np.sqrt(self.secMoment[layer_id]) + 1e-8)
Functions
def get(opt_name)
Get any Optimizer in this Module by name.
Classes
class AdaDelta (lr=0.01, beta=0.9)
AdaDelta
Initialize the Learning Rate.
Ancestors: Optimizer
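Restating the update rule from the source above, this implementation keeps an exponential moving average of squared gradients and divides the step by its square root:

delta = beta * delta + (1 - beta) * dw ** 2
w = w - lr * dw / (np.sqrt(delta) + 1e-8)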
class AdaGrad (lr=0.01, beta=0.9)
AdaGrad
Initialize the Learning Rate.
Ancestors: Optimizer
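Restating the update rule from the source above, squared gradients are accumulated without decay, so the effective step size shrinks as training proceeds (note that beta is accepted by the constructor but not used in the update):

RMS = RMS + dw ** 2
w = w - lr * dw / (np.sqrt(RMS) + 1e-8)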
class Adam (lr=0.01, beta1=0.9, beta2=0.999)
Adam Optimizer
Initialize the Learning Rate.
Ancestors: Optimizer
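Restating the update rule from the source above, bias-corrected running estimates of the first and second moments of the gradient are kept per parameter tensor, where i is the batch index within the epoch:

momentum = (beta1 * momentum + (1 - beta1) * dw) / (1 - beta1 ** (i + 1))
secMoment = (beta2 * secMoment + (1 - beta2) * dw ** 2) / (1 - beta2 ** (i + 1))
w = w - lr * momentum / (np.sqrt(secMoment) + 1e-8)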
class Momentum (lr=0.01, beta=0.9)
SGD with Momentum
Initialize the Learning Rate.
Ancestors: Optimizer
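Restating the update rule from the source above, an exponential moving average of past gradients is kept and the step is taken along it:

momentum = beta * momentum + (1 - beta) * dw
w = w - lr * momentum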
class Optimizer (lr=0.01)
Optimizer Base Class.
Used to generalize learning using Gradient Descent.
Creating a new optimizer is as easy as creating a new class that inherits from this class. You then have to add any extra parameters in your constructor (while still calling this class' constructor) and redefine the _update() method.
Initialize the Learning Rate.
Subclasses: AdaDelta, AdaGrad, Adam, Momentum, RMSProp, SGD
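As a minimal sketch of the extension pattern described above (not part of the library; the decay parameter and class name are illustrative), a gradient-descent optimizer with L2 weight decay could look like this:

class SGDWithDecay(Optimizer):
    """SGD with an illustrative L2 weight-decay term."""
    __name__ = 'SGDWithDecay'

    def __init__(self, lr=0.01, decay=1e-4):
        super().__init__(lr)
        self.decay = decay

    def _update(self, i, weights, dw, layer_num, is_weight):
        # Recurse over per-layer lists of arrays, as the built-in optimizers do.
        if isinstance(weights, list) and isinstance(dw, list):
            return [self._update(i, w, d, '{}.{}'.format(layer_num, j), is_weight)
                    for j, (w, d) in enumerate(zip(weights, dw))]
        _check_dims(weights, dw)
        # Plain gradient step plus a weight-decay pull toward zero.
        return weights - self.lr * (dw + self.decay * weights)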
class RMSProp (lr=0.01, beta=0.9)
RMSProp
Initialize the Learning Rate.
Ancestors: Optimizer
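Restating the update rule from the source above, running estimates of the gradient and of the squared gradient are kept, and the step itself is smoothed; note that this differs slightly from the textbook RMSProp formulation:

stMoment = beta * stMoment + (1 - beta) * dw
ndMoment = beta * ndMoment + (1 - beta) * dw ** 2
RMS = beta * RMS + lr * dw / np.sqrt(ndMoment - stMoment ** 2 + 1e-8)
w = w - RMS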
class SGD (lr=0.01)
Stochastic Gradient Descent Algorithm.
Initialize the Learning Rate.
Ancestors: Optimizer
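The update is the plain gradient-descent step w = w - lr * dw, applied element-wise; for example, with lr=0.01, a weight of 0.50 and a gradient of 2.0, the new weight is 0.50 - 0.01 * 2.0 = 0.48.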