Skip to content

Commit

Permalink
Plot GP tree
Browse files Browse the repository at this point in the history
  • Loading branch information
woodRock committed Oct 6, 2024
1 parent e813ab4 commit 6396cd2
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 111 deletions.
Binary file modified code/siamese/figures/tree_0.pdf
Binary file not shown.
Binary file modified code/siamese/figures/tree_1.pdf
Binary file not shown.
Binary file modified code/siamese/figures/tree_2.pdf
Binary file not shown.
Binary file modified code/siamese/figures/tree_3.pdf
Binary file not shown.
Binary file added code/siamese/figures/tree_4.pdf
Binary file not shown.
Binary file added code/siamese/figures/tree_5.pdf
Binary file not shown.
Binary file added code/siamese/figures/tree_6.pdf
Binary file not shown.
Binary file added code/siamese/figures/tree_7.pdf
Binary file not shown.
Binary file added code/siamese/figures/tree_8.pdf
Binary file not shown.
Binary file added code/siamese/figures/tree_9.pdf
Binary file not shown.
229 changes: 118 additions & 111 deletions code/siamese/gp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,120 +4,123 @@
from sklearn.metrics import balanced_accuracy_score
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
from functools import partial
import random
from multiprocessing import Pool
from typing import List, Tuple, Callable, Any


# Define primitives that work with numpy arrays and return float arrays
def protectedDiv(left, right):
    """Divide ``left`` by ``right`` element-wise, yielding 1.0 where ``right`` is 0.

    Both operands are converted to float arrays and the output buffer is
    allocated with their broadcast shape, so integer inputs and operands of
    different (broadcast-compatible) shapes are handled.

    Args:
        left: Numerator (array-like).
        right: Denominator (array-like).

    Returns:
        Float array of quotients; positions where ``right == 0`` hold 1.0.
    """
    left = np.asarray(left, dtype=float)
    right = np.asarray(right, dtype=float)
    # Pre-fill with 1.0: ``where`` leaves the masked-out slots of ``out`` untouched.
    out = np.ones(np.broadcast(left, right).shape, dtype=float)
    return np.divide(left, right, out=out, where=right != 0)

def add(x, y):
    """Return the element-wise sum of ``x`` and ``y``, computed in floating point.

    Inputs are converted with ``np.asarray`` so scalars and lists are accepted
    as well as arrays.
    """
    return np.asarray(x, dtype=float) + np.asarray(y, dtype=float)

def sub(x, y):
    """Return the element-wise difference ``x - y``, computed in floating point.

    Inputs are converted with ``np.asarray`` so scalars and lists are accepted
    as well as arrays.
    """
    return np.asarray(x, dtype=float) - np.asarray(y, dtype=float)

def mul(x, y):
    """Return the element-wise product of ``x`` and ``y``, computed in floating point.

    Inputs are converted with ``np.asarray`` so scalars and lists are accepted
    as well as arrays.
    """
    return np.asarray(x, dtype=float) * np.asarray(y, dtype=float)

def neg(x):
    """Return the element-wise negation of ``x`` as floating point.

    The input is converted with ``np.asarray`` so scalars and lists are
    accepted as well as arrays.
    """
    return -np.asarray(x, dtype=float)

def sin(x):
    """Return the element-wise sine of ``x`` (radians), computed in floating point.

    The input is converted with ``np.asarray`` so scalars and lists are
    accepted as well as arrays.
    """
    return np.sin(np.asarray(x, dtype=float))

def cos(x):
    """Return the element-wise cosine of ``x`` (radians), computed in floating point.

    The input is converted with ``np.asarray`` so scalars and lists are
    accepted as well as arrays.
    """
    return np.cos(np.asarray(x, dtype=float))

def rand101(x):
    """Return uniform random values in [-1, 1) with the same shape as ``x``.

    Only the shape of ``x`` is used; its values are ignored. ``np.shape`` is
    used so that lists and scalars are accepted as well as arrays.
    """
    return np.random.uniform(-1, 1, size=np.shape(x))

