Download Chereads APP
Chereads App Store Google Play
Chereads

Oh No! Stuck In Reality!

🇺🇸SentientSpork
--
chs / week
--
NOT RATINGS
180
Views
VIEW MORE

Chapter 1 - Networking

import socket

import threading

import json

import time

# Networking configurations

# Address the server binds to and the client connects to (loopback only).
SERVER_IP = "127.0.0.1"

# TCP port shared by the server's bind() and the client's connect().
PORT = 5555

# Passed to listen() as the backlog of pending connections; the visible code
# does not otherwise enforce a cap on the number of connected players.
MAX_PLAYERS = 10000

# The client sleeps 1 / TICK_RATE seconds between the state updates it sends.
TICK_RATE = 60 # Updates per second

# Game states

# Maps a player id (the "ip:port" string built by the server) to that
# player's connected client socket.
connected_players = {}

# Maps a player id to its state dict with "position" and "rotation" entries.
player_data = {}

# Server class to handle connections and data flow

class RealityServer:
    """Thread-per-client TCP server that relays player position/rotation.

    Each accepted connection is registered in the module-level
    ``connected_players`` and ``player_data`` dicts under an ``"ip:port"``
    player id. Clients send JSON objects carrying ``"position"`` and/or
    ``"rotation"``; after every batch of updates the full ``player_data``
    snapshot is sent to every connected client.

    The wire format is unchanged: JSON documents are written back to back
    with no delimiter. Incoming bytes are therefore buffered and split with
    ``json.JSONDecoder.raw_decode`` so that several messages arriving in one
    ``recv()`` call, or one message split across calls, are handled.
    """

    # Guards the shared ``connected_players`` / ``player_data`` dicts, which
    # every client thread reads and mutates.
    _state_lock = threading.Lock()

    # Serialises socket writes so payloads from concurrent broadcasts are
    # never interleaved on one connection. Always acquired *before*
    # ``_state_lock`` to keep the lock order consistent.
    _send_lock = threading.Lock()

    # Upper bound on buffered, not-yet-parseable text per client; a peer that
    # exceeds it is sending garbage (or an absurdly large message) and is
    # disconnected instead of being allowed to grow the buffer forever.
    _MAX_PENDING_CHARS = 65536

    def __init__(self):
        """Create the listening socket bound to ``SERVER_IP:PORT``.

        Raises:
            OSError: if binding or listening fails; the socket is closed
                before the error propagates.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.bind((SERVER_IP, PORT))
            self.server_socket.listen(MAX_PLAYERS)
        except OSError:
            # Do not leak the descriptor when the port is unavailable.
            self.server_socket.close()
            raise
        print(f"[SERVER] Reality server started on {SERVER_IP}:{PORT}")

    def start(self):
        """Accept connections forever, serving each one on its own thread."""
        while True:
            client_socket, addr = self.server_socket.accept()
            print(f"[SERVER] New connection from {addr}")
            threading.Thread(
                target=self.handle_client, args=(client_socket, addr)
            ).start()

    def handle_client(self, client_socket, addr):
        """Serve one client until it disconnects or misbehaves.

        Args:
            client_socket: connected socket for this client.
            addr: ``(host, port)`` pair of the peer; used to build the id.
        """
        player_id = f"{addr[0]}:{addr[1]}"
        with self._state_lock:
            connected_players[player_id] = client_socket
            player_data[player_id] = {
                "position": [0, 0, 0],
                "rotation": [0, 0, 0],
            }

        pending = ""
        try:
            self.send_initial_data(client_socket, player_id)
            while True:
                chunk = client_socket.recv(1024)
                if not chunk:
                    # Orderly shutdown by the peer.
                    break
                pending += chunk.decode('utf-8')
                messages, pending = self._extract_messages(pending)
                for data in messages:
                    self.update_player_data(player_id, data)
                if messages:
                    self.broadcast_player_data()
                if len(pending) > self._MAX_PENDING_CHARS:
                    break
        except (OSError, UnicodeDecodeError):
            # Network failure or undecodable bytes: treat as a disconnect and
            # fall through to the cleanup below.
            pass
        finally:
            # Handle disconnection. Runs on every exit path so a failed
            # client never stays registered.
            print(f"[SERVER] Player {player_id} disconnected")
            with self._state_lock:
                connected_players.pop(player_id, None)
                player_data.pop(player_id, None)
            client_socket.close()
            self.broadcast_player_data()

    @staticmethod
    def _extract_messages(buffer):
        """Split *buffer* into complete JSON values plus the unparsed tail.

        Args:
            buffer: text received so far that has not been consumed yet.

        Returns:
            ``(messages, remainder)`` where ``messages`` is the list of
            decoded JSON values found at the front of ``buffer`` and
            ``remainder`` is the trailing text that is not (yet) a complete
            JSON document.
        """
        decoder = json.JSONDecoder()
        messages = []
        position = 0
        end = len(buffer)
        while True:
            # raw_decode() does not accept leading whitespace.
            while position < end and buffer[position].isspace():
                position += 1
            if position >= end:
                break
            try:
                value, position = decoder.raw_decode(buffer, position)
            except json.JSONDecodeError:
                # Incomplete (or malformed) document: keep it for later.
                break
            messages.append(value)
        return messages, buffer[position:]

    def send_initial_data(self, client_socket, player_id):
        """Send the ``"init"`` message with the current ``player_data``.

        Raises:
            OSError: if the socket write fails.
        """
        with self._send_lock:
            with self._state_lock:
                payload = json.dumps({
                    "type": "init",
                    "player_id": player_id,
                    "other_players": player_data,
                }).encode('utf-8')
            client_socket.sendall(payload)

    def update_player_data(self, player_id, data):
        """Copy ``"position"`` / ``"rotation"`` from *data* into the state.

        Payloads that are not JSON objects, and updates for a player that is
        no longer registered, are ignored.
        """
        if not isinstance(data, dict):
            return
        with self._state_lock:
            record = player_data.get(player_id)
            if record is None:
                return
            for field in ("position", "rotation"):
                if field in data:
                    record[field] = data[field]

    def broadcast_player_data(self):
        """Send the current ``player_data`` snapshot to every client."""
        with self._send_lock:
            with self._state_lock:
                # Serialise once and snapshot the recipients while the dicts
                # cannot change underneath us.
                payload = json.dumps({
                    "type": "update",
                    "player_data": player_data,
                }).encode('utf-8')
                recipients = list(connected_players.values())
            for recipient in recipients:
                try:
                    recipient.sendall(payload)
                except OSError:
                    # Best effort: that client's own handler thread notices
                    # the broken connection and unregisters it.
                    pass

# Client class to handle connection to the server and send/receive updates

class RealityClient:
    """TCP client that streams local player state and prints server updates.

    The server writes JSON documents back to back with no delimiter, so
    incoming bytes are buffered and split with
    ``json.JSONDecoder.raw_decode``; a message may span several ``recv()``
    calls and several messages may arrive in one.
    """

    # Upper bound on buffered, not-yet-parseable text; beyond this the stream
    # is considered corrupt and the connection is dropped.
    _MAX_PENDING_CHARS = 1 << 20

    def __init__(self, player_name):
        """Create an unconnected client.

        Args:
            player_name: stored on the instance; not sent to the server by
                the code in this class.
        """
        self.player_name = player_name
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.player_data = {"position": [0, 0, 0], "rotation": [0, 0, 0]}

    def connect(self):
        """Connect to ``SERVER_IP:PORT`` and start the receiver thread.

        A refused connection is reported on stdout and otherwise ignored.
        """
        try:
            self.client_socket.connect((SERVER_IP, PORT))
        except ConnectionRefusedError:
            print("[CLIENT] Unable to connect to Reality server")
            return
        print("[CLIENT] Connected to Reality server")
        threading.Thread(target=self.receive_data).start()

    def send_data(self):
        """Send the local position/rotation ``TICK_RATE`` times per second.

        Returns once the connection can no longer be written to.
        """
        interval = 1 / TICK_RATE
        while True:
            payload = json.dumps({
                "position": self.player_data["position"],
                "rotation": self.player_data["rotation"],
            }).encode('utf-8')
            try:
                # sendall() retries partial writes, unlike send().
                self.client_socket.sendall(payload)
            except OSError:
                print("[CLIENT] Connection lost; stopping updates")
                break
            time.sleep(interval)

    def receive_data(self):
        """Read server messages until the connection ends, then close it."""
        pending = ""
        try:
            while True:
                chunk = self.client_socket.recv(4096)
                if not chunk:
                    # recv() returning b"" means the server closed the
                    # connection; looping again would spin forever.
                    break
                pending += chunk.decode('utf-8')
                messages, pending = self._extract_messages(pending)
                for data in messages:
                    self.process_server_data(data)
                if len(pending) > self._MAX_PENDING_CHARS:
                    break
        except (OSError, UnicodeDecodeError):
            # Network failure or undecodable bytes: treat as a disconnect.
            pass
        finally:
            print("[CLIENT] Disconnected from server")
            self.client_socket.close()

    @staticmethod
    def _extract_messages(buffer):
        """Split *buffer* into complete JSON values plus the unparsed tail.

        Args:
            buffer: text received so far that has not been consumed yet.

        Returns:
            ``(messages, remainder)`` where ``messages`` is the list of
            decoded JSON values found at the front of ``buffer`` and
            ``remainder`` is the trailing text that is not (yet) a complete
            JSON document.
        """
        decoder = json.JSONDecoder()
        messages = []
        position = 0
        end = len(buffer)
        while True:
            # raw_decode() does not accept leading whitespace.
            while position < end and buffer[position].isspace():
                position += 1
            if position >= end:
                break
            try:
                value, position = decoder.raw_decode(buffer, position)
            except json.JSONDecodeError:
                # Incomplete (or malformed) document: keep it for later.
                break
            messages.append(value)
        return messages, buffer[position:]

    def process_server_data(self, data):
        """Dispatch one decoded server message by its ``"type"`` field.

        Messages that are not JSON objects, or whose type is unknown, are
        ignored.
        """
        if not isinstance(data, dict):
            return
        kind = data.get("type")
        if kind == "init":
            print(f"[CLIENT] Initialized as {data.get('player_id')}")
            print("[CLIENT] Other players:", data.get("other_players"))
        elif kind == "update":
            self.update_game_world(data.get("player_data"))

    def update_game_world(self, player_data):
        """Report the latest player snapshot received from the server."""
        print(f"[CLIENT] Updating game world with player data: {player_data}")

# Running the server

# Script entry point: build the server (which binds and listens immediately)
# and run its blocking accept loop on a separate thread.
if __name__ == "__main__":
    server = RealityServer()
    server_thread = threading.Thread(target=server.start)
    server_thread.start()