Building Something That's Not a Todo App: An Online Multiplayer Snake Game

Oussama Ben Hassen - Aug 19 - Dev Community

I recently stumbled upon a quote by the tech influencer ThePrimeagen. I don't remember it exactly, but it goes something like this:

"If you're not failing at what you do, then you're not learning."

This got me thinking about my coding journey. I've become quite comfortable with building backends, to the point where even writing import express from 'express'; has become a chore.

Instead of going through the canon event of learning yet another JavaScript framework in order to build my millionth revolutionary todo app (because clearly, the world needs more of those), I decided to do something else. I'd been reading about the WebSocket protocol and found its ability to handle bidirectional asynchronous messages between server and client fascinating. I wanted to build something with it, but I needed a break from JavaScript.

After some consideration, I settled on a simple multiplayer 2D game. It would involve calculations (collision detection), data structures (linked lists, hashmaps), and player synchronization. A snake game seemed perfect, with a few simple rules:

  1. Eating a fruit makes you grow and adds 1 to your score

  2. Crashing into another player's body makes you shrink, resets your position randomly, and zeros your score

  3. Head-to-head collisions cause both players to shrink, reset their positions, and zero their scores

All these calculations happen server-side to prevent players from tampering with the game logic. We'll use Python 3 with Pygame for graphics and the websockets library for handling asynchronous messages through asyncio.
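As a rough sketch of how the three rules above could map onto a server-side decision, the function below turns collision flags into the action string sent back to a client. The name `decide_action` is hypothetical, and the action strings are assumptions chosen to match the ones used in the server code later in this post.

```python
def decide_action(head_to_head: bool, head_to_body: bool, head_to_food: bool) -> str:
    """Map collision flags to the action a client should perform.

    Crashes take priority over eating, so a player who hits the
    opponent while touching the fruit still shrinks.
    """
    if head_to_head or head_to_body:
        return "shrink"   # rules 2 and 3: reset position and score
    if head_to_food:
        return "grow"     # rule 1: add a segment and a point
    return "None"         # nothing happened this frame


print(decide_action(False, False, True))
print(decide_action(True, False, True))
```

Keeping this decision on the server means a client can only report where it is; it never gets to decide whether it scored.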

Now, let's dive into the code. You may find it messy, but remember the first rule of programming:

"If it works, don't touch it."

You’ve read enough of my yapping, so let’s move on to the fun part: coding. If you would rather skip the chatter and dive straight in, just head over to the GitHub repo.
If you want to contribute, feel free to open an issue or submit a pull request. Any improvements or bug fixes are welcome!

First, we define our data structures:

import pygame

class Object :
    def __init__(self , x : float , y :float ,  width:int , height :int):

        #####################################
        #  init object's size and position  #
        #####################################
        self.x = x
        self.y = y 
        self.height = height
        self.width = width

    def render(self , screen , color) :
        pygame.draw.rect(screen ,color ,pygame.Rect(self.x , self.y , self.width , self.height))

class Player(Object) :

    def __init__(self, x: float, y: float, width: int, height: int):
        super().__init__(x, y, width, height)
        self.next = None
        self.prev = None
        self.tail = self
        self.direction = 'LEFT'
        self.length = 1
        self.color = 'red'

    # change the snake's direction based on the pressed keys (W, A, S, D)
    # returns True if the direction changed on this call; the game loop
    # uses that flag to decide whether to keep moving in the current direction

    def change_direction(self, keys):
        changed = False
        if self.direction in ['LEFT', 'RIGHT']:
            if keys[pygame.K_w] and self.direction != 'DOWN':
                self.direction = 'UP'
                changed = True
            elif keys[pygame.K_s] and self.direction != 'UP':
                self.direction = 'DOWN'
                changed = True
        elif self.direction in ['UP', 'DOWN']:
            if keys[pygame.K_a] and self.direction != 'RIGHT':
                self.direction = 'LEFT'
                changed = True
            elif keys[pygame.K_d] and self.direction != 'LEFT':
                self.direction = 'RIGHT'
                changed = True
        return changed  

    # move the Snake to a certain direction with a certain speed
    def move(self, screen, dt):
        speed = 150 * dt 
        if self.direction == 'UP':
            self.move_all(screen, 0, -speed)
        elif self.direction == 'DOWN':
            self.move_all(screen, 0, speed)
        elif self.direction == 'LEFT':
            self.move_all(screen, -speed, 0)
        elif self.direction == 'RIGHT':
            self.move_all(screen, speed, 0)

    # wrap the segment around to the opposite edge when it leaves the screen
    def bound(self , screen) :
        if self.y < 0  :
            self.y = screen.get_height()
        if self.y > screen.get_height() :
            self.y = 0

        if self.x < 0  :
            self.x = screen.get_width()
        if self.x > screen.get_width() :
            self.x = 0

    def get_pos(self) :
        arr = []
        current = self 
        while current :
            arr.append([current.x , current.y])
            current = current.next
        return arr

    # move the snake and its body to some coordinates 
    def move_all(self, screen, dx, dy):
        # Store old positions
        old_positions = []
        current = self
        while current:
            old_positions.append((current.x, current.y))
            current = current.next

        # Move head
        self.x += dx
        self.y += dy
        self.bound(screen)
        # self.render(screen, self.color)

        # Move body
        current = self.next
        i = 0
        while current:
            current.x, current.y = old_positions[i]
            current.bound(screen)
            # current.render(screen, self.color)
            current = current.next
            i += 1           

    def add(self ):
        new = Player(self.tail.x+self.tail.width+10 ,
                     self.tail.y ,self.tail.width ,                    
                      self.tail.height)
        new.prev = self.tail 
        self.tail.next = new
        self.tail = new
        self.length +=1

    def shrink(self , x , y):
        self.next = None 
        self.tail = self 
        self.length = 1
        self.x = x 
        self.y = y

    # rebuild the whole snake from a list of coordinates
    # used for the opponent player when receiving its coordinates
    def setall(self , arr) :
        self.x = arr[0][0]
        self.y = arr[0][1]
        self.next = None
        self.tail = self
        current = self 
        for i in range(1 , len(arr)) :
            x = arr[i][0]
            y = arr[i][1]
            new = Player(x ,y ,self.width , self.height)
            current.next = new 
            self.tail = new
            current = current.next 

    # render the whole snake on the screen 
    # used for both the current player and the opponent
    def render_all(self, screen, color):
        current = self        
        if self.next : 
            self.render(screen,'white')
            current = self.next
        while current :
            current.render(screen , color) 
            current = current.next


The Object class is a base class for game objects, while the Player class extends it with snake-specific functionality. The Player class includes methods for changing direction, moving, growing, shrinking, and rendering the snake.
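To make the linked-list idea easier to see without Pygame in the way, here is a stripped-down sketch of the same follow-the-leader movement. `Segment`, `positions`, and `step` are hypothetical names used only for this illustration; they mirror the roles of `Player`, `get_pos`, and `move_all` but are not the classes from the listing.

```python
class Segment:
    """One node of the snake: a position plus a link to the next node."""

    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.next = None


def positions(head):
    """Walk the list from the head and collect every segment's position."""
    out = []
    node = head
    while node:
        out.append((node.x, node.y))
        node = node.next
    return out


def step(head, dx, dy):
    """Move the head by (dx, dy); each body segment takes the old
    position of the segment in front of it."""
    old = positions(head)
    head.x += dx
    head.y += dy
    node = head.next
    i = 0
    while node:
        node.x, node.y = old[i]
        node = node.next
        i += 1


# A three-segment snake heading left
head = Segment(100, 50)
head.next = Segment(130, 50)
head.next.next = Segment(160, 50)

step(head, -30, 0)
print(positions(head))  # [(70, 50), (100, 50), (130, 50)]
```

Only the head actually "moves"; the rest of the body just inherits positions, which is why a singly linked traversal is enough here.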

Next, we have the game logic:

import pygame 
from objects import Object, Player
import websockets 
import asyncio
import json

uri = 'ws://localhost:8765'

pygame.font.init() 


# font used to render the score
font = pygame.font.Font(None, 36)

# play the main theme (expects a file named 'unknown.mp3' in the working directory)
def play() :
    pygame.mixer.init()
    pygame.mixer.music.load('unknown.mp3')
    pygame.mixer.music.play()

def stop():
    if pygame.mixer.music.get_busy():  # Check if music is playing
        pygame.mixer.music.stop()

# initialize players and the fruit from the server's first message
def init(obj) :
    print(obj)
    player = Player(*obj['my'][0])
    opp = Player(*obj['opp'][0] )
    # the fruit is a plain rectangle [x, y, width, height]; Object is used here
    # because no Food class is defined in the objects module shown above
    food = Object(*obj['food'])
    return (player , opp , food)

async def main():

    async with websockets.connect(uri) as ws:
        choice = int(input('1 to create a room \n2 to join a room\n>>>'))
        room_name = input('enter room name: ')
        await ws.send(json.dumps({
            "choice" : choice ,
            "room" : room_name
        }))

        ## waiting for the other player to connect
        res = {}
        while True:
            res = await ws.recv()
            try:
                res = json.loads(res)

                break 
            except ValueError:
                # not valid JSON yet, keep waiting for the initial state
                pass

        player, opp, food = init(res) 
        pygame.init()
        screen = pygame.display.set_mode((600, 400))
        clock = pygame.time.Clock()
        running = True
        dt = 0
        play()
        while running:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False

            screen.fill("black")
            my_pos = {
                'pos': player.get_pos(), 
                'len': player.length
            }
            await ws.send(json.dumps(my_pos))
            response = await ws.recv()
            response = json.loads(response)

            # Draw the food at the position sent by the server
            pygame.draw.rect(screen ,'green' ,pygame.Rect(response['food'][0] ,response['food'][1] ,20, 20))
            # Handle actions
            if response['act'] == 'grow':
                player.add()
            elif response['act'] == 'shrinkall':
                player.shrink(*response['my'][0][0:2])
                opp.shrink(*response['opp'][0][0:2])
            elif response['act'] == 'shrink':
                player.shrink(*response['my'][0][0:2])
                # restarting the song each time you bump into the other player
                stop()
                play()
            else:
                opp.setall(response['opp'])

            # Render everything once per frame 
            player.render_all(screen, 'red')
            opp.render_all(screen, 'blue')

            ## score
            ## x | y => x you , y opponent
            text = font.render(f'{response["my_score"]} | {response["other_score"]}', True, (255, 255, 255))

            screen.blit(text, (0, 0)) 
            pygame.display.flip()

            keys = pygame.key.get_pressed()
            changed = player.change_direction(keys)
            # keep moving in the same direction if it is not changed
            if not changed:
                player.move(screen, dt)

            dt = clock.tick(60) / 1000

        pygame.quit()

asyncio.run(main())

This connects you to the server and lets you create or join a room. On every frame, the client sends its position to the server and receives the opponent's position, the fruit's position, and an act field that tells the player whether to shrink or grow.
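For reference, the per-frame exchange boils down to two JSON messages. The sketch below builds one of each without opening a socket; the field names come from the client and server listings, while the helper names `build_update` and `parse_reply` and all sample values are made up for illustration.

```python
import json


def build_update(positions, length):
    """Serialize the update the client sends on every frame."""
    return json.dumps({"pos": positions, "len": length})


def parse_reply(raw):
    """Decode a server reply and pull out the fields the game loop reads."""
    reply = json.loads(raw)
    scores = (reply["my_score"], reply["other_score"])
    return reply["act"], reply["opp"], reply["food"], scores


# What the client would send for a two-segment snake
update = build_update([[100, 50], [140, 50]], 2)

# What a reply could look like after eating a fruit (sample values)
sample_reply = json.dumps({
    "act": "grow",
    "opp": [[300, 200, 30, 30]],
    "my": [[100, 50], [140, 50]],
    "food": [250, 120, 20, 20],
    "my_score": 1,
    "other_score": 0,
})

act, opp, food, scores = parse_reply(sample_reply)
print(update)
print(act, food[:2], scores)
```

Because each frame is one send followed by one receive, the server has to answer every update with exactly one reply, otherwise the two sides drift out of step.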

Finally, the server code:

import asyncio
import websockets 
import random
import json

def generate_food_position(min_x, max_x, min_y, max_y):
    x = random.randint(min_x, max_x)
    y = random.randint(min_y, max_y)
    return [x, y, 20, 20]

rooms  = {}

def collide(a , b , width):
    return (abs(a[0] - b[0]) < width and
            abs(a[1] - b[1]) < width)

# detecting possible collisions between this player's head (a[0])
# and the opponent's head, the opponent's body, and the food
def collides(a , b , food ) :
    head_to_head = collide(a[0] , b[0] ,30)
    head_to_food = collide(a[0] , food ,25  ) 
    head_to_body = False 
    this_head = a[0]
    for part in b :
        if collide(this_head , part  ,30) :
            head_to_body = True 
            break 

    return (head_to_head , head_to_body , head_to_food)

# return response as (act ,opponents position(s) , my position(s) , the food and the scores)
def formulate_response(id , oid , food , roomName) :
    this= rooms[roomName][id]['pos']
    other= rooms[roomName][oid]['pos']
    hh , hb , hf = collides(this ,other ,food)
    act = 'None'
    if hh :
        act = 'shrink' 
        rooms[roomName][id]['pos'] = initPlayer()
        rooms[roomName][id]['score'] =0 
        # rooms[roomName][oid]['pos'] = initPlayer()
        # rooms[roomName][oid]['respawn'] = True

    elif hb :
        act = 'shrink' 
        rooms[roomName][id]['pos'] = initPlayer()
        rooms[roomName][id]['score'] = 0   
    elif hf :
        act = 'grow'
        rooms[roomName]['food'] =  generate_food_position(20, 580, 20, 380) 
        rooms[roomName][id]['score']+=1
    return  {
        'act' : act , 
        'opp' : rooms[roomName][oid]['pos'] , 
        'my'  : rooms[roomName][id]['pos'] ,
        'food': rooms[roomName]['food'] ,
        'my_score' : rooms[roomName][id]['score'] ,
        'other_score' : rooms[roomName][oid]['score']
    }




def initPlayer():
    return [[random.randint(30 , 600 ) , random.randint(30 , 400 ) , 30 , 30 ]] 


async def handler(websocket) :

    handshake = await websocket.recv()
    handshake = json.loads(handshake)  

    roomName = handshake["room"]
    if handshake['choice'] == 1 :
        rooms[roomName] = {}
        rooms[roomName]['food'] =generate_food_position(30 ,570 ,30 ,370)

    rooms[roomName][websocket.id] = {
            'socket' : websocket , 
            'pos' : initPlayer() , 
            'respawn' : False ,
            'score' :0
    }
    if len(rooms[roomName]) >= 3 :
        await broadcast(rooms[roomName])
    id = websocket.id

    while True :
        room = rooms[roomName]
        this_pos = await websocket.recv()

        ## synchronisation issue with this:
        ## after a couple of head-to-head collisions
        ## the coordinates shown on the screen aren't the same
        ## as the ones on the server

        if room[id]['respawn']==True :
            rooms[roomName][id]['respawn'] = False

            # generate response (it is sent once, after the if/else below) :
            other_id = get_other_id(id ,room)
            response = {
                'act' :  'shrinkall', 
                'my'  :  room[id]['pos']  , 
                'opp' :  get_other_pos(other_id ,room),
                'food': room['food'] ,
                # the client reads both scores on every frame
                'my_score' : room[id]['score'] ,
                'other_score' : room[other_id]['score']
            } 

        else :
            # update player position  

            this_pos = json.loads(this_pos)

            rooms[roomName][id]['pos'] = this_pos['pos']
            rooms[roomName][id]['len'] = this_pos['len']

            other_id = get_other_id(id , room)
            food = room['food']
            response = formulate_response(id ,other_id ,food  ,roomName)  


        await websocket.send(json.dumps(response))

def get_other_id(id , room) :
    for thing in room.keys():
        if thing != 'food' and thing != id :
            return thing

def get_other_pos(id , room) : 
    return room[id]['pos']

async def broadcast(room) :
    for thing in room.keys() :
        if thing!= 'food' :
            init = {
                'my' : room[thing]['pos'] , 
                'opp' : room[get_opp(thing, room)]['pos'] ,
                'food': room['food']
            }
            await room[thing]['socket'].send(json.dumps(init))

def get_opp(id  , room) :
     for thing in room.keys() :
        if thing!= 'food' and thing != id:
            return thing


async def main():
    async with websockets.serve(handler , 'localhost' ,8765 ):
        await asyncio.Future()



if __name__ == '__main__' :
    print('listening ...')
    asyncio.run(main())

The server handles player movements, collisions, and interactions. It generates food, detects collisions, and synchronizes game state between players. It also manages player connections and pushes game updates.
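To see how the collision check behaves, here is the server's `collide` function on its own with a few hand-picked coordinates. The positions below are sample values, not taken from a real game session.

```python
def collide(a, b, width):
    """Two points collide when they are closer than `width`
    on both the x axis and the y axis."""
    return abs(a[0] - b[0]) < width and abs(a[1] - b[1]) < width


head = [100, 100]
near_food = [110, 120, 20, 20]   # 10 px away on x, 20 px away on y
far_food = [130, 100, 20, 20]    # 30 px away on x, 0 px away on y

print(collide(head, near_food, 25))  # True
print(collide(head, far_food, 25))   # False
```

The check compares the stored x and y coordinates rather than rectangle edges, so the `width` argument acts as a tolerance: 25 for the fruit and 30 for snake segments in the server code.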

game preview
And there you have it: a multiplayer snake game that could be the next big thing in the world of gaming. Who knows, it might even make its way to the next big tech conference!
For now, why not take a spin and see what you can add? Check out the GitHub repo and make your mark on the next big thing in snake games.
Happy coding!