# Function to create the primitive set
def create_pset(n_features: int) -> gp.PrimitiveSet:
    """Build a primitive set whose inputs are the features of a pair of samples.

    The set has ``n_features * 2`` arguments. The first ``n_features`` are
    renamed ``x1_0 .. x1_{n-1}`` and the next ``n_features`` are renamed
    ``x2_0 .. x2_{n-1}``.

    Args:
        n_features: Number of features in one sample of the pair.

    Returns:
        The populated ``gp.PrimitiveSet`` named "MAIN".
    """
    pset = gp.PrimitiveSet("MAIN", n_features * 2)  # n_features for each pair
    pset.addPrimitive(add, 2)
    pset.addPrimitive(sub, 2)
    pset.addPrimitive(mul, 2)
    pset.addPrimitive(protectedDiv, 2)
    pset.addPrimitive(neg, 1)
    pset.addPrimitive(sin, 1)
    pset.addPrimitive(cos, 1)
    pset.addPrimitive(rand101, 1)

    # Rename arguments
    for i in range(n_features):
        pset.renameArguments(**{f'ARG{i}': f'x1_{i}'})
        pset.renameArguments(**{f'ARG{i+n_features}': f'x2_{i}'})

    return pset
# Module-level primitive set with 2 inputs for pairwise comparison.
pset = gp.PrimitiveSet("MAIN", 2)
# (primitive, arity) pairs, registered in this order.
for _primitive, _arity in (
    (add, 2),
    (sub, 2),
    (mul, 2),
    (protectedDiv, 2),
    (neg, 1),
    (sin, 1),
    (cos, 1),
    (rand101, 1),
):
    pset.addPrimitive(_primitive, _arity)
del _primitive, _arity  # keep the loop variables out of the module namespace
# Give the two inputs readable names.
pset.renameArguments(ARG0='x1', ARG1='x2')

# Single-objective fitness with weight -1.0 (DEAP treats a negative weight as minimisation).
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
# An individual is a list subclass that carries a FitnessMin attribute.
creator.create("Individual", list, fitness=creator.FitnessMin)

# Define the number of trees per individual
NUM_TREES: int = 5

def create_toolbox(pset: gp.PrimitiveSet) -> base.Toolbox:
    """Create a toolbox that can generate trees, individuals and populations.

    Registers, in order:
      * ``expr``: ``gp.genHalfAndHalf`` over ``pset`` with ``min_=1, max_=3``;
      * ``tree``: a ``gp.PrimitiveTree`` built from ``expr``;
      * ``individual``: a ``creator.Individual`` holding ``NUM_TREES`` trees;
      * ``population``: a list of individuals.

    Args:
        pset: Primitive set the generated expressions draw from.

    Returns:
        The configured ``base.Toolbox``.
    """
    toolbox = base.Toolbox()
    toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=3)
    toolbox.register("tree", tools.initIterate, gp.PrimitiveTree, toolbox.expr)
    toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.tree, n=NUM_TREES)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    return toolbox
# Toolbox initialization
# Each registration below builds on the previous one, so the order matters.
toolbox = base.Toolbox()
# "expr": random expression generated with gp.genHalfAndHalf over the module-level pset.
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=6)
# "tree": a gp.PrimitiveTree built from one generated expression.
toolbox.register("tree", tools.initIterate, gp.PrimitiveTree, toolbox.expr)
# "individual": a creator.Individual made of NUM_TREES trees.
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.tree, n=NUM_TREES)
# "population": a list of individuals; the size is supplied at call time via n=...
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# Compile function
def compile_trees(individual):
    """Compile every GP tree of ``individual`` into a Python callable.

    Compilation uses the module-level ``pset``.

    Args:
        individual: Iterable of GP expressions (trees).

    Returns:
        List with one compiled callable per tree, in the same order.
    """
    return [gp.compile(tree, pset) for tree in individual]

