TCP劫持概念验证:使用Python与Scapy实现连接中断工具

本文详细介绍了一个基于Python和Scapy库的TCP连接中断工具,通过读取/proc/net/tcp获取连接信息,利用TCP序列号发送RST包强制终止连接,并支持与ipset集成管理防火墙规则。

TCP Hijacking Proof of Concept

概述

这个Python脚本是一个TCP连接中断工具,使用Scapy库来操纵网络数据包。它定义了用于连接处理的自定义异常,并提供通过读取/proc/net/tcp(列出Linux系统上当前TCP连接的文件)基于进程ID(PIDs)或TCP端口检索端点的功能。

TCPBreaker类是功能的核心,允许过滤和拦截与指定本地和远程TCP端口相关的数据包,捕获TCP会话的序列号以发送重置(RST)数据包。这有效地终止了连接。

该脚本还可以与ipset交互,添加或删除IP对以管理防火墙规则。它包括一个命令行界面供用户交互,支持指定PID、本地/远程端口以及选择数据包捕获方法(raw或nflog)等选项。

总体而言,该脚本旨在提供一种以编程方式控制和终止不需要的TCP连接的方法。

使用方法

1
python tcp_breaker.py [OPTIONS] [ipset_name]

选项

  • ipset_name: (可选)要临时插入本地IP的ipset名称。这可以在iptables过滤规则中使用,以使用TCP重置数据包拒绝匹配的出站数据包。
  • --pid: (可选)要终止连接的进程的进程ID(PID)。
  • --port: (可选)要关闭的连接的本地TCP端口。
  • -d, --remote-port: (可选)将端口参数视为远程端口而非本地端口。
  • -c, --capture: (可选)指定捕获方法。选项为"raw"或"nflog"(如果指定了ipset_name,则默认为"nflog",否则为"raw")。
  • --debug: (可选)启用详细操作模式以进行调试。

示例命令

通过PID终止连接:

1
python tcp_breaker.py --pid 1234 --port 8080

使用ipset终止连接:

1
python tcp_breaker.py my_ipset --pid 1234

通过指定本地和远程端口终止连接:

1
python tcp_breaker.py my_ipset --port 80 -d --remote-port

不使用ipset的原始捕获方法:

1
python tcp_breaker.py --pid 5678 --port 443 --capture raw

注意

确保您具有运行脚本的必要权限,因为它需要访问/proc/net/tcp,并且可能需要提升权限来修改ipset规则或终止连接。

代码实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#!/usr/bin/env python
from __future__ import print_function

from scapy.all import *
import itertools as it
import operator as op
import os
import re
import signal
import struct
import sys
from subprocess import Popen
import logging

class ConnMultipleMatches(Exception):
    pass

class ConnNotFound(Exception):
    pass

def get_endpoints(pid=None, port_local=None, port_remote=None):
    if pid:
        matches = list(get_endpoints_by_pid(pid))
    else:
        matches = list(get_endpoints_by_port(port_local, port_remote))

    if not matches:
        raise ConnNotFound()
    if len(matches) > 1:
        raise ConnMultipleMatches(matches)

    local, remote = matches[0]
    return (inet_ntoa(struct.pack('<I', local[0] & 0xffffffff)), local[1]),           (inet_ntoa(struct.pack('<I', remote[0] & 0xffffffff)), remote[1])

def get_endpoints_by_port(port_local=None, port_remote=None):
    assert port_local or port_remote, (port_local, port_remote)
    with open('/proc/net/tcp') as src:
        src = iter(src)
        next(src)  # skip header line
        for line in src:
            local, remote = (
                tuple(int(v, 16) for v in ep.split(':'))
                for ep in op.itemgetter(1, 2)(line.split()))
            for ep, port in [(local, port_local), (remote, port_remote)]:
                if ep[1] == port:
                    break
            else:
                continue
            yield local, remote

def get_endpoints_by_pid(pid):
    assert isinstance(pid, int), pid
    conns = dict()
    with open('/proc/net/tcp') as src:
        src = iter(src)
        next(src)  # skip header line
        for line in src:
            line = line.split()
            conns[line[9]] = line[1], line[2]
    pid_fds = f'/proc/{pid}/fd'
    for fd in os.listdir(pid_fds):
        match = re.search(r'^socket:\[(\d+)\]$', os.readlink(join(pid_fds, fd)))
        if not match:
            continue
        try:
            match = conns[match.group(1)]
        except KeyError:
            continue
        yield tuple(tuple(int(v, 16) for v in ep.split(':')) for ep in match)

def ipset_update(cmd, name, ip, port):
    if name is None:
        return
    assert cmd in ['add', 'del'], cmd
    log.debug(f'Updating ipset {name!r} ({cmd} {ip}:{port})')
    if Popen(['ipset', cmd, name, f'{ip},{port}']).wait():
        raise RuntimeError('Failed running ipset command.')

