利用API构建AI驱动的自动化安全响应方案

本文详细介绍了如何通过Elastic API构建定制化AI驱动安全编排与响应系统,包含Python代码实现实时告警监控、AI辅助分析、Slack集成及自动化响应流程,提供完整的技术架构和实现方案。

通过Elastic API增强AI驱动的安全自动化工作流

安全团队面临着持续告警、复杂调查和资源有限的多重挑战。关键不仅在于检测威胁,更需快速有效地响应。某中心的安全解决方案长期提供检测、调查和响应的预置能力,但其真正的优势在于开放的API优先方法,使用户能够构建和自动化安全运营中心(SOC)的特定工作流。

本文将演示如何通过可扩展的API实现AI驱动的安全编排、自动化与响应(SOAR),通过常见案例说明如何结合某中心的安全平台与AI及Slack等协作工具,设计、自动化并持续优化响应方案。

工作流增强方案

某SOC检测工程团队面临关键告警响应时效不足的挑战。尽管已有工具改善平均响应时间(MTTR),团队仍难以应对告警量和紧急性。部分关键告警未能及时分类或解决。为此,团队寻求通过自动化、AI和某中心强大API进一步优化工作流。

设想一个工作流:关键告警被检测、分类并响应,分析师能够指导和监督每一步。以下是使用某中心灵活API和Slack等常见协作工具的解决方案实现步骤:

  1. 使用Elasticsearch Python客户端监控集群
  2. 监视标记为严重的告警
  3. 通过API将告警JSON发送至某中心安全AI助手
  4. 获取AI响应
  5. 将AI响应发布至Slack频道
  6. 在Slack消息中标记相关安全分析师以通知并支持直接操作

事件时间线

某中心安全收到受端点检测与响应保护的Windows服务器的严重告警,检测到Windows管理规范(WMI)被用于创建注册表键,实现对该关键服务器的持久访问。

Python代码可每分钟查询某中心以发现严重告警。发现告警时,提取完整JSON负载进行处理。以下为实时监控严重告警的Python示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
es = Elasticsearch(
    os.getenv("ELASTICSEARCH_URL"),
    api_key=ELASTICSEARCH_API_KEY
)

def monitor_alerts():
    from datetime import datetime, timedelta, timezone
    import time

    polling_interval = 60  # 每60秒轮询
    index_patterns = {
        "endpoint": {
            "pattern": ".ds-logs-endpoint.alerts*",
            "timestamp_field": "Responses.@timestamp"
        },
        "security": {
            "pattern": ".internal.alerts-security.alerts-default-*",
            "timestamp_field": "@timestamp"
        }
    }

    last_timestamps = {
        key: datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        for key in index_patterns
    }

    while True:
        for key, config in index_patterns.items():
            pattern = config["pattern"]
            ts_field = config["timestamp_field"]

            query = {
                "query": {
                    "bool": {
                        "filter": [
                            {"range": {ts_field: {"gt": last_timestamps[key]}}},
                            {"term": {"kibana.alert.severity": "critical"}}
                        ]
                    }
                },
                "sort": [{ts_field: {"order": "asc"}}]
            }

            try:
                response = es.search(index=pattern, body=query)
                alerts = response['hits']['hits']
                if alerts:
                    print(f"Found {len(alerts)} critical alert(s) for {key} since {last_timestamps[key]}")
                    max_ts = last_timestamps[key]
                    for alert in alerts:
                        alert_id = alert.get('_id')
                        if alert_id in processed_alert_ids:
                            continue
                        processed_alert_ids.add(alert_id)

                        alert_json = alert['_source']
                        process_alert(alert_json)

                        alert_ts = get_custom_alert_timestamp(alert_json, ts_field)
                        if alert_ts and alert_ts > max_ts:
                            max_ts = alert_ts
                    dt = datetime.fromisoformat(max_ts.replace("Z", "+00:00"))
                    dt += timedelta(seconds=1)
                    last_timestamps[key] = dt.isoformat().replace("+00:00", "Z")
            except Exception as e:
                print(f"Error fetching critical alerts for {key}: {str(e)}")

        time.sleep(polling_interval)

