Source code for parl.core.fluid.model

#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
import paddle.fluid as fluid
from parl.core.fluid.layers.layer_wrappers import LayerFunc
from parl.core.fluid.plutils import *
from parl.core.model_base import ModelBase
from parl.utils.deprecation import deprecated
from parl.utils import machine_info

__all__ = ['Model']


class Model(ModelBase):
    """
    | `alias`: ``parl.Model``
    | `alias`: ``parl.core.fluid.model.Model``

    | ``Model`` is a base class of PARL for the neural network. A ``Model`` is
    usually a policy or Q-value function, which predicts an action or an
    estimate according to the environmental observation.

    | To track all the layers, users are required to implement neural networks
    with the layers from ``parl.layers`` (e.g., parl.layers.fc). These layers
    have the same APIs as fluid.layers.

    | ``Model`` supports duplicating a ``Model`` instance in a pythonic way:

    | ``copied_model = copy.deepcopy(model)``

    Example:

    .. code-block:: python

        import copy
        import parl

        class Policy(parl.Model):
            def __init__(self):
                self.fc = parl.layers.fc(size=12, act='softmax')

            def policy(self, obs):
                out = self.fc(obs)
                return out

        policy = Policy()
        copied_policy = copy.deepcopy(policy)

    Attributes:
        model_id(str): each model instance has its unique model_id.

    Public Functions:
        - ``sync_weights_to``: synchronize parameters of the current model to another model.
        - ``get_weights``: return a list containing all the parameters of the current model.
        - ``set_weights``: copy parameters from ``get_weights()`` into the model.
        - ``forward``: define the computations of a neural network. **Should** be overridden by all subclasses.
        - ``parameters``: return a list containing the names of all parameters of the model.
        - ``set_model_id``: set the ``model_id`` of the current model explicitly.
        - ``get_model_id``: return the ``model_id`` of the current model.
    """
    @deprecated(
        deprecated_in='1.2',
        removed_in='1.3',
        replace_function='sync_weights_to')
    def sync_params_to(self,
                       target_net,
                       gpu_id=None,
                       decay=0.0,
                       share_vars_parallel_executor=None):
        """Synchronize parameters in the model to another model (target_net).

        target_net_weights = decay * target_net_weights + (1 - decay) * source_net_weights

        Args:
            target_net (`parl.Model`): an instance of ``Model`` that has the
                same neural network architecture as the current model.
            decay (float): the rate of decline in copying parameters. 0 if no
                parameters decay when synchronizing the parameters.
            share_vars_parallel_executor (fluid.ParallelExecutor): Optional.
                If not None, will use ``fluid.ParallelExecutor`` to run the
                program instead of ``fluid.Executor``.
        """
        self.sync_weights_to(
            target_model=target_net,
            decay=decay,
            share_vars_parallel_executor=share_vars_parallel_executor)
    def sync_weights_to(self,
                        target_model,
                        decay=0.0,
                        share_vars_parallel_executor=None):
        """Synchronize parameters of the current model to another model.

        To speed up the synchronizing process, it will create a program
        implicitly to finish the process. It also stores the program as a
        cache to avoid creating the program repeatedly.

        target_model_weights = decay * target_model_weights + (1 - decay) * current_model_weights

        Args:
            target_model (`parl.Model`): an instance of ``Model`` that has the
                same neural network architecture as the current model.
            decay (float): the rate of decline in copying parameters. 0 if no
                parameters decay when synchronizing the parameters.
            share_vars_parallel_executor (fluid.ParallelExecutor): Optional.
                If not None, will use ``fluid.ParallelExecutor`` to run the
                program instead of ``fluid.Executor``.

        Example:

        .. code-block:: python

            import copy
            # create a model that has the same neural network structures.
            target_model = copy.deepcopy(model)

            # after initializing the parameters ...
            model.sync_weights_to(target_model)

        Note:
            Before calling ``sync_weights_to``, parameters of the model must
            have been initialized.
        """

        args_hash_id = hashlib.md5('{}_{}'.format(
            id(target_model), decay).encode('utf-8')).hexdigest()
        has_cached = False
        try:
            if self._cached_id == args_hash_id:
                has_cached = True
        except AttributeError:
            has_cached = False

        if not has_cached:
            # The cached program cannot be reused; create a new one.
            self._cached_id = args_hash_id

            assert target_model is not self, "cannot copy between identical models"
            assert isinstance(target_model, Model)
            assert self.__class__.__name__ == target_model.__class__.__name__, \
                "must be the same class for params syncing!"
            assert (decay >= 0 and decay <= 1)

            param_pairs = self._get_parameter_pairs(self, target_model)

            self._cached_sync_weights_program = fluid.Program()

            with fluid.program_guard(self._cached_sync_weights_program):
                for (src_var_name, target_var_name) in param_pairs:
                    src_var = fetch_framework_var(src_var_name)
                    target_var = fetch_framework_var(target_var_name)
                    fluid.layers.assign(
                        decay * target_var + (1 - decay) * src_var, target_var)

            if share_vars_parallel_executor is None:
                # use fluid.Executor
                place = fluid.CUDAPlace(0) if machine_info.is_gpu_available(
                ) else fluid.CPUPlace()
                self._cached_fluid_executor = fluid.Executor(place)
            else:
                # use fluid.ParallelExecutor

                # specify strategy to make ParallelExecutor run faster
                exec_strategy = fluid.ExecutionStrategy()
                exec_strategy.use_experimental_executor = True
                exec_strategy.num_threads = 4
                build_strategy = fluid.BuildStrategy()
                build_strategy.remove_unnecessary_lock = True

                with fluid.scope_guard(fluid.global_scope().new_scope()):
                    self._cached_fluid_executor = fluid.ParallelExecutor(
                        use_cuda=machine_info.is_gpu_available(),
                        main_program=self._cached_sync_weights_program,
                        share_vars_from=share_vars_parallel_executor,
                        exec_strategy=exec_strategy,
                        build_strategy=build_strategy,
                    )
        if share_vars_parallel_executor is None:
            self._cached_fluid_executor.run(self._cached_sync_weights_program)
        else:
            self._cached_fluid_executor.run(fetch_list=[])
    @property
    @deprecated(
        deprecated_in='1.2', removed_in='1.3', replace_function='parameters')
    def parameter_names(self):
        """Get names of all parameters in this ``Model``.

        Only parameters created by ``parl.layers`` are included.
        The order of parameter names is consistent among
        different instances of the same ``Model``.

        Returns:
            param_names(list): list of strings containing the names of all
                parameters.
        """
        return self.parameters()
    def parameters(self):
        """Get names of all parameters in this ``Model``.

        Only parameters created by ``parl.layers`` are included.
        The order of parameter names is consistent among
        different instances of the same ``Model``.

        Returns:
            param_names(list): list of strings containing the names of all
                parameters.

        Example:

        .. code-block:: python

            model = Model()
            model.parameters()

            # output: ['fc0.w0', 'fc0.bias0']

        """
        try:
            return self._parameter_names
        except AttributeError:
            self._parameter_names = self._get_parameter_names(self)
            return self._parameter_names
    @deprecated(
        deprecated_in='1.2', removed_in='1.3', replace_function='get_weights')
    def get_params(self):
        """Return a Python list containing parameters of the current model.

        Returns:
            parameters: a Python list containing parameters of the current model.
        """
        return self.get_weights()
    @deprecated(
        deprecated_in='1.2', removed_in='1.3', replace_function='set_weights')
    def set_params(self, params, gpu_id=None):
        """Set parameters of the model with ``params``.

        Args:
            params (list): a list of numpy arrays.
        """
        self.set_weights(weights=params)
    def get_weights(self):
        """Return a Python list containing parameters of the current model.

        Returns:
            a Python list containing the parameters of the current model.
        """
        weights = []
        for param_name in self.parameters():
            weight = fetch_value(param_name)
            weights.append(weight)

        return weights
    def set_weights(self, weights):
        """Copy the given parameters (e.g., from ``get_weights()``) into the model.

        Args:
            weights (list): a Python list containing the parameters.
        """
        assert len(weights) == len(self.parameters()), \
            'the number of input weights should match the number of parameters of the current model'
        try:
            is_gpu_available = self._is_gpu_available
        except AttributeError:
            self._is_gpu_available = machine_info.is_gpu_available()
            is_gpu_available = self._is_gpu_available

        for (param_name, weight) in list(zip(self.parameters(), weights)):
            set_value(param_name, weight, is_gpu_available)
    def _get_parameter_names(self, obj):
        """Recursively get parameter names in an object.

        Args:
            obj (Object): any object

        Returns:
            parameter_names (list): all parameter names in this object.
        """

        parameter_names = []
        if isinstance(obj, Model):
            for attr in sorted(obj.__dict__.keys()):
                val = getattr(obj, attr)
                parameter_names.extend(self._get_parameter_names(val))
        elif isinstance(obj, LayerFunc):
            for attr in obj.attr_holder.sorted():
                if attr:
                    parameter_names.append(attr.name)
        elif isinstance(obj, tuple) or isinstance(obj, list):
            for x in obj:
                parameter_names.extend(self._get_parameter_names(x))
        elif isinstance(obj, dict):
            for x in list(obj.values()):
                parameter_names.extend(self._get_parameter_names(x))
        else:
            # any other type (e.g. set) is not handled
            pass
        return parameter_names

    def _get_parameter_pairs(self, src, target):
        """Recursively get parameters in the source model and the
        corresponding parameters in the target model.

        Args:
            src (`Model`/`LayerFunc`/list/tuple/dict): source object
            target (`Model`/`LayerFunc`/list/tuple/dict): target object

        Returns:
            param_pairs (list of tuple): all parameter names in the source
                model and the corresponding parameter names in the
                target model.
        """

        param_pairs = []
        if isinstance(src, Model):
            for attr in src.__dict__:
                if attr not in target.__dict__:
                    continue
                src_var = getattr(src, attr)
                target_var = getattr(target, attr)
                param_pairs.extend(
                    self._get_parameter_pairs(src_var, target_var))
        elif isinstance(src, LayerFunc):
            src_attrs = src.attr_holder.sorted()
            target_attrs = target.attr_holder.sorted()
            assert len(src_attrs) == len(target_attrs), \
                "the number of ParamAttrs in the source layer and the target layer should be the same."
            for (src_attr, target_attr) in zip(src_attrs, target_attrs):
                if src_attr:
                    assert target_attr, "ParamAttr between source layer and target layer is inconsistent."
                    param_pairs.append((src_attr.name, target_attr.name))
        elif isinstance(src, tuple) or isinstance(src, list):
            for src_var, target_var in zip(src, target):
                param_pairs.extend(
                    self._get_parameter_pairs(src_var, target_var))
        elif isinstance(src, dict):
            for k in src.keys():
                assert k in target
                param_pairs.extend(
                    self._get_parameter_pairs(src[k], target[k]))
        else:
            # any other type (e.g. set) is not handled
            pass
        return param_pairs
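
# ----------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It assumes a
# hypothetical ``DemoPolicy`` model and arbitrary layer/observation sizes, and
# shows the typical pattern: duplicate a Model with copy.deepcopy, build both
# networks in one program, initialize parameters, then keep the copy in sync
# with ``sync_weights_to`` or move weights explicitly via ``get_weights`` /
# ``set_weights``.
if __name__ == '__main__':
    import copy

    import numpy as np
    import parl
    from parl import layers

    class DemoPolicy(parl.Model):
        def __init__(self):
            # layers created via parl.layers are tracked by Model, so
            # copy.deepcopy produces an independent set of parameters.
            self.fc = layers.fc(size=2, act='softmax')

        def policy(self, obs):
            return self.fc(obs)

    model = DemoPolicy()
    target_model = copy.deepcopy(model)

    pred_program = fluid.Program()
    with fluid.program_guard(pred_program):
        obs = fluid.layers.data(name='obs', shape=[4], dtype='float32')
        act_prob = model.policy(obs)
        target_act_prob = target_model.policy(obs)

    place = fluid.CUDAPlace(0) if machine_info.is_gpu_available() \
        else fluid.CPUPlace()
    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())

    # soft update: target = 0.9 * target + 0.1 * source
    model.sync_weights_to(target_model, decay=0.9)

    # hard copy via numpy weights
    target_model.set_weights(model.get_weights())

    # forward pass through both models to check the setup
    obs_np = np.random.rand(1, 4).astype('float32')
    probs = exe.run(pred_program,
                    feed={'obs': obs_np},
                    fetch_list=[act_prob, target_act_prob])
    print([p.shape for p in probs])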