本文详细分析了一个基于Python实现的P2P加密聊天客户端,包含AES和XOR双重加密机制、UDP协议通信、文件传输功能以及安全防护措施,展示了完整的端到端加密通信技术架构。
实际P2P OTR客户端 - 技术分析与披露
代码实现概述
1
2
3
4
5
6
7
8
9
10
11
12
13
|
#!/usr/bin/python
# NSA P2P Chat Client
# -*- coding: utf-8 -*-
import sys
import socket
import base64
import random
import string
import hashlib
from Crypto.Cipher import AES
from Crypto import Random
from threading import Thread
import getpass
|
核心加密类实现
CryptoClient 类初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
class CryptoClient(object):
def __init__(self):
print("欢迎使用CryptoChat,一个安全的P2P聊天客户端")
print("你想要加密吗?好的,小伙子,开始吧!!!")
self.IP = input("请输入要聊天的IP地址: ")
self.PORT = int(input("请输入通信端口: "))
print()
print("现在为不同的加密方法输入密钥,请确保它们不同")
print("出于安全考虑,密钥不会显示")
print()
self.EncryptKeyXOR = getpass.getpass("输入XOR加密的密钥: ")
self.EncryptKeyAES = hashlib.md5(getpass.getpass("输入AES的安全密码: ").encode()).hexdigest()
print()
input("当两个客户端都准备好时按回车")
### AES填充处理
BS = 16
self.pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
self.unpad = lambda s: s[:-ord(s[len(s) - 1:])]
### 启动聊天服务器和客户端
try:
Thread(target=self.RecvMSG, args=()).start()
except socket.error as e:
print(self.IP + " 还没准备好!当 " + self.IP + " 准备好时按回车")
input()
Thread(target=self.RecvMSG, args=()).start()
self.SendMSG()
|
AES加密解密方法
1
2
3
4
5
6
7
8
9
10
11
|
def EncryptAES(self, raw):
raw = self.pad(raw)
iv = Random.new().read(AES.block_size)
cipher = AES.new(self.EncryptKeyAES.encode(), AES.MODE_CBC, iv)
return base64.b64encode(iv + cipher.encrypt(raw.encode())).decode()
def DecryptAES(self, enc):
enc = base64.b64decode(enc)
iv = enc[:16]
cipher = AES.new(self.EncryptKeyAES.encode(), AES.MODE_CBC, iv)
return self.unpad(cipher.decrypt(enc[16:])).decode()
|
XOR加密方法
1
2
3
4
5
6
7
8
9
10
|
def XOR(self, securekey, data, mode):
if mode == 1:
securekey = securekey[::-1]
temp = ''
for char in securekey:
for x in data:
temp += chr(ord(x) ^ ord(char))
data = temp
temp = ''
return data
|
消息加密解密包装
1
2
3
4
5
6
7
8
9
|
def EncryptMSG(self, data):
data = self.XOR(self.EncryptKeyXOR, data, 0)
data = self.EncryptAES(data)
return data
def DecryptMSG(self, data):
data = self.DecryptAES(data)
data = self.XOR(self.EncryptKeyXOR, data, 1)
return data
|
消息发送功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def SendMSG(self):
clientsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
print()
print("你现在正在与 '" + self.IP + "' 对话")
while True:
message = input("")
if message.startswith("/send"): # 发送文件命令
self.SendFILE(message.split(" ")[1])
continue
if message == "/leave":
message = self.EncryptMSG("\x03")
clientsock.sendto(message.encode(), (self.IP, self.PORT))
sys.exit(0)
if message.startswith("/msg"):
self.IP = message.split(" ")[1]
print("[CLIENT] 你现在正在与 '" + self.IP + "' 对话")
continue
else:
message = self.EncryptMSG("\x01" + message)
clientsock.sendto(message.encode(), (self.IP, self.PORT))
|
文件传输功能
1
2
3
4
5
6
7
8
9
10
11
|
def SendFILE(self, file_):
if file_.startswith(".") or file_.startswith("/"): # 安全措施
print("[CLIENT] 出于安全和保护原因,不会发送以 '.' 或 '/' 开头的文件名。中止操作")
else:
clientsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
data = "\x02" + file_ + "\xFF"
with open(file_, "rb") as f:
data += f.read()
data = self.EncryptMSG(data)
clientsock.sendto(data.encode(), (self.IP, self.PORT))
print("[CLIENT] 文件已发送!")
|
随机字符串生成
1
2
|
def RandStr(self, length):
return ''.join(random.choice(string.ascii_letters) for _ in range(length))
|
消息接收处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
def RecvMSG(self):
serversocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
serversocket.bind(('', self.PORT))
while True:
data, addr = serversocket.recvfrom(1073741824) # 缓冲区大小为1GB(用于大文件/图像)
data = self.DecryptMSG(data.decode())
if data.startswith("\x02"):
filename = ''
data = list(data)
del data[0]
for i in data:
if i == "\xFF":
break
else:
filename += i
del data[0] # 删除协议字符
del data[:len(filename)] # 删除文件结束字符
if filename.startswith(".") or filename.startswith("/"): # 尝试利用!
print("[!!!警报!!!] " + addr[0] + " 试图覆盖你的 " + filename)
else:
print("[CLIENT] " + addr[0] + " 发送了 " + filename)
print("[CLIENT] 正在下载...") # 下载那个东西
data = ''.join(data)
with open(filename, "wb") as f:
f.write(data.encode())
print("[CLIENT] 已保存")
elif data.startswith("\x01"): # 所有消息以"\x01"开头以防止文件垃圾邮件
data = list(data)
del data[0]
data = ''.join(data)
print("[" + addr[0] + "] > | " + data)
elif data.startswith("\x03"):
print("[CLIENT] " + addr[0] + " 已离开")
sys.exit(0)
|
主程序入口
1
2
|
if __name__ == "__main__":
CryptoClient()
|
技术特性总结
- 双重加密机制:结合AES CBC模式和自定义XOR加密
- P2P通信架构:基于UDP协议的端到端直接通信
- 安全文件传输:支持加密文件传输,包含安全防护措施
- 多线程处理:独立的接收消息线程确保实时通信
- 协议标识符:使用特殊字符标识消息类型(文本、文件、离开)
- 安全防护:防止文件名注入攻击和路径遍历漏洞
该实现展示了完整的端到端加密通信系统,具有实际应用价值和技术参考意义。