捕获的JSON通过API请求发送至某中心安全AI助手。提示附加到JSON,为AI助手提供生成适当响应所需的上下文。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def process_alert(alert_json):
    alert_str = json.dumps(alert_json, indent=2)
    prompt = (
        f"Please look at this JSON:\n{alert_str}\n\n"
        "You are a security analyst and you want to run commands to start a forensic investigation for this alert. "
        "Make sure you summarize the alert in one paragraph. "
        "Summarize the alert and then give 5 commands that you would run to start your IR investigation. "
        "The next 3 commands should be dedicated to commands that would remediate the malware or malicious activity. "
        "The total commands should be 8. Make sure you include the affected host via the IP address in the alert summary. "
        "The commands should be in a list format and clearly separated into Investigation Commands and Remediation Commands. For example: \n"
        "Investigation Commands:\n1. command\n2. command\n... \nRemediation Commands:\n1. command\n2. command\n..."
    )    print("Alert Received")
    response = chat_complete(prompt)
    print("AI Responded")
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def chat_complete(question, thread_ts=None, use_existing_conversation=False):
    url = f"{KIBANA_URL}/api/security_ai_assistant/chat/complete"
    headers = {
        "kbn-xsrf": "true",
        "Content-Type": "application/json",
    }
    
    payload = {
        "messages": [{"role": "user", "content": question}],
        "connectorId": CONNECTOR_ID,
        "persist": True
    }

    if use_existing_conversation and thread_ts and thread_ts in conversation_map:
        payload["conversationId"] = conversation_map[thread_ts]

    response = requests.post(
        url,
        auth=(USERNAME, PASSWORD),
        headers=headers,
        json=payload,
        stream=True
    )
    response.raise_for_status()

    full_response = ""
    for line in response.iter_lines():
        if line:
            decoded_line = line.decode('utf-8')
            try:
                event = json.loads(decoded_line)
                if "data" in event:
                    full_response += event["data"]
                if "conversationId" in event and thread_ts:
                    conversation_map[thread_ts] = event["conversationId"]
            except json.JSONDecodeError:
                continue
    
    return full_response.strip()

AI助手接收API请求并使用大语言模型(LLM)生成响应,包括可操作的告警摘要以及推荐的调查和修复命令。

AI助手还可通过查询包含SOC分析师信息的自定义知识库,确定应在Slack消息中标记的分析师。此自定义知识能力极大增强了AI助手的有效性。

通过将机构知识集中到自定义知识库,AI助手最小化关键信息访问延迟,提高SOC操作工作流的速度和一致性。

AI助手返回响应后,使用Python将消息及适当分析师提及发布到特定Slack频道。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
response = chat_complete(prompt)
print("AI Responded")

if response:
    clean_response = response.replace("**", "") 

    inv_match = re.search(r"(?i)\s*Investigation Commands:\s*", clean_response)
    rem_match = re.search(r"(?i)\s*Remediation Commands:\s*", clean_response)

    if not inv_match or not rem_match:
        print("Could not find both 'Investigation Commands:' and 'Remediation Commands:' in the response.")
        return

    summary = clean_response[:inv_match.start()].strip()
    inv_commands_part = clean_response[inv_match.end():rem_match.start()].strip()
    rem_commands_part = clean_response[rem_match.end():].strip()

    inv_commands = re.findall(r"^\s*\d+\.\s+(.+)$", inv_commands_part, re.MULTILINE)
    rem_commands = re.findall(r"^\s*\d+\.\s+(.+)$", rem_commands_part, re.MULTILINE)

    # 从AI提取处理者(例如:"谁处理关键Windows告警?")
    handler_question = f"Who handles {operating_system.lower()} alerts? Respond with just their name."
    handler_response = chat_complete(handler_question).strip()
    handler_id = get_user_id_by_name(handler_response)
    handler_mention = f"<@{handler_id}>" if handler_id else handler_response

    full_message = (
        f"New Alert Detected\n\n{clean_response}\n\n"
        f"Alert assigned to:\n{handler_mention}\n\n"
        "Do you want me to run the investigation and remediation commands?"
    )

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": full_message
            }
        },
        {
            "type": "actions",
            "elements": [
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "Yes"},
                    "action_id": "run_commands",
                    "value": "run"
                },
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "No"},
                    "action_id": "do_not_run",
                    "value": "do_not_run"
                }
            ]
        }
    ]

    result = app.client.chat_postMessage(
        channel=ALERT_CHANNEL,
        blocks=blocks,
        text="New Alert Detected"
    )

    alert_data_map[result['ts']] = {
        "alert_json": alert_json,
        "investigation_commands": inv_commands,
        "remediation_commands": rem_commands,
        "host_ip": host_ip,
        "ai_summary": summary
    }

此时,Slack消息生成包含有意义的告警摘要、受影响主机及建议的调查和修复命令列表。分配给告警的分析师被@提及,并呈现Yes/No选项以批准在主机上执行命令。

在此场景中,分析师点击"Yes"批准自动化响应。

为执行命令,使用Python和Windows远程管理(WinRM)。从AI助手响应中提取受影响主机的IP地址和命令列表,并在目标机器上顺序执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import winrm

