背景/挑战
AWS CloudTrail日志记录了AWS账户中所有API调用的完整历史,提供了关于谁在何时访问了哪些资源的宝贵信息。然而,由于日志量大且复杂,手动分析这些日志非常困难。安全团队需要一种高效的方法来:
- 识别异常访问模式
- 检测潜在安全威胁
- 理解资源使用模式
- 从技术日志数据生成人类可读的报告
我的方法结合了AWS原生服务和生成式AI,将原始日志数据转化为可操作的安全洞察。通过利用Amazon Bedrock和Strands Agent框架的力量,我创建了一个可扩展的自动化系统,显著减少了CloudTrail分析所需的手动工作,同时提供了比传统方法更全面的结果。
解决方案概述
该解决方案利用AWS CloudTrail日志、Strands Agents和Amazon Bedrock的生成式AI能力,自动分析访问模式并生成有洞察力的报告。系统查询CloudTrail日志,执行模式分析,并使用Anthropic Claude(通过Amazon Bedrock)将原始数据转化为可操作的安全洞察。
先决条件
AWS资源
- 启用CloudTrail的AWS账户
- IAM权限(根据需要添加更多):
- CloudTrail:LookupEvents
- Bedrock:InvokeModel
Python环境
- Python 3.12+
- 所需包:
- boto3
- Strands Agents SDK(用于代理框架)
配置
- 本地配置的AWS凭证(通过AWS CLI或环境变量)
- Amazon Bedrock访问Claude模型(us.anthropic.claude-3-5-sonnet-20241022-v2:0)
解决方案架构概述
设置环境
按照快速入门指南创建Strands代理项目。环境准备就绪后,将agent.py替换为trailInsightAgent.py,并添加如下所示的文件。
解决方案包括两个主要组件:
1. 编排层(trailInsightAgent.py)
- 使用Strands Agent框架管理工作流
- 注册
trail_analysis
工具(在queryCloudTrail.py中用’@tool’装饰)
- AI驱动的洞察生成执行分析并显示结果
- 连接到Amazon Bedrock
- 将分析数据发送给Claude,附带专门提示
- 处理AI生成的响应
- 返回格式化的洞察
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# trailInsightAgent.py
from strands import Agent, tool
from queryCloudTrail import trail_analysis
def main():
# 使用trail_analysis工具初始化代理
agent = Agent(tools=[trail_analysis])
# 定义CloudTrail分析的提示
prompt = """Review the cloudtrail logs for the last 3 days and provide a report in a tabular format. \
Focus on identifying unusual access patterns and security concerns, and give remediation to address any findings."""
# 使用消息执行代理
response = agent(prompt)
# 打印响应
print(response)
if __name__ == "__main__":
main()
|
2. CloudTrail日志检索(queryCloudTrail.py)
该组件包含三个函数:
第一个函数query_cloudtrail_logs
使用AWS SDK(boto3)检索CloudTrail事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#queryCloudTrail.py
import boto3
from datetime import datetime, timedelta
from strands import tool
region="us-west-2" #从环境变量读取区域
def query_cloudtrail_logs(
days=7,
max_results=10
):
# 创建CloudTrail客户端
client = boto3.client('cloudtrail', region_name=region)
# 计算开始和结束时间
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
# 查询参数
params = {
'StartTime': start_time,
'EndTime': end_time,
'MaxResults': max_results
}
# 执行查询
response = client.lookup_events(**params)
return response['Events']
|
第二个函数analyze_access_patterns
处理CloudTrail事件以识别模式:
- 最频繁的API调用
- 最活跃的用户
- 最常访问的AWS服务
- 最常访问的资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
#访问模式分析(queryCloudTrail.py)
def analyze_access_patterns(events):
# 初始化计数器
event_counts = {}
user_counts = {}
resource_counts = {}
service_counts = {}
for event in events:
# 按名称计数事件
event_name = event.get('EventName', 'Unknown')
event_counts[event_name] = event_counts.get(event_name, 0) + 1
# 按用户计数事件
username = event.get('Username', 'Unknown')
user_counts[username] = user_counts.get(username, 0) + 1
# 从事件源提取服务名称
event_source = event.get('EventSource', '')
service = event_source.split('.')[0] if '.' in event_source else event_source
service_counts[service] = service_counts.get(service, 0) + 1
# 计数访问的资源
if 'Resources' in event:
for resource in event['Resources']:
resource_name = resource.get('ResourceName', 'Unknown')
resource_counts[resource_name] = resource_counts.get(resource_name, 0) + 1
return {
'event_counts': event_counts,
'user_counts': user_counts,
'service_counts': service_counts,
'resource_counts': resource_counts
}
|
第三个函数trail_analysis
将所有内容整合在一起:
- 检索最近3天的CloudTrail日志
- 分析访问模式
- 返回格式化的洞察
- 添加错误逻辑以扩展此功能
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# Trail_analysis工具(queryCloudTrail.py)
@tool
def trail_analysis() -> str:
# 查询CloudTrail日志(根据需要自定义参数)
events = query_cloudtrail_logs(
days=3, # 回溯3天
max_results=10 # 获取最多100个结果
)
# 分析访问模式
analysis = analyze_access_patterns(events)
return analysis
|
验证
要测试此解决方案,请在终端窗口中运行以下命令。确保您在logAgent目录中。
1
|
python3 trailInsightAgent.py
|
总结
在本文中,我展示了这种架构如何自动化AWS CloudTrail日志分析过程,减少手动工作并改善安全洞察。该解决方案结合了CloudTrail数据检索、模式分析和生成式AI,将复杂的日志数据转化为可操作的安全建议。通过利用Amazon Bedrock和Strands Agent框架,我创建了一个系统,解决了CloudTrail日志复杂性和量大的问题,同时提供了有意义的安全洞察。
尝试在您自己的AWS环境中使用这种方法,并在评论中分享您的反馈和问题。您可以通过在AWS Lambda中托管并使用API Gateway暴露它、添加计划执行、与安全信息和事件管理(SIEM)系统集成,或根据您的特定安全需求自定义分析来扩展此解决方案。
成本考虑
虽然此解决方案提供了自动化分析能力,但可以通过几种策略有效管理成本:
- 调整查询频率:按适当间隔安排分析,而不是按需运行
- 优化查询大小:限制
max_results
参数以仅检索必要数据
- 微调bedrock使用:根据所需的详细级别调整令牌限制
- 使用目标过滤器:应用特定过滤器(用户名、事件类型)以关注相关数据
主要成本驱动因素是:
- CloudTrail存储
- Amazon Bedrock API调用
请记住,如果您仅验证解决方案,请在实现此架构后删除所有资源,以避免产生不必要的成本。