# Contrastive loss function
def contrastive_loss(output1, output2, label, margin=1.0):
    """Return the contrastive loss of one output pair as a float in [0, 1].

    The distance is ``F.pairwise_distance`` between the two 1-D outputs.
    ``label`` weights the squared distance (the "similar" term) and
    ``1 - label`` weights the squared hinge ``max(margin - distance, 0)``
    (the "dissimilar" term).

    Args:
        output1: 1-D tensor of tree outputs for the pair in one order.
        output2: 1-D tensor of tree outputs for the pair in the other order.
        label: Pair label; 1 selects the similar term, 0 the dissimilar term.
        margin: Hinge margin for the dissimilar term.

    Returns:
        The mean loss, clamped to the interval [0, 1], as a Python float.
    """
    euclidean_distance = F.pairwise_distance(output1.unsqueeze(0), output2.unsqueeze(0))
    similar_loss = label * torch.pow(euclidean_distance, 2)
    dissimilar_loss = (1 - label) * torch.pow(torch.clamp(margin - euclidean_distance, min=0.0), 2)
    loss = torch.mean(similar_loss + dissimilar_loss)
    # Float bounds keep the return type a float even when the upper clamp applies.
    return min(1.0, max(loss.item(), 0.0))

# Evaluation function
def evalContrastive(individual, data, alpha=0.5):
    """Compute the fitness of ``individual`` on ``data`` (lower is better).

    For every ``(x1, x2, label)`` sample each compiled tree is evaluated on
    ``(x1, x2)`` and on ``(x2, x1)``; the mean of each tree's output forms one
    component of the two output vectors. The fitness combines the balanced
    classification error and the average contrastive loss:

        fitness = alpha * (1 - balanced_accuracy) + (1 - alpha) * avg_loss

    Args:
        individual: Sequence of GP trees.
        data: Sequence of ``(x1, x2, label)`` samples; must not be empty.
        alpha: Weight of the balanced-error term.

    Returns:
        One-element tuple ``(fitness,)``.

    Raises:
        ValueError: If ``data`` is empty.
    """
    if len(data) == 0:
        raise ValueError("data must contain at least one (x1, x2, label) sample")

    trees = compile_trees(individual)
    total_loss = 0.0
    predictions = []
    labels = []

    for x1, x2, label in data:
        # Convert once per pair rather than once per tree.
        first = x1.astype(float)
        second = x2.astype(float)
        output1 = torch.tensor([np.mean(tree(first, second)) for tree in trees], dtype=torch.float32)
        output2 = torch.tensor([np.mean(tree(second, first)) for tree in trees], dtype=torch.float32)
        total_loss += contrastive_loss(output1, output2, label)

        euclidean_distance = F.pairwise_distance(output1.unsqueeze(0), output2.unsqueeze(0))
        # NOTE(review): a small distance is predicted as class 0 here, while
        # contrastive_loss applies its "similar" term to label 1 -- confirm the
        # label convention against the dataset.
        pred = 0 if euclidean_distance < 0.5 else 1  # Adjust threshold as needed
        predictions.append(pred)
        labels.append(label)

    avg_loss = total_loss / len(data)
    loss_balanced = 1 - balanced_accuracy_score(labels, predictions)
    fitness = alpha * loss_balanced + (1 - alpha) * avg_loss  # Combine accuracy and loss
    return (fitness,)

# Custom crossover function
def customCrossover(ind1, ind2):
    """Cross two multi-tree individuals position by position, in place.

    Each pair of trees at the same index is recombined with
    ``gp.cxOnePoint`` with probability 0.5. Only positions present in both
    individuals are considered.

    Args:
        ind1: First individual (list of trees); modified in place.
        ind2: Second individual (list of trees); modified in place.

    Returns:
        The tuple ``(ind1, ind2)``.
    """
    for i, (tree1, tree2) in enumerate(zip(ind1, ind2)):
        if random.random() < 0.5:
            ind1[i], ind2[i] = gp.cxOnePoint(tree1, tree2)
    return ind1, ind2

