diff --git a/src/Food.py b/src/Food.py deleted file mode 100644 index da34826..0000000 --- a/src/Food.py +++ /dev/null @@ -1,16 +0,0 @@ -class Food: - x = 0 - y = 0 - STEP = 24 - - def __init__(self, step, x=0, y=0): - self.x = x * self.STEP; - self.y = y * self.STEP; - self.step = step - - def draw(self, surface, image): - surface.blit(image, (self.x, self.y)) - - def rellocate(self, x=0, y=0): - self.x = x * self.STEP; - self.y = y * self.STEP; \ No newline at end of file diff --git a/src/Game.py b/src/Game.py deleted file mode 100644 index f7a69b5..0000000 --- a/src/Game.py +++ /dev/null @@ -1,228 +0,0 @@ -from pygame.locals import * -import pygame -import time -from Snake import Snake -from Food import Food -import random -from math import sqrt -from math import floor -import operator -from Neuron import visitedNeurons -import sys - -def checkCollision(step, x1, y1, x2, y2): - if x1 >= x2 and x1 < x2 + step and y1 >= y2 and y1 < y2 + step: - return True - - return False - -def getDistance(x1, y1, x2, y2): - return sqrt((x1 - x2)*(x1 - x2) + (y1 - y2)*(y1 - y2)) - -class Game: - WIDTH = 984 - HEIGHT = 984 - STEP = 24 - MAX_SNAKES = 20 # make sure it's even - MAX_FOOD = 20 - MAX_LOOPS_PER_RUN = 500 - TIME_SLEEP = 0.001 - MAX_COORD = WIDTH / STEP - 1 - INITIAL_LENGTH = 4 - - run = 1 - start_time = 0 - min_fitness = 100 - mean_fitness = 0 - overall_min_fitness = 100 - - snakes_array = [] - food_array = [] - - loop = 0 - - def __init__(self): - self._running = True - self._display_surf = None - self._snake_surf = None - self._food_surf = None - self._start_time = time.time() - - for i in range(self.MAX_SNAKES): - self.snakes_array.append(Snake(self.INITIAL_LENGTH, self.STEP, i + 1)) - self.liveSnakes = self.MAX_SNAKES - - for i in range(self.MAX_FOOD): - self.food_array.append(Food(self.STEP, random.randint(0, self.MAX_COORD), random.randint(0, self.MAX_COORD))) - - - def on_init(self): - pygame.init() - self._display_surf = pygame.display.set_mode((self.WIDTH, self.HEIGHT), pygame.HWSURFACE) - - pygame.display.set_caption('Snake Learning AI') - self._running = True - self._snake_surf = pygame.image.load("../assets/block.png").convert() - self._food_surf = pygame.image.load("../assets/food.png").convert() - - def on_event(self, event): - if event.type == QUIT: - self._running = False - - def on_loop(self): - self.loop += 1 - for snake in self.snakes_array: - if self.liveSnakes <= 0: - self.resetLevel() - - if snake.isDead == True: - continue - - snake.update() - - idxMinDistance = 0 - minDistance = self.WIDTH - i = 0 - for food in self.food_array: - dist = getDistance(snake.x[0], snake.y[0], food.x, food.y) - if dist < minDistance: - idxMinDistance = i - minDistance = dist - i += 1 - - if checkCollision(self.STEP, snake.x[0], snake.y[0], food.x, food.y): - # Found food - snake.foundFood() - food.rellocate(random.randint(0, self.MAX_COORD), random.randint(0, self.MAX_COORD)) - #print("Got food! - Food Count:", snake.foodCount) - - snake.setInputValues(self.food_array[idxMinDistance].x, self.food_array[idxMinDistance].y) - - # Wall collision - if snake.x[0] > self.WIDTH or snake.x[0] < 0 or snake.y[0] > self.HEIGHT or snake.y[0] < 0: - snake.isDead = True - self.liveSnakes -= 1 - snake.calcFitness(-1 + time.time()-self.start_time) - self.mean_fitness += snake.fitness - - if snake.fitness < self.min_fitness: - self.min_fitness = snake.fitness - - #print("Wall collision! - Food Count:", snake.foodCount, "/ Fitness:", snake.fitness) - - # Self collision - if snake.collidedOnSelf() == True: - snake.isDead = True - self.liveSnakes -= 1 - snake.calcFitness(-3 + time.time()-self.start_time) - self.mean_fitness += snake.fitness - - if snake.fitness < self.min_fitness: - self.min_fitness = snake.fitness - - #print("Self collision! - Food Count:", snake.foodCount, "/ Fitness:", snake.fitness) - - if self.loop == self.MAX_LOOPS_PER_RUN: - self.resetLevel() - - def on_render(self): - self._display_surf.fill((0, 0, 0)) - for snake in self.snakes_array: - snake.draw(self._display_surf, self._snake_surf) - for f in self.food_array: - f.draw(self._display_surf, self._food_surf) - - pygame.display.flip() - - #print("Live snakes: ", self.liveSnakes) - - def on_cleanup(self): - print("") - print("Elapsed time:", time.time() - self._start_time, "seconds.") - print("") - pygame.display.quit() - pygame.quit() - - def on_execute(self): - if self.on_init() == False: - self._running = False - - self.start_time = time.time() - while self._running: - pygame.event.pump() - keys = pygame.key.get_pressed() - - for event in pygame.event.get(): - if event.type == pygame.QUIT: - self._running = False - self.on_cleanup() - return - elif event.type == pygame.KEYDOWN: - if event.key == pygame.K_ESCAPE: - self._running = False - self.on_cleanup() - """ - if event.key == pygame.K_RIGHT: - for snake in self.snakes_array: - snake.turn(dir = 0) - elif event.key == pygame.K_LEFT: - for snake in self.snakes_array: - snake.turn(dir = 1) - """ - - self.on_loop() - self.on_render() - - time.sleep( self.TIME_SLEEP ) - - def resetLevel(self): - fitness = {} - for idx, snake in enumerate(self.snakes_array): - if snake.isDead == False: - snake.calcFitness(time.time()-self.start_time) - if self.min_fitness > snake.fitness: - self.min_fitness = snake.fitness - self.mean_fitness += snake.fitness - - fitness[idx] = snake.fitness - - fitness = sorted(fitness.items(), key=operator.itemgetter(1)) - new_list = [] - for i in range(self.MAX_SNAKES): - idx = fitness[i][0] - new_list.append(self.snakes_array[idx]) - - N = floor(self.MAX_SNAKES / 2) - if N == 0: - N = 1 - for i in range(N): - goodBoy = new_list[i] - if self.MAX_SNAKES > 1: - badBoy = new_list[i + N] - badBoy.setSnakeNetwork(goodBoy.getSnakeNetwork().copy()) - goodBoy.getSnakeNetwork().mutateChangeWeights() - goodBoy.getSnakeNetwork().mutate() - - for snake in self.snakes_array: - snake.reset() - - self.mean_fitness /= self.MAX_SNAKES - if self.overall_min_fitness >= self.min_fitness: - self.overall_min_fitness = self.min_fitness - - print("") - print("Resetting level...") - print("Gen", self.run, "info:") - print("Time elapsed:", time.time() - self.start_time, "seconds") - print("Min fitness gen: ", self.min_fitness, "/ Mean fitness: ", self.mean_fitness) - print("Overall Min Fitness:", self.overall_min_fitness) - print("") - - visitedNeurons.clear() - self.loop = 0 - self.liveSnakes = self.MAX_SNAKES - self.run += 1 - self.min_fitness = 100 - self.mean_fitness = 0 - self.start_time = time.time() - diff --git a/src/Network.py b/src/Network.py deleted file mode 100644 index 51b9e33..0000000 --- a/src/Network.py +++ /dev/null @@ -1,223 +0,0 @@ -# Neural network consisting of 1 output neuron and -# 2 input neurons (sensors) -# The hidden layer(s) is(are) gonna be arranged by a GA - -from Neuron import Neuron -from Neuron import visitedNeurons -from Synapse import Synapse -import random - -class Network: - MUTATION_RATE = 0.5 - INNER_MUTATION_RATE = 0.3 - EPSILON = 0.1 - - def __init__(self, mutRate=1): - self.sensor_list = [] - self.inputNode_list = [] - self.outputNode_list = [] - self.inputOutputNode_list = [] - - self.actuator = Neuron() - - self.outputNode_list.append(self.actuator) - self.MUTATION_RATE = mutRate - - def propagate(self): - visitedNeurons.clear() - - prop = self.actuator.activate() - return prop - - def setInputNodeList(self, l): - self.inputNode_list = l - - def setOutputNodeList(self, l): - self.outputNode_list = l - - def setInputOutputNodeList(self, l): - self.inputOutputNode_list = l - - def setSensorList(self, l): - self.sensor_list = l - self.inputNode_list += l - - def setActuator(self, n): - self.actuator = n - self.outputNode_list.append(n) - - def setInputValues(self, l): - if len(self.sensor_list) != len(l): - print("########### ERROR setInputValues") - - i = 0 - for sensor in self.sensor_list: - sensor.setOutput(l[i]) - i += 1 - - def getRandomNeuron(self, l): - idx = random.randint(0, len(l) - 1) - return l[idx] - - def mutate(self): - r = random.random() - if r <= self.MUTATION_RATE: - self.mutateChangeWeights() - - if r <= self.INNER_MUTATION_RATE: - mut_type = random.randint(0, 3) - if mut_type == 0: - self.mutateAddNeuron() - elif mut_type == 1: - self.mutateRemoveNeuron() - elif mut_type == 2: - self.mutateChangeWeights() - elif mut_type == 3: - self.mutateAddSynapse() - - def mutateChangeWeights(self): - if len(self.inputOutputNode_list) > 0: - tmp = self.getRandomNeuron(self.inputOutputNode_list) - for syn in tmp.getInputList(): - syn.setWeight(random.random()) - - def mutateRemoveNeuron(self): - if len(self.inputOutputNode_list) > 0: - tmp = self.getRandomNeuron(self.inputOutputNode_list) - for syn in tmp.getOutputList(): - syn.getDestinationNeuron().getInputList().remove(syn) - for syn in tmp.getInputList(): - syn.getSourceNeuron().getOutputList().remove(syn) - - tmp.getInputList().clear() - tmp.getOutputList().clear() - - self.outputNode_list.remove(tmp) - self.inputNode_list.remove(tmp) - self.inputOutputNode_list.remove(tmp) - - def mutateAddNeuron(self): - tmp = Neuron() - - src = self.getRandomNeuron(self.inputNode_list) - synIn = Synapse(src, tmp, self.EPSILON * random.random()) - src.addOutput(synIn) - tmp.addInput(synIn) - - dst = self.getRandomNeuron(self.outputNode_list) - synOut = Synapse(tmp, dst, self.EPSILON * random.random()) - tmp.addOutput(synOut) - dst.addInput(synOut) - - self.inputNode_list.append(tmp) - self.outputNode_list.append(tmp) - self.inputOutputNode_list.append(tmp) - - def mutateAddSynapse(self): - src = self.getRandomNeuron(self.inputNode_list) - dst = src - while src == dst: - dst = self.getRandomNeuron(self.outputNode_list) - - syn = Synapse(src, dst, self.EPSILON * random.random()) - src.addOutput(syn) - dst.addInput(syn) - - def copy(self): - q = [] - m = {} - copyNetwork = Network() - - originalActuator = self.actuator - - copyActuator = Neuron() - copyNetwork.setActuator(copyActuator) - - q.append(originalActuator) - m[originalActuator] = copyActuator - copyActuator.k = originalActuator.k - - copyioNodes = [] - copyiNodes = [] - copyoNodes = [] - copySensors = [] - - for sensor in self.sensor_list: - tmp = Neuron() - m[sensor] = tmp - q.append(sensor) - copySensors.append(tmp) - copyiNodes.append(tmp) - - while len(q) > 0: - originalNode = q.pop() - - iSyns = originalNode.getInputList() - oSyns = originalNode.getOutputList() - - copyNode = m[originalNode] - copyNode.k = originalNode.k - - for syn in iSyns: - inputOriginal = syn.getSourceNeuron() - - if inputOriginal in list(m.keys()): - inputCopy = m[inputOriginal] - - exists = False - for n in inputCopy.getOutputList(): - if n.getSourceNeuron() == inputCopy and n.getDestinationNeuron() == copyNode: - exists = True - break - if exists == False: - copySyn = Synapse(inputCopy, copyNode, syn.getWeight()) - copyNode.addInput(copySyn) - inputCopy.addOutput(copySyn) - else: - inputCopy = Neuron() - inputCopy.k = inputOriginal.k - copyioNodes.append(inputCopy) - - copySyn = Synapse(inputCopy, copyNode, syn.getWeight()) - inputCopy.addOutput(copySyn) - copyNode.addInput(copySyn) - m[inputOriginal] = inputCopy - q.append(inputOriginal) - - for syn in oSyns: - outputOriginal = syn.getDestinationNeuron() - - if outputOriginal in list(m.keys()): - outputCopy = m[outputOriginal] - - exists = False - for n in outputCopy.getOutputList(): - if n.getSourceNeuron() == copyNode and n.getDestinationNeuron() == outputCopy: - exists = True - break - if exists == False: - copySyn = Synapse(copyNode, outputCopy, syn.getWeight()) - copyNode.addOutput(copySyn) - outputCopy.addInput(copySyn) - else: - outputCopy = Neuron() - outputCopy.k = outputOriginal.k - copyioNodes.append(outputCopy) - - copySyn = Synapse(copyNode, outputCopy, syn.getWeight()) - outputCopy.addInput(copySyn) - copyNode.addOutput(copySyn) - m[outputOriginal] = outputCopy - q.append(outputOriginal) - - for n in copyioNodes: - copyiNodes.append(n) - copyoNodes.append(n) - copyoNodes.append(copyActuator) - - copyNetwork.setSensorList(copySensors) - copyNetwork.setInputNodeList(copyiNodes) - copyNetwork.setOutputNodeList(copyoNodes) - copyNetwork.setInputOutputNodeList(copyioNodes) - - return copyNetwork diff --git a/src/Neuron.py b/src/Neuron.py deleted file mode 100644 index 0986d43..0000000 --- a/src/Neuron.py +++ /dev/null @@ -1,64 +0,0 @@ -import random -import math -import Network - -def sigmoid(x): - return 1 / (1 + math.exp(-x)) - -visitedNeurons = [] - -class Neuron: - def __init__(self): - self.z = 0 - self.bias = random.gauss(0, 1) - self.k = random.random() - - self.input_list = [] - self.output_list = [] - self.bias = 0 - self.y = 0 - - def activate(self, m=0): - m +=1 - #print("m =", m) - - if len(self.input_list) == 0: - return self.y - - self.z = 0 - for syn in self.input_list: - #print("Input List:", self.input_list) - if syn.getSourceNeuron() in visitedNeurons: - continue - - visitedNeurons.append(syn.getSourceNeuron()) - #print("Source Neuron:", syn.getSourceNeuron()) - - self.z += syn.getWeight() * syn.getSourceNeuron().activate(m) - #print("Visited Neurons:", visitedNeurons) - - return sigmoid(Network.Network.EPSILON * self.z) - - def addInput(self, s): - self.input_list.append(s) - - def addOutput(self, s): - self.output_list.append(s) - - def getInputList(self): - return self.input_list - - def getOutputList(self): - return self.output_list - - def getOutput(self): - return self.y - - def setOutput(self, val): - self.y = val - - def __key(self): - return (self.k) - - def __hash__(self): - return hash(self.__key()) diff --git a/src/Snake.py b/src/Snake.py deleted file mode 100644 index 3520451..0000000 --- a/src/Snake.py +++ /dev/null @@ -1,161 +0,0 @@ -from enum import Enum -import Game -from Network import Network -from Neuron import Neuron -import random - -class Direction(Enum): - UP = 0 - RIGHT = 1 - DOWN = 2 - LEFT = 3 - -class Snake: - STEP = 24 - PROP_CCW = 0.45 - PROP_CW = 0.65 - MUL = 2 - - updateCountMax = 2 - - def __init__(self, length, step, snakeN=1): - self.x = [] - self.y = [] - self.length = length - self.INITIAL_LENGTH = length - self.DIRECTION = Direction.RIGHT - - self.updateCount = 0 - self.STEP = step / self.MUL - for i in range(length): - xN = random.randint(0, Game.Game.MAX_COORD) - yN = random.randint(0, Game.Game.MAX_COORD) - self.x.append(self.STEP * self.MUL * (xN - i)) - self.y.append(self.STEP * self.MUL * yN) - - self.NETWORK = Network(mutRate=0.7) - self.snakeNumber = snakeN - - self.foodCount = 0 - - self.isDead = False - - self.fitness = 0 - self.input_list = [] - self.setupNetwork() - - self.snakeNumber = 1 - - # dir - # 0 -> Clockwise - # 1 -> Counterclockwise - def turn(self, dir = 0): - numDir = len(list(Direction)) - - if dir == 0: - nextDirValue = (self.DIRECTION.value + 1) % numDir - elif dir == 1: - if self.DIRECTION.value == 0: - nextDirValue = 3 - else: - nextDirValue = (self.DIRECTION.value - 1) - - self.DIRECTION = Direction(nextDirValue) - - def update(self): - self.updateCount = self.updateCount + 1 - if self.updateCount >= self.updateCountMax: - self.NETWORK.setInputValues([self.inputX, self.inputY, self.wallDistance]) - prop = self.NETWORK.propagate() - - #print("Snake #", self.snakeNumber, "prop = ", prop) - if prop <= self.PROP_CCW: - self.turn(dir=1) - elif prop >= self.PROP_CW: - self.turn(dir=0) - - for i in range(self.length-1, 0, -1): - self.x[i] = self.x[i-1] - self.y[i] = self.y[i-1] - - if self.DIRECTION == Direction.UP: - self.y[0] -= self.STEP - elif self.DIRECTION == Direction.RIGHT: - self.x[0] += self.STEP - elif self.DIRECTION == Direction.DOWN: - self.y[0] += self.STEP - elif self.DIRECTION == Direction.LEFT: - self.x[0] -= self.STEP - - self.updateCount = 0 - - def draw(self, surface, image): - if self.isDead == True: - return - - for i in range(self.length): - surface.blit(image, (self.x[i], self.y[i])) - - def collidedOnSelf(self): - x1 = self.x[0] - y1 = self.y[0] - for i in range(1, self.length): - if Game.checkCollision(self.STEP, x1, y1, self.x[i], self.y[i]) == True: - return True - - return False - - def foundFood(self): - i = self.length - self.length += 1 - self.foodCount += 1 - - self.x.append(self.x[i-1]) - self.y.append(self.y[i-1]) - - def setSnakeNetwork(self, net): - self.NETWORK = net - - def getSnakeNetwork(self): - return self.NETWORK - - def setupNetwork(self): - self.input_list.append(Neuron()) - self.input_list.append(Neuron()) - self.input_list.append(Neuron()) - - self.NETWORK.setSensorList(self.input_list) - self.NETWORK.setInputValues([Game.Game.WIDTH, Game.Game.HEIGHT, Game.Game.WIDTH - self.STEP]) - - - def setInputValues(self, x, y): - # Send the distance in X and Y as inputs to the Network - self.inputX = x - self.x[0] - self.inputY = y - self.y[0] - if self.DIRECTION == Direction.UP: - self.wallDistance = self.y[0] - elif self.DIRECTION == Direction.RIGHT: - self.wallDistance = Game.Game.WIDTH - self.x[0] - elif self.DIRECTION == Direction.DOWN: - self.wallDistance = Game.Game.HEIGHT - self.y[0] - elif self.DIRECTION == Direction.LEFT: - self.wallDistance = self.x[0] - - def calcFitness(self, penalty=0): - self.fitness = 10 / ( 0.5 + self.foodCount ) + 1 / penalty - if self.fitness <= 0: - self.fitness = 100 # Penalty - - def reset(self): - self.isDead = False; - self.x = []; - self.y = []; - self.length = self.INITIAL_LENGTH; - for i in range(self.length): - xN = random.randint(0, Game.Game.MAX_COORD) - yN = random.randint(0, Game.Game.MAX_COORD) - self.x.append(self.STEP * self.MUL * (xN - i)) - self.y.append(self.STEP * self.MUL * yN) - - self.updateCount = 0; - self.foodCount = 0; diff --git a/src/Synapse.py b/src/Synapse.py deleted file mode 100644 index 29a63e3..0000000 --- a/src/Synapse.py +++ /dev/null @@ -1,23 +0,0 @@ -class Synapse: - def __init__(self, sourceNeuron, destinationNeuron, weight): - self.sourceNeuron = sourceNeuron - self.destinationNeuron = destinationNeuron - self.weight = weight - - def getSourceNeuron(self): - return self.sourceNeuron - - def getDestinationNeuron(self): - return self.destinationNeuron - - def getWeight(self): - return self.weight - - def setWeight(self, w): - self.weight = w - - def setSourceNeuron(self, s): - self.sourceNeuron = s - - def setDestinationNeuron(self, d): - self.destinationNeuron = d \ No newline at end of file diff --git a/src/__main__.py b/src/__main__.py index 68a5324..b7ad073 100644 --- a/src/__main__.py +++ b/src/__main__.py @@ -1,5 +1,5 @@ -from Game import Game +from src.game import Game if __name__ == "__main__": - game = Game() - game.on_execute() \ No newline at end of file + game = Game() + game.on_execute() diff --git a/assets/block.png b/src/assets/block.png similarity index 100% rename from assets/block.png rename to src/assets/block.png diff --git a/assets/food.png b/src/assets/food.png similarity index 100% rename from assets/food.png rename to src/assets/food.png diff --git a/src/food.py b/src/food.py new file mode 100644 index 0000000..7b8ec83 --- /dev/null +++ b/src/food.py @@ -0,0 +1,16 @@ +class Food: + x = 0 + y = 0 + STEP = 24 + + def __init__(self, step, x=0, y=0): + self.x = x * self.STEP + self.y = y * self.STEP + self.step = step + + def draw(self, surface, image): + surface.blit(image, (self.x, self.y)) + + def rellocate(self, x=0, y=0): + self.x = x * self.STEP + self.y = y * self.STEP diff --git a/src/game.py b/src/game.py new file mode 100644 index 0000000..061b41e --- /dev/null +++ b/src/game.py @@ -0,0 +1,231 @@ +import operator +import os +import random +import time +from math import floor +from math import sqrt + +import pygame +from pygame.locals import * + +from src.food import Food +from src.neuron import visitedNeurons +from src.snake import Snake + + +def check_collision(step, x1, y1, x2, y2): + return (x2 <= x1 < x2 + step) and (y2 <= y1 < y2 + step) + + +def get_distance(x1, y1, x2, y2): + return sqrt((x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2)) + + +CUR_DIR = os.path.dirname(__file__) + + +class Game: + WIDTH = 984 + HEIGHT = 984 + STEP = 24 + MAX_SNAKES = 20 # make sure it's even + MAX_FOOD = 20 + MAX_LOOPS_PER_RUN = 500 + TIME_SLEEP = 0.001 + MAX_COORD = WIDTH / STEP - 1 + INITIAL_LENGTH = 4 + + run = 1 + start_time = 0 + min_fitness = 100 + mean_fitness = 0 + overall_min_fitness = 100 + + snakes_array = [] + food_array = [] + + loop = 0 + + def __init__(self): + self._running = True + self._display_surf = None + self._snake_surf = None + self._food_surf = None + self._start_time = time.time() + + for i in range(self.MAX_SNAKES): + self.snakes_array.append(Snake(self.INITIAL_LENGTH, self.STEP, i + 1)) + self.liveSnakes = self.MAX_SNAKES + + for i in range(self.MAX_FOOD): + self.food_array.append( + Food(self.STEP, random.randint(0, self.MAX_COORD), random.randint(0, self.MAX_COORD))) + + def on_init(self): + pygame.init() + self._display_surf = pygame.display.set_mode((self.WIDTH, self.HEIGHT), pygame.HWSURFACE) + + pygame.display.set_caption('Snake Learning AI') + self._running = True + self._snake_surf = pygame.image.load(os.path.join(CUR_DIR, "assets", "block.png")).convert() + self._food_surf = pygame.image.load(os.path.join(CUR_DIR, "assets", "food.png")).convert() + + def on_event(self, event): + if event.type == QUIT: + self._running = False + + def on_loop(self): + self.loop += 1 + for snake in self.snakes_array: + if self.liveSnakes <= 0: + self.reset_level() + + if snake.isDead: + continue + + snake.update() + + idx_min_distance = 0 + min_distance = self.WIDTH + i = 0 + for food in self.food_array: + dist = get_distance(snake.x[0], snake.y[0], food.x, food.y) + if dist < min_distance: + idx_min_distance = i + min_distance = dist + i += 1 + + if check_collision(self.STEP, snake.x[0], snake.y[0], food.x, food.y): + # Found food + snake.foundFood() + food.rellocate(random.randint(0, self.MAX_COORD), random.randint(0, self.MAX_COORD)) + # print("Got food! - Food Count:", snake.foodCount) + + snake.set_input_values(self.food_array[idx_min_distance].x, self.food_array[idx_min_distance].y) + + # Wall collision + if snake.x[0] > self.WIDTH or snake.x[0] < 0 or snake.y[0] > self.HEIGHT or snake.y[0] < 0: + snake.isDead = True + self.liveSnakes -= 1 + snake.calcFitness(-1 + time.time() - self.start_time) + self.mean_fitness += snake.fitness + + if snake.fitness < self.min_fitness: + self.min_fitness = snake.fitness + + # print("Wall collision! - Food Count:", snake.foodCount, "/ Fitness:", snake.fitness) + + # Self collision + if snake.collidedOnSelf(): + snake.isDead = True + self.liveSnakes -= 1 + snake.calcFitness(-3 + time.time() - self.start_time) + self.mean_fitness += snake.fitness + + if snake.fitness < self.min_fitness: + self.min_fitness = snake.fitness + + # print("Self collision! - Food Count:", snake.foodCount, "/ Fitness:", snake.fitness) + + if self.loop == self.MAX_LOOPS_PER_RUN: + self.reset_level() + + def on_render(self): + self._display_surf.fill((0, 0, 0)) + for snake in self.snakes_array: + snake.draw(self._display_surf, self._snake_surf) + for f in self.food_array: + f.draw(self._display_surf, self._food_surf) + + pygame.display.flip() + + # print("Live snakes: ", self.liveSnakes) + + def on_cleanup(self): + print("") + print("Elapsed time:", time.time() - self._start_time, "seconds.") + print("") + pygame.display.quit() + pygame.quit() + + def on_execute(self): + if not self.on_init(): + self._running = False + + self.start_time = time.time() + while self._running: + pygame.event.pump() + + for event in pygame.event.get(): + if event.type == pygame.QUIT: + self._running = False + self.on_cleanup() + return + elif event.type == pygame.KEYDOWN: + if event.key == pygame.K_ESCAPE: + self._running = False + self.on_cleanup() + """ + if event.key == pygame.K_RIGHT: + for snake in self.snakes_array: + snake.turn(dir = 0) + elif event.key == pygame.K_LEFT: + for snake in self.snakes_array: + snake.turn(dir = 1) + """ + + self.on_loop() + self.on_render() + + time.sleep(self.TIME_SLEEP) + + def reset_level(self): + fitness = {} + for idx, snake in enumerate(self.snakes_array): + if not snake.isDead: + snake.calcFitness(time.time() - self.start_time) + if self.min_fitness > snake.fitness: + self.min_fitness = snake.fitness + self.mean_fitness += snake.fitness + + fitness[idx] = snake.fitness + + fitness = sorted(fitness.items(), key=operator.itemgetter(1)) + new_list = [] + for i in range(self.MAX_SNAKES): + idx = fitness[i][0] + new_list.append(self.snakes_array[idx]) + + n = floor(self.MAX_SNAKES / 2) + if n == 0: + n = 1 + for i in range(n): + good_boy = new_list[i] + if self.MAX_SNAKES > 1: + bad_boy = new_list[i + n] + bad_boy.setSnakeNetwork(good_boy.getSnakeNetwork().copy()) + good_boy.getSnakeNetwork().mutate_change_weights() + good_boy.getSnakeNetwork().mutate() + + for snake in self.snakes_array: + snake.reset() + + self.mean_fitness /= self.MAX_SNAKES + if self.overall_min_fitness >= self.min_fitness: + self.overall_min_fitness = self.min_fitness + + print("") + print("Resetting level...") + print("Gen", self.run, "info:") + print("Time elapsed:", time.time() - self.start_time, "seconds") + print("Min fitness gen: ", self.min_fitness, "/ Mean fitness: ", self.mean_fitness) + print("Overall Min Fitness:", self.overall_min_fitness) + print("") + + visitedNeurons.clear() + self.loop = 0 + self.liveSnakes = self.MAX_SNAKES + self.run += 1 + self.min_fitness = 100 + self.mean_fitness = 0 + self.start_time = time.time() diff --git a/src/network.py b/src/network.py new file mode 100644 index 0000000..0781c3f --- /dev/null +++ b/src/network.py @@ -0,0 +1,226 @@ +# Neural network consisting of 1 output neuron and +# 2 input neurons (sensors) +# The hidden layer(s) is(are) gonna be arranged by a GA + +import random + +from src.neuron import Neuron, visitedNeurons +from src.synapse import Synapse + + +class Network: + MUTATION_RATE = 0.5 + INNER_MUTATION_RATE = 0.3 + EPSILON = 0.1 + + def __init__(self, mut_rate=1): + self._sensor_list = [] + self.input_node_list = [] + self.output_node_list = [] + self.input_output_node_list = [] + + self._actuator = Neuron() + + self.output_node_list.append(self._actuator) + self.MUTATION_RATE = mut_rate + + def propagate(self): + visitedNeurons.clear() + + prop = self._actuator.activate() + return prop + + @property + def sensor_list(self): + return self._sensor_list + + @sensor_list.setter + def sensor_list(self, l): + self._sensor_list = l + self.input_node_list += l + + @property + def actuator(self): + return self._actuator + + @actuator.setter + def actuator(self, n): + self._actuator = n + self.output_node_list.append(n) + + def set_input_values(self, l): + if len(self.sensor_list) != len(l): + print("########### ERROR setInputValues") + + i = 0 + for sensor in self.sensor_list: + sensor.setOutput(l[i]) + i += 1 + + @staticmethod + def get_random_neuron(l): + return random.choice(l) + + def mutate(self): + r = random.random() + if r <= self.MUTATION_RATE: + self.mutate_change_weights() + + if r <= self.INNER_MUTATION_RATE: + mut_type = random.randint(0, 3) + if mut_type == 0: + self.mutate_add_neuron() + elif mut_type == 1: + self.mutate_remove_neuron() + elif mut_type == 2: + self.mutate_change_weights() + elif mut_type == 3: + self.mutate_add_synapse() + + def mutate_change_weights(self): + if len(self.input_output_node_list) > 0: + tmp = self.get_random_neuron(self.input_output_node_list) + for syn in tmp.getInputList(): + syn.setWeight(random.random()) + + def mutate_remove_neuron(self): + if len(self.input_output_node_list) > 0: + tmp = self.get_random_neuron(self.input_output_node_list) + for syn in tmp.getOutputList(): + syn.getDestinationNeuron().getInputList().remove(syn) + for syn in tmp.getInputList(): + syn.getSourceNeuron().getOutputList().remove(syn) + + tmp.getInputList().clear() + tmp.getOutputList().clear() + + self.output_node_list.remove(tmp) + self.input_node_list.remove(tmp) + self.input_output_node_list.remove(tmp) + + def mutate_add_neuron(self): + tmp = Neuron() + + src = self.get_random_neuron(self.input_node_list) + syn_in = Synapse(src, tmp, self.EPSILON * random.random()) + src.addOutput(syn_in) + tmp.addInput(syn_in) + + dst = self.get_random_neuron(self.output_node_list) + syn_out = Synapse(tmp, dst, self.EPSILON * random.random()) + tmp.addOutput(syn_out) + dst.addInput(syn_out) + + self.input_node_list.append(tmp) + self.output_node_list.append(tmp) + self.input_output_node_list.append(tmp) + + def mutate_add_synapse(self): + src = self.get_random_neuron(self.input_node_list) + dst = src + while src == dst: + dst = self.get_random_neuron(self.output_node_list) + + syn = Synapse(src, dst, self.EPSILON * random.random()) + src.addOutput(syn) + dst.addInput(syn) + + def copy(self): + q = [] + m = {} + copy_network = Network() + + original_actuator = self.actuator + + copy_actuator = Neuron() + copy_network.actuator = copy_actuator + + q.append(original_actuator) + + m[original_actuator] = copy_actuator + copy_actuator.k = original_actuator.k + + copyio_nodes = [] + copyi_nodes = [] + copyo_nodes = [] + copy_sensors = [] + + for sensor in self.sensor_list: + tmp = Neuron() + m[sensor] = tmp + q.append(sensor) + copy_sensors.append(tmp) + copyi_nodes.append(tmp) + + while len(q) > 0: + original_node = q.pop() + + i_syns = original_node.getInputList() + o_syns = original_node.getOutputList() + + copy_node = m[original_node] + copy_node.k = original_node.k + + for syn in i_syns: + input_original = syn.getSourceNeuron() + + if input_original in list(m.keys()): + input_copy = m[input_original] + + exists = False + for n in input_copy.getOutputList(): + if n.getSourceNeuron() == input_copy and n.getDestinationNeuron() == copy_node: + exists = True + break + if not exists: + copy_syn = Synapse(input_copy, copy_node, syn.getWeight()) + copy_node.addInput(copy_syn) + input_copy.addOutput(copy_syn) + else: + input_copy = Neuron() + input_copy.k = input_original.k + copyio_nodes.append(input_copy) + + copy_syn = Synapse(input_copy, copy_node, syn.getWeight()) + input_copy.addOutput(copy_syn) + copy_node.addInput(copy_syn) + m[input_original] = input_copy + q.append(input_original) + + for syn in o_syns: + output_original = syn.getDestinationNeuron() + + if output_original in list(m.keys()): + output_copy = m[output_original] + + exists = False + for n in output_copy.getOutputList(): + if n.getSourceNeuron() == copy_node and n.getDestinationNeuron() == output_copy: + exists = True + break + if not exists: + copy_syn = Synapse(copy_node, output_copy, syn.getWeight()) + copy_node.addOutput(copy_syn) + output_copy.addInput(copy_syn) + else: + output_copy = Neuron() + output_copy.k = output_original.k + copyio_nodes.append(output_copy) + + copy_syn = Synapse(copy_node, output_copy, syn.getWeight()) + output_copy.addInput(copy_syn) + copy_node.addOutput(copy_syn) + m[output_original] = output_copy + q.append(output_original) + + for n in copyio_nodes: + copyi_nodes.append(n) + copyo_nodes.append(n) + copyo_nodes.append(copy_actuator) + + copy_network.sensor_list = copy_sensors + copy_network.input_node_list = copyi_nodes + copy_network.output_node_list = copyo_nodes + copy_network.input_output_node_list = copyio_nodes + + return copy_network diff --git a/src/neuron.py b/src/neuron.py new file mode 100644 index 0000000..2f9720b --- /dev/null +++ b/src/neuron.py @@ -0,0 +1,74 @@ +import math +import random + +from src.network import Network + + +def sigmoid(x): + return 1 / (1 + math.exp(-x)) + + +visitedNeurons = [] + + +class Neuron: + def __init__(self): + self.z = 0 + self.bias = random.gauss(0, 1) + self.k = random.random() + + self._input_list = [] + self._output_list = [] + self.bias = 0 + self._output = 0 + + def activate(self, m=0): + m += 1 + # print("m =", m) + + if len(self.input_list) == 0: + return self.output + + self.z = 0 + for syn in self.input_list: + # print("Input List:", self.input_list) + if syn.getSourceNeuron() in visitedNeurons: + continue + + visitedNeurons.append(syn.getSourceNeuron()) + # print("Source Neuron:", syn.getSourceNeuron()) + + self.z += syn.getWeight() * syn.getSourceNeuron().activate(m) + # print("Visited Neurons:", visitedNeurons) + + return sigmoid(Network.EPSILON * self.z) + + @property + def input_list(self): + return self._input_list + + @input_list.setter + def input_list(self, s): + self._input_list.append(s) + + @property + def output_list(self): + return self._output_list + + @output_list.setter + def output_list(self, s): + self._output_list.append(s) + + @property + def output(self): + return self._output + + @output.setter + def output(self, val): + self._output = val + + def __key(self): + return (self.k) + + def __hash__(self): + return hash(self.__key()) diff --git a/src/snake.py b/src/snake.py new file mode 100644 index 0000000..c871221 --- /dev/null +++ b/src/snake.py @@ -0,0 +1,163 @@ +import random +from enum import Enum + +from src.game import Game +from src.network import Network +from src.neuron import Neuron + + +class Direction(Enum): + UP = 0 + RIGHT = 1 + DOWN = 2 + LEFT = 3 + + +class Snake: + STEP = 24 + PROP_CCW = 0.45 + PROP_CW = 0.65 + MUL = 2 + + updateCountMax = 2 + + def __init__(self, length, step, snake_n=1): + self.x = [] + self.y = [] + self.length = length + self.INITIAL_LENGTH = length + self.DIRECTION = Direction.RIGHT + + self.updateCount = 0 + self.STEP = step / self.MUL + for i in range(length): + x_n = random.randint(0, Game.MAX_COORD) + y_n = random.randint(0, Game.MAX_COORD) + self.x.append(self.STEP * self.MUL * (x_n - i)) + self.y.append(self.STEP * self.MUL * y_n) + + self.NETWORK = Network(mut_rate=0.7) + self.snakeNumber = snake_n + + self.foodCount = 0 + + self.isDead = False + + self.fitness = 0 + self.input_list = [] + self.setupNetwork() + + self.snakeNumber = 1 + + # dir + # 0 -> Clockwise + # 1 -> Counterclockwise + def turn(self, dir=0): + numDir = len(list(Direction)) + + if dir == 0: + nextDirValue = (self.DIRECTION.value + 1) % numDir + elif dir == 1: + if self.DIRECTION.value == 0: + nextDirValue = 3 + else: + nextDirValue = (self.DIRECTION.value - 1) + + self.DIRECTION = Direction(nextDirValue) + + def update(self): + self.updateCount = self.updateCount + 1 + if self.updateCount >= self.updateCountMax: + self.NETWORK.set_input_values([self.inputX, self.inputY, self.wallDistance]) + prop = self.NETWORK.propagate() + + # print("Snake #", self.snakeNumber, "prop = ", prop) + if prop <= self.PROP_CCW: + self.turn(dir=1) + elif prop >= self.PROP_CW: + self.turn(dir=0) + + for i in range(self.length - 1, 0, -1): + self.x[i] = self.x[i - 1] + self.y[i] = self.y[i - 1] + + if self.DIRECTION == Direction.UP: + self.y[0] -= self.STEP + elif self.DIRECTION == Direction.RIGHT: + self.x[0] += self.STEP + elif self.DIRECTION == Direction.DOWN: + self.y[0] += self.STEP + elif self.DIRECTION == Direction.LEFT: + self.x[0] -= self.STEP + + self.updateCount = 0 + + def draw(self, surface, image): + if self.isDead == True: + return + + for i in range(self.length): + surface.blit(image, (self.x[i], self.y[i])) + + def collidedOnSelf(self): + x1 = self.x[0] + y1 = self.y[0] + for i in range(1, self.length): + if Game.checkCollision(self.STEP, x1, y1, self.x[i], self.y[i]) == True: + return True + + return False + + def foundFood(self): + i = self.length + self.length += 1 + self.foodCount += 1 + + self.x.append(self.x[i - 1]) + self.y.append(self.y[i - 1]) + + def setSnakeNetwork(self, net): + self.NETWORK = net + + def getSnakeNetwork(self): + return self.NETWORK + + def setupNetwork(self): + self.input_list.append(Neuron()) + self.input_list.append(Neuron()) + self.input_list.append(Neuron()) + + self.NETWORK.setSensorList(self.input_list) + self.NETWORK.set_input_values([Game.Game.WIDTH, Game.Game.HEIGHT, Game.Game.WIDTH - self.STEP]) + + def setInputValues(self, x, y): + # Send the distance in X and Y as inputs to the Network + self.inputX = x - self.x[0] + self.inputY = y - self.y[0] + if self.DIRECTION == Direction.UP: + self.wallDistance = self.y[0] + elif self.DIRECTION == Direction.RIGHT: + self.wallDistance = Game.Game.WIDTH - self.x[0] + elif self.DIRECTION == Direction.DOWN: + self.wallDistance = Game.Game.HEIGHT - self.y[0] + elif self.DIRECTION == Direction.LEFT: + self.wallDistance = self.x[0] + + def calcFitness(self, penalty=0): + self.fitness = 10 / (0.5 + self.foodCount) + 1 / penalty + if self.fitness <= 0: + self.fitness = 100 # Penalty + + def reset(self): + self.isDead = False; + self.x = []; + self.y = []; + self.length = self.INITIAL_LENGTH; + for i in range(self.length): + xN = random.randint(0, Game.Game.MAX_COORD) + yN = random.randint(0, Game.Game.MAX_COORD) + self.x.append(self.STEP * self.MUL * (xN - i)) + self.y.append(self.STEP * self.MUL * yN) + + self.updateCount = 0; + self.foodCount = 0; diff --git a/src/synapse.py b/src/synapse.py new file mode 100644 index 0000000..9758c75 --- /dev/null +++ b/src/synapse.py @@ -0,0 +1,5 @@ +class Synapse: + def __init__(self, source_neuron, destination_neuron, weight): + self.source_neuron = source_neuron + self.destination_neuron = destination_neuron + self.weight = weight