利用AI驱动的AWS CloudTrail分析:Strands Agent与Amazon Bedrock实现智能访问模式检测

本文介绍了一种结合AWS CloudTrail日志、Strands Agent框架和Amazon Bedrock生成式AI的解决方案,用于自动化分析AWS访问模式,检测安全威胁,并生成可操作的安全洞察报告。

背景/挑战

AWS CloudTrail日志记录了AWS账户中所有API调用的完整历史,提供了关于谁在何时访问了哪些资源的宝贵信息。然而,由于日志量大且复杂,手动分析这些日志非常困难。安全团队需要一种高效的方法来:

  • 识别异常访问模式
  • 检测潜在安全威胁
  • 理解资源使用模式
  • 从技术日志数据生成人类可读的报告

我的方法结合了AWS原生服务和生成式AI,将原始日志数据转化为可操作的安全洞察。通过利用Amazon Bedrock和Strands Agent框架的力量,我创建了一个可扩展的自动化系统,显著减少了CloudTrail分析所需的手动工作,同时提供了比传统方法更全面的结果。

解决方案概述

该解决方案利用AWS CloudTrail日志、Strands Agents和Amazon Bedrock的生成式AI能力,自动分析访问模式并生成有洞察力的报告。系统查询CloudTrail日志,执行模式分析,并使用Anthropic Claude(通过Amazon Bedrock)将原始数据转化为可操作的安全洞察。

先决条件

AWS资源

  • 启用CloudTrail的AWS账户
  • IAM权限(根据需要添加更多):
    • CloudTrail:LookupEvents
    • Bedrock:InvokeModel

Python环境

  • Python 3.12+
  • 所需包:
    • boto3
    • Strands Agents SDK(用于代理框架)

配置

  • 本地配置的AWS凭证(通过AWS CLI或环境变量)
  • Amazon Bedrock访问Claude模型(us.anthropic.claude-3-5-sonnet-20241022-v2:0)

解决方案架构概述

设置环境

按照快速入门指南创建Strands代理项目。环境准备就绪后,将agent.py替换为trailInsightAgent.py,并添加如下所示的文件。

解决方案包括两个主要组件:

1. 编排层(trailInsightAgent.py)

  • 使用Strands Agent框架管理工作流
  • 注册trail_analysis工具(在queryCloudTrail.py中用’@tool’装饰)
  • AI驱动的洞察生成执行分析并显示结果
  • 连接到Amazon Bedrock
  • 将分析数据发送给Claude,附带专门提示
  • 处理AI生成的响应
  • 返回格式化的洞察
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# trailInsightAgent.py

from strands import Agent, tool
from queryCloudTrail import trail_analysis

def main():
    # 使用trail_analysis工具初始化代理
    agent = Agent(tools=[trail_analysis])
    # 定义CloudTrail分析的提示
    prompt = """Review the cloudtrail logs for the last 3 days and provide a report in a tabular format. \
    Focus on identifying unusual access patterns and security concerns, and give remediation to address any findings."""

    # 使用消息执行代理
    response = agent(prompt)
    # 打印响应
    print(response)

if __name__ == "__main__":
    main()

2. CloudTrail日志检索(queryCloudTrail.py)

该组件包含三个函数:

第一个函数query_cloudtrail_logs使用AWS SDK(boto3)检索CloudTrail事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#queryCloudTrail.py

import boto3
from datetime import datetime, timedelta
from strands import tool
region="us-west-2"  #从环境变量读取区域

def query_cloudtrail_logs(
    days=7,
    max_results=10
):
    # 创建CloudTrail客户端
    client = boto3.client('cloudtrail', region_name=region)
    # 计算开始和结束时间
    end_time = datetime.now()
    start_time = end_time - timedelta(days=days)
    # 查询参数
    params = {
        'StartTime': start_time,
        'EndTime': end_time,
        'MaxResults': max_results
    }
    # 执行查询
    response = client.lookup_events(**params)
    return response['Events']

第二个函数analyze_access_patterns处理CloudTrail事件以识别模式:

  • 最频繁的API调用
  • 最活跃的用户
  • 最常访问的AWS服务
  • 最常访问的资源
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#访问模式分析(queryCloudTrail.py)

def analyze_access_patterns(events):
    # 初始化计数器
    event_counts = {}
    user_counts = {}
    resource_counts = {}
    service_counts = {}

    for event in events:
        # 按名称计数事件
        event_name = event.get('EventName', 'Unknown')
        event_counts[event_name] = event_counts.get(event_name, 0) + 1

        # 按用户计数事件
        username = event.get('Username', 'Unknown')
        user_counts[username] = user_counts.get(username, 0) + 1

        # 从事件源提取服务名称
        event_source = event.get('EventSource', '')
        service = event_source.split('.')[0] if '.' in event_source else event_source

        service_counts[service] = service_counts.get(service, 0) + 1

        # 计数访问的资源
        if 'Resources' in event:
            for resource in event['Resources']:
                resource_name = resource.get('ResourceName', 'Unknown')
                resource_counts[resource_name] = resource_counts.get(resource_name, 0) + 1

    return {
        'event_counts': event_counts,
        'user_counts': user_counts,
        'service_counts': service_counts,
        'resource_counts': resource_counts
    }

第三个函数trail_analysis将所有内容整合在一起:

  • 检索最近3天的CloudTrail日志
  • 分析访问模式
  • 返回格式化的洞察
  • 添加错误逻辑以扩展此功能
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# Trail_analysis工具(queryCloudTrail.py)

@tool
def trail_analysis() -> str:
    # 查询CloudTrail日志(根据需要自定义参数)
    events = query_cloudtrail_logs(
        days=3,           # 回溯3天
        max_results=10    # 获取最多100个结果
    )

    # 分析访问模式
    analysis = analyze_access_patterns(events)
    return analysis

验证

要测试此解决方案,请在终端窗口中运行以下命令。确保您在logAgent目录中。

1
python3 trailInsightAgent.py

总结

在本文中,我展示了这种架构如何自动化AWS CloudTrail日志分析过程,减少手动工作并改善安全洞察。该解决方案结合了CloudTrail数据检索、模式分析和生成式AI,将复杂的日志数据转化为可操作的安全建议。通过利用Amazon Bedrock和Strands Agent框架,我创建了一个系统,解决了CloudTrail日志复杂性和量大的问题,同时提供了有意义的安全洞察。

尝试在您自己的AWS环境中使用这种方法,并在评论中分享您的反馈和问题。您可以通过在AWS Lambda中托管并使用API Gateway暴露它、添加计划执行、与安全信息和事件管理(SIEM)系统集成,或根据您的特定安全需求自定义分析来扩展此解决方案。

成本考虑

虽然此解决方案提供了自动化分析能力,但可以通过几种策略有效管理成本:

  • 调整查询频率:按适当间隔安排分析,而不是按需运行
  • 优化查询大小:限制max_results参数以仅检索必要数据
  • 微调bedrock使用:根据所需的详细级别调整令牌限制
  • 使用目标过滤器:应用特定过滤器(用户名、事件类型)以关注相关数据

主要成本驱动因素是:

  • CloudTrail存储
  • Amazon Bedrock API调用

请记住,如果您仅验证解决方案,请在实现此架构后删除所有资源,以避免产生不必要的成本。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计