通过Elastic API增强AI驱动的安全自动化工作流
安全团队面临着持续告警、复杂调查和资源有限的多重挑战。关键不仅在于检测威胁,更需快速有效地响应。某中心的安全解决方案长期提供检测、调查和响应的预置能力,但其真正的优势在于开放的API优先方法,使用户能够构建和自动化安全运营中心(SOC)的特定工作流。
本文将演示如何通过可扩展的API实现AI驱动的安全编排、自动化与响应(SOAR),通过常见案例说明如何结合某中心的安全平台与AI及Slack等协作工具,设计、自动化并持续优化响应方案。
工作流增强方案
某SOC检测工程团队面临关键告警响应时效不足的挑战。尽管已有工具改善平均响应时间(MTTR),团队仍难以应对告警量和紧急性。部分关键告警未能及时分类或解决。为此,团队寻求通过自动化、AI和某中心强大API进一步优化工作流。
设想一个工作流:关键告警被检测、分类并响应,分析师能够指导和监督每一步。以下是使用某中心灵活API和Slack等常见协作工具的解决方案实现步骤:
- 使用Elasticsearch Python客户端监控集群
- 监视标记为严重的告警
- 通过API将告警JSON发送至某中心安全AI助手
- 获取AI响应
- 将AI响应发布至Slack频道
- 在Slack消息中标记相关安全分析师以通知并支持直接操作
事件时间线
某中心安全收到受端点检测与响应保护的Windows服务器的严重告警,检测到Windows管理规范(WMI)被用于创建注册表键,实现对该关键服务器的持久访问。
Python代码可每分钟查询某中心以发现严重告警。发现告警时,提取完整JSON负载进行处理。以下为实时监控严重告警的Python示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
es = Elasticsearch(
os.getenv("ELASTICSEARCH_URL"),
api_key=ELASTICSEARCH_API_KEY
)
def monitor_alerts():
from datetime import datetime, timedelta, timezone
import time
polling_interval = 60 # 每60秒轮询
index_patterns = {
"endpoint": {
"pattern": ".ds-logs-endpoint.alerts*",
"timestamp_field": "Responses.@timestamp"
},
"security": {
"pattern": ".internal.alerts-security.alerts-default-*",
"timestamp_field": "@timestamp"
}
}
last_timestamps = {
key: datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
for key in index_patterns
}
while True:
for key, config in index_patterns.items():
pattern = config["pattern"]
ts_field = config["timestamp_field"]
query = {
"query": {
"bool": {
"filter": [
{"range": {ts_field: {"gt": last_timestamps[key]}}},
{"term": {"kibana.alert.severity": "critical"}}
]
}
},
"sort": [{ts_field: {"order": "asc"}}]
}
try:
response = es.search(index=pattern, body=query)
alerts = response['hits']['hits']
if alerts:
print(f"Found {len(alerts)} critical alert(s) for {key} since {last_timestamps[key]}")
max_ts = last_timestamps[key]
for alert in alerts:
alert_id = alert.get('_id')
if alert_id in processed_alert_ids:
continue
processed_alert_ids.add(alert_id)
alert_json = alert['_source']
process_alert(alert_json)
alert_ts = get_custom_alert_timestamp(alert_json, ts_field)
if alert_ts and alert_ts > max_ts:
max_ts = alert_ts
dt = datetime.fromisoformat(max_ts.replace("Z", "+00:00"))
dt += timedelta(seconds=1)
last_timestamps[key] = dt.isoformat().replace("+00:00", "Z")
except Exception as e:
print(f"Error fetching critical alerts for {key}: {str(e)}")
time.sleep(polling_interval)
|
捕获的JSON通过API请求发送至某中心安全AI助手。提示附加到JSON,为AI助手提供生成适当响应所需的上下文。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
def process_alert(alert_json):
alert_str = json.dumps(alert_json, indent=2)
prompt = (
f"Please look at this JSON:\n{alert_str}\n\n"
"You are a security analyst and you want to run commands to start a forensic investigation for this alert. "
"Make sure you summarize the alert in one paragraph. "
"Summarize the alert and then give 5 commands that you would run to start your IR investigation. "
"The next 3 commands should be dedicated to commands that would remediate the malware or malicious activity. "
"The total commands should be 8. Make sure you include the affected host via the IP address in the alert summary. "
"The commands should be in a list format and clearly separated into Investigation Commands and Remediation Commands. For example: \n"
"Investigation Commands:\n1. command\n2. command\n... \nRemediation Commands:\n1. command\n2. command\n..."
) print("Alert Received")
response = chat_complete(prompt)
print("AI Responded")
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
def chat_complete(question, thread_ts=None, use_existing_conversation=False):
url = f"{KIBANA_URL}/api/security_ai_assistant/chat/complete"
headers = {
"kbn-xsrf": "true",
"Content-Type": "application/json",
}
payload = {
"messages": [{"role": "user", "content": question}],
"connectorId": CONNECTOR_ID,
"persist": True
}
if use_existing_conversation and thread_ts and thread_ts in conversation_map:
payload["conversationId"] = conversation_map[thread_ts]
response = requests.post(
url,
auth=(USERNAME, PASSWORD),
headers=headers,
json=payload,
stream=True
)
response.raise_for_status()
full_response = ""
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
try:
event = json.loads(decoded_line)
if "data" in event:
full_response += event["data"]
if "conversationId" in event and thread_ts:
conversation_map[thread_ts] = event["conversationId"]
except json.JSONDecodeError:
continue
return full_response.strip()
|
AI助手接收API请求并使用大语言模型(LLM)生成响应,包括可操作的告警摘要以及推荐的调查和修复命令。
AI助手还可通过查询包含SOC分析师信息的自定义知识库,确定应在Slack消息中标记的分析师。此自定义知识能力极大增强了AI助手的有效性。
通过将机构知识集中到自定义知识库,AI助手最小化关键信息访问延迟,提高SOC操作工作流的速度和一致性。
AI助手返回响应后,使用Python将消息及适当分析师提及发布到特定Slack频道。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
response = chat_complete(prompt)
print("AI Responded")
if response:
clean_response = response.replace("**", "")
inv_match = re.search(r"(?i)\s*Investigation Commands:\s*", clean_response)
rem_match = re.search(r"(?i)\s*Remediation Commands:\s*", clean_response)
if not inv_match or not rem_match:
print("Could not find both 'Investigation Commands:' and 'Remediation Commands:' in the response.")
return
summary = clean_response[:inv_match.start()].strip()
inv_commands_part = clean_response[inv_match.end():rem_match.start()].strip()
rem_commands_part = clean_response[rem_match.end():].strip()
inv_commands = re.findall(r"^\s*\d+\.\s+(.+)$", inv_commands_part, re.MULTILINE)
rem_commands = re.findall(r"^\s*\d+\.\s+(.+)$", rem_commands_part, re.MULTILINE)
# 从AI提取处理者(例如:"谁处理关键Windows告警?")
handler_question = f"Who handles {operating_system.lower()} alerts? Respond with just their name."
handler_response = chat_complete(handler_question).strip()
handler_id = get_user_id_by_name(handler_response)
handler_mention = f"<@{handler_id}>" if handler_id else handler_response
full_message = (
f"New Alert Detected\n\n{clean_response}\n\n"
f"Alert assigned to:\n{handler_mention}\n\n"
"Do you want me to run the investigation and remediation commands?"
)
blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": full_message
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "Yes"},
"action_id": "run_commands",
"value": "run"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "No"},
"action_id": "do_not_run",
"value": "do_not_run"
}
]
}
]
result = app.client.chat_postMessage(
channel=ALERT_CHANNEL,
blocks=blocks,
text="New Alert Detected"
)
alert_data_map[result['ts']] = {
"alert_json": alert_json,
"investigation_commands": inv_commands,
"remediation_commands": rem_commands,
"host_ip": host_ip,
"ai_summary": summary
}
|
此时,Slack消息生成包含有意义的告警摘要、受影响主机及建议的调查和修复命令列表。分配给告警的分析师被@提及,并呈现Yes/No选项以批准在主机上执行命令。
在此场景中,分析师点击"Yes"批准自动化响应。
为执行命令,使用Python和Windows远程管理(WinRM)。从AI助手响应中提取受影响主机的IP地址和命令列表,并在目标机器上顺序执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
import winrm
def execute_winrm_command(host_ip: str, command: str, winrm_domain: str, winrm_username: str, winrm_password: str) -> str:
session = winrm.Session(
host_ip,
auth=(f"{winrm_domain}\\{winrm_username}", winrm_password),
transport='ntlm'
)
full_cmd = (
"$ProgressPreference = 'SilentlyContinue'; "
"$VerbosePreference = 'SilentlyContinue'; "
"$WarningPreference = 'SilentlyContinue'; "
f"try {{ {command} | Format-List | Out-String -Width 120 }} "
"catch {{ 'Error: ' + $_.Exception.Message }}"
)
result = session.run_ps(full_cmd)
output = result.std_out.decode('utf-8', errors='ignore').strip()
return output if output else "No output returned."
def execute_winrm_commands(host_ip: str, commands: list, winrm_domain: str, winrm_username: str, winrm_password: str) -> dict:
outputs = {}
session = winrm.Session(
host_ip,
auth=(f"{winrm_domain}\\{winrm_username}", winrm_password),
transport='ntlm'
)
for command in commands:
full_cmd = (
"$ProgressPreference = 'SilentlyContinue'; "
"$VerbosePreference = 'SilentlyContinue'; "
"$WarningPreference = 'SilentlyContinue'; "
f"try {{ {command} | Format-List | Out-String -Width 120 }} "
"catch {{ 'Error: ' + $_.Exception.Message }}"
)
result = session.run_ps(full_cmd)
output = result.std_out.decode('utf-8', errors='ignore').strip()
outputs[command] = output if output else "No output returned."
return outputs
|
所有操作完全可跟踪和审计。应用程序为每个事件在某中心安全中创建新案例,并附加所有AI摘要、运行的命令及响应期间生成的输出,确保捕获所有证据以供将来参考或合规需求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
def create_case(alert_json, command_output, ai_summary):
description = ai_summary + "\n\n"
if command_output:
description += "Commands Executed:\n\n"
for cmd, output in command_output.items():
truncated_output = output[:500] + "..." if len(output) > 500 else output
description += f"Command: `{cmd}`\n\nOutput:\n```\n{truncated_output}\n```\n\n"
if len(description) > 30000:
description = description[:29997] + "..."
title = "Investigation for Alert"
case_payload = {
"title": title,
"description": description,
"tags": ["auto-generated", "investigation"],
"connector": {"id": "none", "name": "none", "type": ".none", "fields": None},
"owner": "securitySolution",
"settings": {"syncAlerts": True},
"severity": "medium"
}
url = f"{KIBANA_URL}/api/cases"
headers = {"kbn-xsrf": "true", "Content-Type": "application/json"}
try:
response = requests.post(
url,
auth=(USERNAME, PASSWORD),
headers=headers,
json=case_payload
)
if response.status_code == 200:
case_id = response.json().get('id')
print(f"Successfully created case: {case_id}")
return case_id
else:
print(f"Failed to create case: {response.status_code} - {response.text}")
return None
except requests.RequestException as e:
print(f"Request failed: {str(e)}")
return None
|
无与伦比的灵活性
某中心安全远超开箱即用的检测和响应,支持与企业AI和Slack等协作工具无缝集成的自定义自动化管道。
使用强大的API和开源基础,能够:
- 以编程方式监控严重告警
- 使用某中心安全AI助手生成上下文丰富、可操作的响应
- 以可控、可审计的方式自动化分类和修复
- 直接集成到分析师日常使用的协作工具中
开放的API优先架构为安全团队提供无与伦比的灵活性,以构建最适合的工作流。
这是构建于某中心之上的力量。无论是通过AI扩展响应工作、自定义告警工作流,还是将某中心嵌入现有SecOps堆栈,平台都设计为按需工作。
准备构建自己的方案?
开始使用某中心AI助手API文档构建工作流,或联系深入了解某中心开放平台如何满足独特的SOC需求。
文中描述的任何功能或特性的发布和时间安排均由某中心自行决定。当前不可用的任何功能或特性可能无法按时交付或根本不交付。
本文可能使用或引用了第三方生成式AI工具,这些工具由其各自所有者拥有和运营。某中心不对第三方工具有任何控制,对其内容、操作或使用不承担任何责任或义务,也不对因使用此类工具而可能产生的任何损失或损害承担责任。使用AI工具处理个人、敏感或机密信息时请谨慎。提交的任何数据可能用于AI训练或其他目的。不保证所提供信息的安全性或机密性。使用前应熟悉任何生成式AI工具的隐私实践和使用条款。
某中心、Elasticsearch及相关标记是某中心在某些国家地区的商标、徽标或注册商标。所有其他公司和产品名称均为其各自所有者的商标、徽标或注册商标。