# Custom mutation function
def customMutate(individual):
    """Mutate the trees of ``individual`` in place.

    Each tree is replaced, with probability 0.2, by the result of
    ``gp.mutUniform`` using the module-level ``toolbox.expr`` and ``pset``.

    Args:
        individual: List of GP trees; modified in place.

    Returns:
        One-element tuple ``(individual,)``.
    """
    for i, tree in enumerate(individual):
        if random.random() < 0.2:  # 20% chance to mutate each tree
            individual[i], = gp.mutUniform(tree, expr=toolbox.expr, pset=pset)
    return individual,

def eaSimpleWithElitism(population: List[Any], toolbox: base.Toolbox, cxpb: float, mutpb: float, ngen: int,
stats: tools.Statistics, halloffame: tools.HallOfFame, verbose: bool, elite_size: int,
train_dataset: List[Tuple[np.ndarray, np.ndarray, float]],
val_dataset: List[Tuple[np.ndarray, np.ndarray, float]],
pset: gp.PrimitiveSet) -> Tuple[List[Any], tools.Logbook]:
# Genetic operators
# "mate": per-tree crossover defined by customCrossover.
toolbox.register("mate", customCrossover)
# "mutate": per-tree mutation defined by customMutate.
toolbox.register("mutate", customMutate)
# "select": tournament selection with tournaments of size 3.
toolbox.register("select", tools.selTournament, tournsize=3)


def eaSimpleWithElitism(population, toolbox, cxpb, mutpb, ngen, stats=None,
halloffame=None, verbose=__debug__, elite_size=1):
logbook = tools.Logbook()
logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

