# Structure for addons/godot_mcp/plugin.gd @tool extends EditorPlugin const SERVER_PORT = 6400 var server: TCPServer = null var active_connections = [] var command_handler func _enter_tree(): # Initialize the plugin print("Godot MCP Plugin activated") # Create command handler command_handler = preload("res://addons/godot_mcp/command_handler.gd").new() command_handler.set_editor_plugin(self) # Start the TCP server server = TCPServer.new() var error = server.listen(SERVER_PORT) if error != OK: push_error("Failed to start Godot MCP Server on port %d: %s" % [SERVER_PORT, error]) return print("Godot MCP Server listening on port %d" % SERVER_PORT) # Add UI add_control_to_bottom_panel( preload("res://addons/godot_mcp/ui/mcp_panel.tscn").instantiate(), "MCP" ) func _exit_tree(): # Clean up the plugin when disabled if server: server.stop() server = null for connection in active_connections: if connection.get_status() == StreamPeerTCP.STATUS_CONNECTED: connection.disconnect_from_host() active_connections.clear() # Remove UI remove_control_from_bottom_panel(get_editor_interface().get_base_control().get_node("MCPPanel")) print("Godot MCP Plugin deactivated") func _process(delta): # Check for new connections if server and server.is_connection_available(): var connection = server.take_connection() if connection: active_connections.append(connection) print("New MCP connection established") # Process existing connections var i = 0 while i < active_connections.size(): var connection = active_connections[i] # Check connection status if connection.get_status() != StreamPeerTCP.STATUS_CONNECTED: active_connections.remove_at(i) print("MCP connection closed") continue # Check for incoming messages if connection.get_available_bytes() > 0: var data = _read_message(connection) if data.size() > 0: # Process the command var response = _process_command(data) # Send the response _send_message(connection, response) i += 1 func _read_message(connection): # Read data from the connection var data = PackedByteArray() var bytes_available = connection.get_available_bytes() if bytes_available > 0: data = connection.get_data(bytes_available)[1] # Attempt to parse as JSON var json_string = data.get_string_from_utf8() var json = JSON.new() var error = json.parse(json_string) if error == OK: return json.get_data() else: print("Failed to parse JSON: ", json.get_error_message()) return {} func _send_message(connection, data): # Convert to JSON and send var json_string = JSON.stringify(data) connection.put_data(json_string.to_utf8_buffer()) func _process_command(data): # Process the command and return a response if data == null or typeof(data) != TYPE_DICTIONARY: return { "status": "error", "error": "Invalid command format. Expected a dictionary." } if not data.has("type") or not data.has("params"): return { "status": "error", "error": "Invalid command format. Expected 'type' and 'params' fields." } var command_type = data["type"] var params = data["params"] if command_type == "ping": return {"status": "success", "result": {"message": "pong"}} # Forward to command handler var result = command_handler.handle_command(command_type, params) # Check if result is valid if result == null: return { "status": "error", "error": "Command handler returned null result" } if result.has("error"): return {"status": "error", "error": result.error} else: return {"status": "success", "result": result}