def execute_winrm_command(host_ip: str, command: str, winrm_domain: str, winrm_username: str, winrm_password: str) -> str:
    session = winrm.Session(
        host_ip,
        auth=(f"{winrm_domain}\\{winrm_username}", winrm_password),
        transport='ntlm'
    )
    full_cmd = (
        "$ProgressPreference = 'SilentlyContinue'; "
        "$VerbosePreference = 'SilentlyContinue'; "
        "$WarningPreference = 'SilentlyContinue'; "
        f"try {{ {command} | Format-List | Out-String -Width 120 }} "
        "catch {{ 'Error: ' + $_.Exception.Message }}"
    )
    result = session.run_ps(full_cmd)
    output = result.std_out.decode('utf-8', errors='ignore').strip()
    return output if output else "No output returned."

def execute_winrm_commands(host_ip: str, commands: list, winrm_domain: str, winrm_username: str, winrm_password: str) -> dict:

    outputs = {}
    session = winrm.Session(
        host_ip,
        auth=(f"{winrm_domain}\\{winrm_username}", winrm_password),
        transport='ntlm'
    )
    for command in commands:
        full_cmd = (
            "$ProgressPreference = 'SilentlyContinue'; "
            "$VerbosePreference = 'SilentlyContinue'; "
            "$WarningPreference = 'SilentlyContinue'; "
            f"try {{ {command} | Format-List | Out-String -Width 120 }} "
            "catch {{ 'Error: ' + $_.Exception.Message }}"
        )
        result = session.run_ps(full_cmd)
        output = result.std_out.decode('utf-8', errors='ignore').strip()
        outputs[command] = output if output else "No output returned."
    return outputs

所有操作完全可跟踪和审计。应用程序为每个事件在某中心安全中创建新案例,并附加所有AI摘要、运行的命令及响应期间生成的输出,确保捕获所有证据以供将来参考或合规需求。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def create_case(alert_json, command_output, ai_summary):
    description = ai_summary + "\n\n"
    if command_output:
        description += "Commands Executed:\n\n"
        for cmd, output in command_output.items():
            truncated_output = output[:500] + "..." if len(output) > 500 else output
            description += f"Command: `{cmd}`\n\nOutput:\n```\n{truncated_output}\n```\n\n"

    if len(description) > 30000:
        description = description[:29997] + "..."

    title = "Investigation for Alert"

    case_payload = {
        "title": title,
        "description": description,
        "tags": ["auto-generated", "investigation"],
        "connector": {"id": "none", "name": "none", "type": ".none", "fields": None},
        "owner": "securitySolution",
        "settings": {"syncAlerts": True},
        "severity": "medium"
    }

    url = f"{KIBANA_URL}/api/cases"
    headers = {"kbn-xsrf": "true", "Content-Type": "application/json"}
    
    try:
        response = requests.post(
            url,
            auth=(USERNAME, PASSWORD),
            headers=headers,
            json=case_payload
        )
        if response.status_code == 200:
            case_id = response.json().get('id')
            print(f"Successfully created case: {case_id}")
            return case_id
        else:
            print(f"Failed to create case: {response.status_code} - {response.text}")
            return None
    except requests.RequestException as e:
        print(f"Request failed: {str(e)}")
        return None

无与伦比的灵活性

某中心安全远超开箱即用的检测和响应,支持与企业AI和Slack等协作工具无缝集成的自定义自动化管道。

使用强大的API和开源基础,能够:

  • 以编程方式监控严重告警
  • 使用某中心安全AI助手生成上下文丰富、可操作的响应
  • 以可控、可审计的方式自动化分类和修复
  • 直接集成到分析师日常使用的协作工具中

开放的API优先架构为安全团队提供无与伦比的灵活性,以构建最适合的工作流。

这是构建于某中心之上的力量。无论是通过AI扩展响应工作、自定义告警工作流,还是将某中心嵌入现有SecOps堆栈,平台都设计为按需工作。

准备构建自己的方案?

开始使用某中心AI助手API文档构建工作流,或联系深入了解某中心开放平台如何满足独特的SOC需求。

文中描述的任何功能或特性的发布和时间安排均由某中心自行决定。当前不可用的任何功能或特性可能无法按时交付或根本不交付。

本文可能使用或引用了第三方生成式AI工具,这些工具由其各自所有者拥有和运营。某中心不对第三方工具有任何控制,对其内容、操作或使用不承担任何责任或义务,也不对因使用此类工具而可能产生的任何损失或损害承担责任。使用AI工具处理个人、敏感或机密信息时请谨慎。提交的任何数据可能用于AI训练或其他目的。不保证所提供信息的安全性或机密性。使用前应熟悉任何生成式AI工具的隐私实践和使用条款。

某中心、Elasticsearch及相关标记是某中心在某些国家地区的商标、徽标或注册商标。所有其他公司和产品名称均为其各自所有者的商标、徽标或注册商标。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计