Expand All @@ -127,6 +130,8 @@ def eaSimpleWithElitism(population: List[Any], toolbox: base.Toolbox, cxpb: floa
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit

if halloffame is None:
raise ValueError("halloffame parameter must not be None")
halloffame.update(population)

record = stats.compile(population) if stats else {}
Expand Down Expand Up @@ -165,35 +170,38 @@ def eaSimpleWithElitism(population: List[Any], toolbox: base.Toolbox, cxpb: floa

# Print the best (lowest) fitness in this generation
best_fit = halloffame[0].fitness.values[0]
train_balanced_accuracy = evaluate_best_individual(halloffame[0], train_dataset, pset)
val_balanced_accuracy = evaluate_best_individual(halloffame[0], val_dataset, pset)
print(f"Generation {gen}: Best Fitness = {best_fit:.4f}, Balanced Accuracy - Train: {train_balanced_accuracy:.4f} Validation: {val_balanced_accuracy:.4f}")
print(f"Generation {gen}: Best Fitness = {best_fit}")

return population, logbook

def evaluate_best_individual(individual, data):
    """Return the balanced accuracy of ``individual`` on ``data``.

    For every ``(x1, x2, label)`` sample each compiled tree is evaluated on
    ``(x1, x2)`` and on ``(x2, x1)``; the mean of each tree's output forms one
    component of the two output vectors. A pair is predicted as class 0 when
    the distance between the two vectors is below 0.5, and as class 1 otherwise.

    Args:
        individual: Sequence of GP trees.
        data: Sequence of ``(x1, x2, label)`` samples; must not be empty.

    Returns:
        The balanced accuracy score of the predictions against the labels.

    Raises:
        ValueError: If ``data`` is empty.
    """
    if len(data) == 0:
        raise ValueError("data must contain at least one (x1, x2, label) sample")

    trees = compile_trees(individual)
    predictions = []
    labels = []

    for x1, x2, label in data:
        # Convert once per pair rather than once per tree.
        first = x1.astype(float)
        second = x2.astype(float)
        output1 = torch.tensor([np.mean(tree(first, second)) for tree in trees], dtype=torch.float32)
        output2 = torch.tensor([np.mean(tree(second, first)) for tree in trees], dtype=torch.float32)

        euclidean_distance = F.pairwise_distance(output1.unsqueeze(0), output2.unsqueeze(0))
        pred = 0 if euclidean_distance < 0.5 else 1  # Adjust threshold as needed
        predictions.append(pred)
        labels.append(label)

    return balanced_accuracy_score(labels, predictions)


def main() -> Tuple[List[Any], tools.Logbook, tools.HallOfFame]:
def main():
# Load and preprocess your data
from util import preprocess_dataset
train_loader, val_loader = preprocess_dataset(dataset="instance-recognition", batch_size=64)

def loader_to_list(loader: torch.utils.data.DataLoader) -> List[Tuple[np.ndarray, np.ndarray, float]]:
# Convert data loaders to list format for GP
def loader_to_list(loader):
data_list = []
for x1, x2, y in loader:
for i in range(len(y)):
Expand All @@ -203,50 +211,46 @@ def loader_to_list(loader: torch.utils.data.DataLoader) -> List[Tuple[np.ndarray
train_data = loader_to_list(train_loader)
val_data = loader_to_list(val_loader)

# Get the number of features
n_features = train_data[0][0].shape[0]

# Create primitive set and toolbox
pset = create_pset(n_features)
toolbox = create_toolbox(pset)

toolbox.register("evaluate", evalContrastive, data=train_data, pset=pset)
toolbox.register("mate", customCrossover)
toolbox.register("mutate", customMutate, expr=toolbox.expr, pset=pset)
toolbox.register("select", tools.selTournament, tournsize=3)
# Register the evaluation function with the training data
toolbox.register("evaluate", evalContrastive, data=train_data)

# GP parameters
pop_size = 100
generations = 50
elite_size = 5
generations = 5
elite_size = 5 # Number of elite individuals to preserve

# Initialize population
pop = toolbox.population(n=pop_size)
hof = tools.HallOfFame(elite_size)
hof = tools.HallOfFame(elite_size) # This will now correctly store the individuals with lowest fitness
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean)
stats.register("std", np.std)
stats.register("min", np.min)
stats.register("max", np.max)

with Pool() as pool:
toolbox.register("map", pool.map)
pop, log = eaSimpleWithElitism(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=generations,
stats=stats, halloffame=hof, verbose=True, elite_size=elite_size,
train_dataset=train_data, val_dataset=val_data, pset=pset)
pool = Pool()
toolbox.register("map", pool.map)

# Run GP with elitism
pop, log = eaSimpleWithElitism(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=generations,
stats=stats, halloffame=hof, verbose=True, elite_size=elite_size)

# Evaluate best individual on validation set
best_individual = hof[0]
best_fitness = evalContrastive(best_individual, val_data, pset)
best_fitness = evalContrastive(best_individual, val_data)
print(f"Best individual fitness on validation set: {best_fitness[0]}")

balanced_accuracy = evaluate_best_individual(best_individual, val_data, pset)
# Calculate and print the balanced accuracy score for the best individual
balanced_accuracy = evaluate_best_individual(best_individual, val_data)
print(f"Balanced Accuracy Score of the best individual on validation set: {balanced_accuracy:.4f}")

print(f"Plotting the GP trees")
# Source: https://deap.readthedocs.io/en/master/tutorials/advanced/gp.html#plotting-trees
import pygraphviz as pgv


print(f"Printing the GP trees")
for tree_idx in range(NUM_TREES):
nodes, edges, labels = gp.graph(best_individual[tree_idx])

### Graphviz Section ###
import pygraphviz as pgv

g = pgv.AGraph()
g.add_nodes_from(nodes)
g.add_edges_from(edges)
Expand All @@ -257,7 +261,10 @@ def loader_to_list(loader: torch.utils.data.DataLoader) -> List[Tuple[np.ndarray
n.attr["label"] = labels[i]

g.draw(f"figures/tree_{tree_idx}.pdf")


pool.close()
pool.join()

return pop, log, hof

if __name__ == "__main__":
Expand Down

0 comments on commit 6396cd2

Please sign in to comment.