Fix multiprocessing data exchange #926

Merged 7 commits on Oct 14, 2022.
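In short: process-pool workers receive pickled copies of their arguments, so an `Individual` sent to a worker comes back as a distinct object, and object identity (and with it uid-based history bookkeeping) silently breaks. This PR stops shipping `Individual` objects across the process boundary: workers receive only the graph and return a self-contained `GraphEvalResult`, which the parent process applies to the original individuals. A minimal standalone sketch of the pitfall (illustrative only, not code from the PR; this `Individual` is a hypothetical stand-in):

from multiprocessing import Pool

class Individual:
    """Hypothetical stand-in for fedot's Individual."""

def evaluate(ind):
    return ind  # the worker returns its own pickled copy

if __name__ == '__main__':
    ind = Individual()
    with Pool(processes=2) as pool:
        result = pool.apply(evaluate, (ind,))
    print(result is ind)  # False: object identity is lost across processes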
Changes from 3 commits
107 changes: 68 additions & 39 deletions fedot/core/optimisers/gp_comp/evaluation.py
@@ -3,8 +3,9 @@
 import timeit
 from abc import ABC, abstractmethod
 from datetime import datetime
+from functools import partial
 from random import choice
-from typing import Dict, Optional, Tuple
+from typing import Dict, Optional, Tuple, List
 
 from joblib import Parallel, delayed, cpu_count
 
@@ -13,12 +14,16 @@
 from fedot.core.log import default_log, Log
 from fedot.core.optimisers.fitness import Fitness
 from fedot.core.optimisers.gp_comp.operators.operator import EvaluationOperator, PopulationT
+from fedot.core.optimisers.graph import OptGraph
 from fedot.core.optimisers.objective import GraphFunction, ObjectiveFunction
-from fedot.core.optimisers.opt_history_objects.individual import Individual
+from fedot.core.optimisers.opt_history_objects.individual import GraphEvalResult
 from fedot.core.optimisers.timer import Timer, get_forever_timer
 from fedot.core.pipelines.verification import verifier_for_task
 from fedot.remote.remote_evaluator import RemoteEvaluator
 
+OptionalEvalResult = Optional[GraphEvalResult]
+EvalResultsList = List[OptionalEvalResult]
 
 
 class ObjectiveEvaluationDispatcher(ABC):
     """Builder for evaluation operator.
@@ -37,6 +42,28 @@ def set_evaluation_callback(self, callback: Optional[GraphFunction]):
         that's called on each graph after its evaluation."""
         pass
 
+    @staticmethod
+    def split_individuals_to_evaluate(individuals: PopulationT) -> Tuple[PopulationT, PopulationT]:
+        """Split the population into individuals to be evaluated and ones to skip."""
+        individuals_to_evaluate = []
+        individuals_to_skip = []
+        for ind in individuals:
+            (individuals_to_evaluate, individuals_to_skip)[ind.fitness.valid].append(ind)
+        return individuals_to_evaluate, individuals_to_skip
+
+    @staticmethod
+    def apply_evaluation_results(individuals: PopulationT,
+                                 evaluation_results: EvalResultsList) -> PopulationT:
+        """Applies results of evaluation to the evaluated population.
+        Excludes individuals that weren't evaluated."""
+        individuals_evaluated = []
+        for ind, eval_res in zip(individuals, evaluation_results):
+            if not eval_res:
+                continue
+            ind.set_evaluation_result(eval_res)
+            individuals_evaluated.append(ind)
+        return individuals_evaluated
+
 
 class MultiprocessingDispatcher(ObjectiveEvaluationDispatcher):
     """Evaluates objective function on population using multiprocessing pool
@@ -80,45 +107,46 @@ def evaluate_with_cache(self, population: PopulationT) -> Optional[PopulationT]:
         return evaluated_population
 
     def evaluate_population(self, individuals: PopulationT) -> Optional[PopulationT]:
+        individuals_to_evaluate, individuals_to_skip = self.split_individuals_to_evaluate(individuals)
+        # Evaluate individuals without valid fitness in parallel.
         n_jobs = determine_n_jobs(self._n_jobs, self.logger)
 
         parallel = Parallel(n_jobs=n_jobs, verbose=0, pre_dispatch="2*n_jobs")
-        eval_inds = parallel(delayed(self.evaluate_single)(ind=ind, logs_initializer=Log().get_parameters())
-                             for ind in individuals)
+        eval_func = partial(self.evaluate_single, logs_initializer=Log().get_parameters())
+        evaluation_results = parallel(delayed(eval_func)(ind.graph) for ind in individuals_to_evaluate)
+        individuals_evaluated = self.apply_evaluation_results(individuals_to_evaluate, evaluation_results)
         # If there were no successful evals then try once again getting at least one,
         # even if time limit was reached
-        successful_evals = list(filter(None, eval_inds))
+        successful_evals = individuals_evaluated + individuals_to_skip
         if not successful_evals:
-            single = self.evaluate_single(choice(individuals), with_time_limit=False)
-            if single:
-                successful_evals = [single]
-            else:
-                successful_evals = None
-
+            single_ind = choice(individuals)
+            evaluation_result = eval_func(single_ind.graph, with_time_limit=False)
+            successful_evals = self.apply_evaluation_results([single_ind], [evaluation_result]) or None
         return successful_evals
 
-    def evaluate_single(self, ind: Individual, with_time_limit: bool = True,
-                        logs_initializer: Optional[Tuple[int, pathlib.Path]] = None) -> Optional[Individual]:
-        if ind.fitness.valid:
-            return ind
+    def evaluate_single(self, graph: OptGraph, with_time_limit: bool = True, cache_key: Optional[str] = None,
+                        logs_initializer: Optional[Tuple[int, pathlib.Path]] = None) -> OptionalEvalResult:
+
         if with_time_limit and self.timer.is_time_limit_reached():
             return None
         if logs_initializer is not None:
             # in case of multiprocessing run
             Log.setup_in_mp(*logs_initializer)
-        start_time = timeit.default_timer()
 
-        graph = self.evaluation_cache.get(ind.uid, ind.graph)
+        graph = self.evaluation_cache.get(cache_key, graph)
 
         adapted_evaluate = self._adapter.adapt_func(self._evaluate_graph)
-        ind_fitness, ind_domain_graph = adapted_evaluate(graph)
-        ind.set_evaluation_result(ind_fitness, ind_domain_graph)
-
+        start_time = timeit.default_timer()
+        fitness, graph = adapted_evaluate(graph)
         end_time = timeit.default_timer()
+        eval_time_iso = datetime.now().isoformat()
 
-        ind.metadata['computation_time_in_seconds'] = end_time - start_time
-        ind.metadata['evaluation_time_iso'] = datetime.now().isoformat()
-        return ind if ind.fitness.valid else None
+        eval_res = GraphEvalResult(
+            fitness=fitness, graph=graph, metadata={
+                'computation_time_in_seconds': end_time - start_time,
+                'evaluation_time_iso': eval_time_iso
+            }
+        )
+        return eval_res
 
     def _evaluate_graph(self, domain_graph: Graph) -> Tuple[Fitness, Graph]:
         fitness = self._objective_eval(domain_graph)
@@ -165,29 +193,30 @@ def dispatch(self, objective: ObjectiveFunction) -> EvaluationOperator:
         return self.evaluate_population
 
     def evaluate_population(self, individuals: PopulationT) -> Optional[PopulationT]:
-        mapped_evals = list(map(self.evaluate_single, individuals))
-        evaluated_population = list(filter(None, mapped_evals))
-        if not evaluated_population:
-            evaluated_population = None
+        individuals_to_evaluate, individuals_to_skip = self.split_individuals_to_evaluate(individuals)
+        eval_results = list(map(self.evaluate_single, (ind.graph for ind in individuals_to_evaluate)))
+        individuals_evaluated = self.apply_evaluation_results(individuals_to_evaluate, eval_results)
+        evaluated_population = individuals_evaluated + individuals_to_skip or None
         return evaluated_population
 
-    def evaluate_single(self, ind: Individual, with_time_limit=True) -> Optional[Individual]:
-        if ind.fitness.valid:
-            return ind
+    def evaluate_single(self, graph: OptGraph, with_time_limit=True) -> OptionalEvalResult:
         if with_time_limit and self.timer.is_time_limit_reached():
             return None
 
-        start_time = timeit.default_timer()
-
         adapted_evaluate = self._adapter.adapt_func(self._evaluate_graph)
-        ind_fitness, ind_graph = adapted_evaluate(ind.graph)
-        ind.set_evaluation_result(ind_fitness, ind_graph)
-
+        start_time = timeit.default_timer()
+        fitness, graph = adapted_evaluate(graph)
         end_time = timeit.default_timer()
 
-        ind.metadata['computation_time_in_seconds'] = end_time - start_time
-        ind.metadata['evaluation_time_iso'] = datetime.now().isoformat()
-        return ind if ind.fitness.valid else None
+        eval_time_iso = datetime.now().isoformat()
+
+        eval_res = GraphEvalResult(
+            fitness=fitness, graph=graph, metadata={
+                'computation_time_in_seconds': end_time - start_time,
+                'evaluation_time_iso': eval_time_iso
+            }
+        )
+        return eval_res
 
     def _evaluate_graph(self, graph: Graph) -> Tuple[Fitness, Graph]:
         fitness = self._objective_eval(graph)
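Taken together, the two new base-class helpers let both dispatchers share one pattern: split off individuals whose fitness is already valid, evaluate only the graphs of the rest, then re-attach the results. A sketch of that flow under assumed names (`eval_func` stands for any callable mapping an `OptGraph` to an `OptionalEvalResult`; it is not from the PR):

def evaluate_population_sketch(dispatcher, individuals, eval_func):
    # The bool-indexing trick in split_individuals_to_evaluate sends invalid
    # fitness (False -> index 0) to the evaluation queue and valid fitness
    # (True -> index 1) to the skip list.
    to_evaluate, to_skip = dispatcher.split_individuals_to_evaluate(individuals)
    results = [eval_func(ind.graph) for ind in to_evaluate]  # entries may be falsy on failure
    evaluated = dispatcher.apply_evaluation_results(to_evaluate, results)
    return evaluated + to_skip or None  # None when nothing survived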
29 changes: 25 additions & 4 deletions fedot/core/optimisers/opt_history_objects/individual.py
@@ -2,7 +2,7 @@
 
 from copy import deepcopy
 from dataclasses import dataclass, field
-from typing import TYPE_CHECKING, Any, Dict, List, Optional
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 from uuid import uuid4
 
 from fedot.core.log import default_log
@@ -12,6 +12,7 @@
 if TYPE_CHECKING:
     from fedot.core.optimisers.opt_history_objects.parent_operator import ParentOperator
 
+
 INDIVIDUAL_COPY_RESTRICTION_MESSAGE = ('`Individual` instance was copied.\n'
                                        'Normally, you don\'t want to do that to keep uid-individual uniqueness.\n'
                                        'If this happened during the optimization process, this misusage '
@@ -31,13 +32,23 @@ def set_native_generation(self, native_generation):
         if self.native_generation is None:
             super().__setattr__('native_generation', native_generation)
 
-    def set_evaluation_result(self, fitness: Fitness, updated_graph: Optional[OptGraph] = None):
-        if self.fitness.valid:
-            raise ValueError('The individual has valid fitness and can not be evaluated again.')
+    def _set_fitness_and_graph(self, fitness: Fitness, updated_graph: Optional[OptGraph] = None):
         super().__setattr__('fitness', fitness)
         if updated_graph is not None:
             super().__setattr__('graph', updated_graph)
 
+    def set_evaluation_result(self, eval_result: Union[GraphEvalResult, Fitness],
+                              updated_graph: Optional[OptGraph] = None):
+        if self.fitness.valid:
+            raise ValueError('The individual has valid fitness and can not be evaluated again.')
+
+        if isinstance(eval_result, Fitness):
+            self._set_fitness_and_graph(eval_result, updated_graph)
+            return
+
+        self._set_fitness_and_graph(eval_result.fitness, eval_result.graph)
+        self.metadata.update(eval_result.metadata)
+
     @property
     def has_native_generation(self) -> bool:
         return self.native_generation is not None
@@ -108,3 +119,13 @@ def __deepcopy__(self, memo):
         for k, v in self.__dict__.items():
             object.__setattr__(result, k, deepcopy(v, memo))
         return result
+
+
+@dataclass
+class GraphEvalResult:
+    fitness: Fitness
+    graph: OptGraph
+    metadata: Dict[str, Any] = field(default_factory=dict)
+
+    def __bool__(self):
+        return self.fitness.valid
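Since `__bool__` delegates to `fitness.valid`, a failed or timed-out evaluation (including a plain `None` result) is filtered by ordinary truthiness, which is exactly what `apply_evaluation_results` relies on. A hedged usage sketch (`some_fitness`, `some_graph`, and `individual` are placeholders, not names from the PR):

eval_res = GraphEvalResult(fitness=some_fitness, graph=some_graph)
if eval_res:  # truthy only when some_fitness.valid
    individual.set_evaluation_result(eval_res)  # applies fitness, graph, and metadata in one call

Note that `set_evaluation_result` still accepts a bare `Fitness` (plus an optional graph), so pre-existing call sites remain valid.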
5 changes: 3 additions & 2 deletions fedot/core/optimisers/opt_history_objects/opt_history.py
@@ -6,6 +6,7 @@
 import json
 import os
 import shutil
+from copy import copy
 from pathlib import Path
 from typing import List, Optional, Sequence, Union, TYPE_CHECKING
 
@@ -42,10 +43,10 @@ def is_empty(self) -> bool:
         return not self.individuals
 
     def add_to_history(self, individuals: List[Individual]):
-        self.individuals.append(individuals)
+        self.individuals.append(copy(individuals))
 
     def add_to_archive_history(self, individuals: List[Individual]):
-        self.archive_history.append(individuals)
+        self.archive_history.append(copy(individuals))
 
     def to_csv(self, save_dir: Optional[os.PathLike] = None, file: os.PathLike = 'history.csv'):
         save_dir = save_dir or default_fedot_data_dir()
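The shallow `copy()` matters because the optimiser may go on mutating the very list it passed in; snapshotting the list keeps each stored generation stable, while the `Individual` objects themselves are deliberately not deep-copied (that would break uid uniqueness, per the copy-restriction message above). An illustrative sketch with placeholder names:

population = [ind_a, ind_b]
history.add_to_history(population)  # stores copy(population), a snapshot of the list
population.append(ind_c)            # later mutation no longer rewrites the stored generation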
3 changes: 2 additions & 1 deletion test/unit/composer/test_history.py
@@ -130,7 +130,8 @@ def test_ancestor_for_crossover():
     assert crossover_result.parents[1].uid == parent_ind_second.uid
 
 
-def test_newly_generated_history(n_jobs: int = 1):
+@pytest.mark.parametrize('n_jobs', [1, 2])
+def test_newly_generated_history(n_jobs: int):
     project_root_path = str(fedot_project_root())
     file_path_train = os.path.join(project_root_path, 'test/data/simple_classification.csv')