class TCPBreaker(Automaton):
    ss_filter_port_local = ss_filter_port_remote = None
    ss_remote, ss_intercept = None, None

    def __init__(self, port_local, port_remote, timeout=False, **atmt_kwz):
        self.ss_filter_port_local, self.ss_filter_port_remote = port_local, port_remote
        self.ss_timeout = timeout
        atmt_kwz.setdefault('store', False)
        atmt_kwz.setdefault('filter',
            f'tcp and ((src port {port_local} and dst port {port_remote})'
            f' or (dst port {port_local} and src port {port_remote}))')
        super(TCPBreaker, self).__init__(**atmt_kwz)

    def pkt_filter(self, pkt):
        if {pkt[TCP].sport, pkt[TCP].dport} == {self.ss_filter_port_local, self.ss_filter_port_remote}:
            return pkt[IP].dst if pkt[TCP].sport == self.ss_filter_port_local else pkt[IP].src
        raise KeyError

    @ATMT.state(initial=1)
    def st_seek(self):
        log.debug('Waiting for noise on the session')

    @ATMT.receive_condition(st_seek)
    def check_packet(self, pkt):
        log.debug('Session check, packet: {}'.format(pkt.summary()))
        try:
            remote = self.pkt_filter(pkt)
        except KeyError:
            return
        raise self.st_collect(pkt).action_parameters(remote)

    @ATMT.action(check_packet)
    def break_link(self, remote):
        if not self.ss_remote:
            log.debug('Found session (remote: {})'.format(remote))
            self.ss_remote = remote

    @ATMT.timeout(st_seek, 1)  # 1s timeout
    def interception_done(self):
        if self.ss_remote and self.ss_intercept:
            log.debug('Captured seq, proceeding to termination')
            raise self.st_rst_send()

    @ATMT.state()
    def st_collect(self, pkt):
        log.debug('Collected: {!r}'.format(pkt))
        self.ss_intercept = pkt  # only last one matters
        raise (self.st_rst_send if not self.ss_timeout else self.st_seek)()

    @ATMT.state()
    def st_rst_send(self):
        pkt_ip, pkt_tcp = op.itemgetter(IP, TCP)(self.ss_intercept)
        ordered = lambda k1, v1, k2, v2, pkt_dir=(pkt_ip.dst == self.ss_remote):             dict(it.izip((k1, k2), (v1, v2) if pkt_dir else (v2, v1)))
        rst = IP(**ordered('src', pkt_ip.src, 'dst', pkt_ip.dst))             / TCP(**dict(it.chain.from_iterable(p.viewitems() for p in (
                ordered('sport', pkt_tcp.sport, 'dport', pkt_tcp.dport),
                ordered('seq', pkt_tcp.seq, 'ack', pkt_tcp.ack),
                dict(flags=b'R', window=pkt_tcp.window)))))
        rst[TCP].ack = 0
        log.debug('Sending RST: {!r}'.format(rst))
        send(rst)

        # Finish
        os.kill(os.getpid(), signal.SIGINT)
        self.stop()

def main(argv=None):
    import argparse
    parser = argparse.ArgumentParser(
        description='TCP connection breaking tool. Finds connection with specified parameters...'
    )

    parser.add_argument('ipset_name', nargs='?',
                        help='Name of the ipset to temporarily insert local ip:port to...')
    parser.add_argument('--pid', type=int,
                        help='Process ID to terminate connection of.')
    parser.add_argument('--port', type=int,
                        help='TCP port of local connection to close.')
    parser.add_argument('-d', '--remote-port', action='store_true',
                        help='Treat port argument as remote, not local one.')
    parser.add_argument('-c', '--capture', metavar='driver',
                        help='Capture driver to use ("raw" or "nflog").')
    parser.add_argument('--debug', action='store_true', help='Verbose operation mode.')
    optz = parser.parse_args(argv if argv else sys.argv[1:])

    logging.basicConfig(level=logging.DEBUG if optz.debug else logging.INFO)
    global log
    log = logging.getLogger()

    if not optz.capture:
        optz.capture = 'nflog' if optz.ipset_name else 'raw'
        log.debug('Using {} packet capture interface'.format(optz.capture))
    elif optz.capture not in ['nflog', 'raw']:
        parser.error('Only "nflog" or "raw" values are supported with --capture option.')

    # Traffic to/from the same machine - no promisc-mode necessary
    conf.promisc = conf.sniff_promisc = False
    conf.verb = False  # Disables "Sent 1 packets." line

    if optz.capture == 'nflog':
        from scapy_nflog import install_nflog_listener
        install_nflog_listener()

    if optz.port:
        local, remote = get_endpoints(**{'port_local' if not optz.remote_port else 'port_remote': optz.port})
    elif optz.pid:
        local, remote = get_endpoints(pid=optz.pid)
    else:
        parser.error('No connection-picking criteria specified.')

    log.debug('Found connection: {0[0]}:{0[1]} -> {1[0]}:{1[1]}'.format(local, remote))

    ipset_update('add', optz.ipset_name, *local)
    try:
        TCPBreaker(port_local=local[1], port_remote=remote[1], timeout=True).run()
    finally:
        ipset_update('del', optz.ipset_name, *local)

if __name__ == '__main__':
    sys.exit(main())

作者:ClumsyLulz aka SleepTheGod Taylor Christian Newsome

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计