在日趋复杂的网络安全环境下,仅依靠传统的边界防御和签名检测已远远不够。网络态势感知(Cyber Situational Awareness)代表了安全防御体系从被动响应向主动感知的演进,它通过持续监控网络流量、建立行为基线、检测异常偏移,帮助安全团队在攻击造成实质损害之前发现威胁。本文将深入讲解网络态势感知的概念与架构,介绍流量异常检测的核心技术,并通过 Zeek 网络安全监控框架和 Grafana 可视化的实战操作,带你构建一套基础的态势感知平台。
网络态势感知概念与体系架构
什么是网络态势感知
网络态势感知是指对网络环境中各类安全要素的感知、理解和预测能力。它包含三个层次:
- 态势觉察(Perception):采集和识别网络中的安全相关事件和数据,如流量数据、日志、告警等
- 态势理解(Comprehension):关联分析采集到的数据,理解当前网络安全状态和威胁态势
- 态势预测(Projection):基于历史数据和当前态势,预测未来可能面临的安全威胁和风险
体系架构
一套完整的态势感知平台通常包含以下层次:
┌──────────────────────────────────────────────────────────┐
│ 展示层 (Grafana/Kibana) │
│ 仪表盘 · 态势大屏 · 告警中心 · 报告 │
├──────────────────────────────────────────────────────────┤
│ 分析层 (分析引擎) │
│ 关联分析 · 异常检测 · 威胁建模 · 机器学习 │
├──────────────────────────────────────────────────────────┤
│ 存储层 (Elasticsearch/ClickHouse) │
│ 流量日志 · 安全事件 · 威胁情报 · 资产信息 │
├──────────────────────────────────────────────────────────┤
│ 采集层 (Zeek/Suricata/Filebeat) │
│ 网络流量 · 系统日志 · 安全设备日志 · 应用日志 │
├──────────────────────────────────────────────────────────┤
│ 数据源 (网络环境) │
│ 交换机镜像 · TAP设备 · Agent · 网络探针 │
└──────────────────────────────────────────────────────────┘
流量基线建立方法
异常检测的前提是知道”正常”是什么样子。流量基线(Traffic Baseline)是对网络正常运行状态下各项指标的统计描述。
基线关键指标
建立流量基线通常需要关注以下维度:
- 流量总量:每小时/每天的入站和出站总流量(bytes/packets)
- 协议分布:TCP、UDP、ICMP 等协议的流量占比
- 端口分布:活跃端口的使用频率和流量分布
- 连接模式:新建连接数、活跃连接数、连接持续时间
- DNS 查询:查询频率、查询类型分布、唯一域名数量
- 时间模式:工作日与周末、工作时间与非工作时间的流量差异
基线建立脚本
import json
from collections import defaultdict
from datetime import datetime
def parse_conn_log(filepath):
"""解析 Zeek conn.log 文件建立流量基线"""
baseline = {
'total_connections': 0,
'protocols': defaultdict(int),
'dest_ports': defaultdict(int),
'hourly_connections': defaultdict(int),
'bytes_sent': [],
'bytes_received': [],
'duration': [],
'unique_dest_ips': set(),
'unique_src_ips': set(),
}
with open(filepath, 'r') as f:
for line in f:
if line.startswith('#'):
continue
fields = line.strip().split('\t')
if len(fields) < 20:
continue
try:
ts = float(fields[0])
src_ip = fields[2]
src_port = fields[3]
dst_ip = fields[4]
dst_port = fields[5]
proto = fields[6]
duration = fields[8]
orig_bytes = fields[9]
resp_bytes = fields[10]
conn_state = fields[11]
# 统计连接数
baseline['total_connections'] += 1
# 协议分布
baseline['protocols'][proto] += 1
# 目的端口分布
if dst_port != '-':
baseline['dest_ports'][dst_port] += 1
# 按小时统计
hour = datetime.utcfromtimestamp(ts).hour
baseline['hourly_connections'][hour] += 1
# 字节数统计
if orig_bytes != '-' and orig_bytes != '0':
baseline['bytes_sent'].append(int(orig_bytes))
if resp_bytes != '-' and resp_bytes != '0':
baseline['bytes_received'].append(int(resp_bytes))
# 连接持续时间
if duration != '-':
baseline['duration'].append(float(duration))
# 唯一 IP 统计
baseline['unique_dest_ips'].add(dst_ip)
baseline['unique_src_ips'].add(src_ip)
except (ValueError, IndexError):
continue
# 计算统计摘要
summary = {
'total_connections': baseline['total_connections'],
'unique_src_ips': len(baseline['unique_src_ips']),
'unique_dest_ips': len(baseline['unique_dest_ips']),
'protocol_distribution': dict(baseline['protocols']),
'top_dest_ports': dict(sorted(baseline['dest_ports'].items(),
key=lambda x: x[1], reverse=True)[:20]),
'hourly_pattern': dict(baseline['hourly_connections']),
'avg_bytes_sent': sum(baseline['bytes_sent']) / len(baseline['bytes_sent']) if baseline['bytes_sent'] else 0,
'avg_bytes_received': sum(baseline['bytes_received']) / len(baseline['bytes_received']) if baseline['bytes_received'] else 0,
'avg_duration': sum(baseline['duration']) / len(baseline['duration']) if baseline['duration'] else 0,
}
return summary
# 使用示例
baseline = parse_conn_log('/opt/zeek/logs/current/conn.log')
print(json.dumps(baseline, indent=2, default=str))
异常检测技术
统计分析方法
基于统计的异常检测通过计算观测值与基线的偏离程度来识别异常:
import numpy as np
from collections import defaultdict
class StatisticalAnomalyDetector:
"""基于统计方法的流量异常检测器"""
def __init__(self, threshold_sigma=3):
self.threshold = threshold_sigma
self.baselines = {}
def train(self, metric_name, values):
"""建立指标基线"""
values = np.array(values, dtype=float)
self.baselines[metric_name] = {
'mean': np.mean(values),
'std': np.std(values),
'median': np.median(values),
'q1': np.percentile(values, 25),
'q3': np.percentile(values, 75),
}
iqr = self.baselines[metric_name]['q3'] - self.baselines[metric_name]['q1']
self.baselines[metric_name]['iqr'] = iqr
self.baselines[metric_name]['lower_fence'] = self.baselines[metric_name]['q1'] - 1.5 * iqr
self.baselines[metric_name]['upper_fence'] = self.baselines[metric_name]['q3'] + 1.5 * iqr
print(f"[*] 基线建立完成: {metric_name}")
print(f" 均值={self.baselines[metric_name]['mean']:.2f}, "
f"标准差={self.baselines[metric_name]['std']:.2f}")
def detect(self, metric_name, value):
"""检测单个值是否异常"""
if metric_name not in self.baselines:
return False, "未建立基线"
b = self.baselines[metric_name]
z_score = abs(value - b['mean']) / b['std'] if b['std'] > 0 else 0
if z_score > self.threshold:
return True, f"Z-Score={z_score:.2f} 超过阈值 {self.threshold}"
if value < b['lower_fence'] or value > b['upper_fence']:
return True, f"值={value} 超出 IQR 围栏 [{b['lower_fence']:.2f}, {b['upper_fence']:.2f}]"
return False, f"正常 (Z-Score={z_score:.2f})"
# 使用示例
detector = StatisticalAnomalyDetector(threshold_sigma=3)
# 训练基线(以每小时连接数为例)
hourly_connections = [1200, 1350, 1180, 1400, 1250, 1300, 1220, 1380,
1290, 1340, 1260, 1310, 1270, 1350, 1230, 1400,
1190, 1320, 1280, 1360, 1240, 1300, 1210, 1370]
detector.train("hourly_connections", hourly_connections)
# 检测异常值
test_values = [1300, 5000, 1250, 50, 1340]
for val in test_values:
is_anomaly, reason = detector.detect("hourly_connections", val)
status = "异常" if is_anomaly else "正常"
print(f" 值={val}: [{status}] {reason}")
机器学习方法
对于更复杂的异常模式,可以使用无监督机器学习算法:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
def ml_anomaly_detection(features, contamination=0.05):
"""
使用 Isolation Forest 进行流量异常检测
features: 特征矩阵,每行是一个时间窗口的特征向量
contamination: 预期异常比例
"""
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)
model = IsolationForest(
n_estimators=200,
contamination=contamination,
random_state=42,
n_jobs=-1
)
predictions = model.fit_predict(features_scaled)
scores = model.decision_function(features_scaled)
anomalies = np.where(predictions == -1)[0]
print(f"[*] 检测到 {len(anomalies)} 个异常时间窗口(共 {len(features)} 个)")
for idx in anomalies:
print(f" [!] 窗口 {idx}: 异常分数={scores[idx]:.4f}, "
f"特征={features[idx]}")
return predictions, scores
# 构造特征矩阵示例
# 特征: [连接数, 平均字节数, 唯一目的IP数, DNS查询数, 异常端口连接数]
normal_data = np.random.normal(loc=[1300, 5000, 150, 800, 10],
scale=[100, 500, 20, 80, 5],
size=(200, 5))
# 注入异常数据
anomaly_data = np.array([
[8000, 50000, 500, 3000, 100], # DDoS 特征
[1300, 5000, 150, 5000, 10], # DNS 隧道特征
[50, 100000, 5, 50, 200], # 数据泄露特征
])
all_data = np.vstack([normal_data, anomaly_data])
predictions, scores = ml_anomaly_detection(all_data, contamination=0.02)
Zeek 网络安全监控
Zeek(原名 Bro)是一个强大的开源网络分析框架,能够深度解析网络流量并生成结构化日志。与传统 IDS 不同,Zeek 侧重于网络活动的全面记录和自定义分析。
安装和配置
# Ubuntu/Debian 安装 Zeek
sudo apt update
sudo apt install -y cmake make gcc g++ flex bison libpcap-dev libssl-dev \
python3 python3-dev swig zlib1g-dev
# 使用官方仓库安装(推荐)
echo 'deb http://download.opensuse.org/repositories/security:/zeek/xUbuntu_22.04/ /' | \
sudo tee /etc/apt/sources.list.d/security:zeek.list
curl -fsSL https://download.opensuse.org/repositories/security:zeek/xUbuntu_22.04/Release.key | \
gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null
sudo apt update
sudo apt install -y zeek
# 配置 Zeek 监控接口
sudo vim /opt/zeek/etc/node.cfg
Zeek 节点配置示例:
# /opt/zeek/etc/node.cfg
[zeek]
type=standalone
host=localhost
interface=eth0
# 如果是集群模式:
# [manager]
# type=manager
# host=localhost
#
# [proxy-1]
# type=proxy
# host=localhost
#
# [worker-1]
# type=worker
# host=localhost
# interface=eth0
#
# [worker-2]
# type=worker
# host=localhost
# interface=eth1
# 配置 Zeek 网络范围
# /opt/zeek/etc/networks.cfg
# 定义内部网络范围
10.0.0.0/8 Private IP space
172.16.0.0/12 Private IP space
192.168.0.0/16 Private IP space
# 部署并启动 Zeek
sudo /opt/zeek/bin/zeekctl deploy
# 查看运行状态
sudo /opt/zeek/bin/zeekctl status
# Zeek 生成的日志位于 /opt/zeek/logs/current/
ls /opt/zeek/logs/current/
# conn.log dns.log http.log ssl.log files.log notice.log ...
conn.log 分析实战
conn.log 是 Zeek 最核心的日志,记录了所有网络连接的元数据:
# 查看连接日志结构
head -20 /opt/zeek/logs/current/conn.log
# 统计最活跃的外部目的IP(可能的C2通信)
cat /opt/zeek/logs/current/conn.log | zeek-cut id.resp_h | \
sort | uniq -c | sort -rn | head -20
# 查找长时间连接(可能的持久化C2信道)
cat /opt/zeek/logs/current/conn.log | zeek-cut id.orig_h id.resp_h id.resp_p duration | \
awk '$4 > 3600' | sort -t$'\t' -k4 -rn | head -20
# 查找大量数据外传(数据泄露检测)
cat /opt/zeek/logs/current/conn.log | zeek-cut id.orig_h id.resp_h orig_bytes | \
awk '$3 > 100000000' | sort -t$'\t' -k3 -rn
# 统计DNS查询中的长域名(可能的DNS隧道)
cat /opt/zeek/logs/current/dns.log | zeek-cut query | \
awk '{ if (length($0) > 50) print length($0), $0 }' | sort -rn | head -20
# 检测非标准端口的HTTP流量
cat /opt/zeek/logs/current/http.log | zeek-cut id.resp_p host uri | \
awk '$1 != 80 && $1 != 443 && $1 != 8080'
自定义 Zeek 脚本
Zeek 内置了强大的脚本语言,可以编写自定义检测逻辑:
# /opt/zeek/share/zeek/site/detect_anomalies.zeek
@load base/frameworks/notice
module AnomalyDetection;
export {
redef enum Notice::Type += {
## 检测到DNS隧道可疑行为
DNS_Tunnel_Detected,
## 检测到大量数据外传
Large_Data_Exfiltration,
## 检测到异常端口扫描
Port_Scan_Detected,
## 检测到长时间持续连接
Long_Connection_Detected,
};
}
# DNS 隧道检测 — 域名长度异常
event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count)
{
if ( |query| > 60 )
{
NOTICE([
$note=DNS_Tunnel_Detected,
$conn=c,
$msg=fmt("可疑DNS查询 — 域名长度 %d: %s", |query|, query),
$sub=query,
$identifier=cat(c$id$orig_h, query)
]);
}
}
# 大数据传输检测
event connection_state_remove(c: connection)
{
if ( c$conn?$orig_bytes && c$conn$orig_bytes > 104857600 ) # 100MB
{
NOTICE([
$note=Large_Data_Exfiltration,
$conn=c,
$msg=fmt("大量数据外传: %s -> %s, 发送 %d 字节",
c$id$orig_h, c$id$resp_h, c$conn$orig_bytes),
$identifier=cat(c$id$orig_h, c$id$resp_h)
]);
}
}
# 长连接检测(超过4小时)
event connection_state_remove(c: connection)
{
if ( c$conn?$duration && c$conn$duration > 4hr )
{
NOTICE([
$note=Long_Connection_Detected,
$conn=c,
$msg=fmt("长时间连接: %s -> %s:%s, 持续 %s",
c$id$orig_h, c$id$resp_h, c$id$resp_p, c$conn$duration),
$identifier=cat(c$id$orig_h, c$id$resp_h)
]);
}
}
启用自定义脚本:
# 在 /opt/zeek/share/zeek/site/local.zeek 中添加
echo '@load detect_anomalies.zeek' >> /opt/zeek/share/zeek/site/local.zeek
# 重新部署 Zeek
sudo /opt/zeek/bin/zeekctl deploy
Grafana 可视化仪表盘
将 Zeek 日志导入 Elasticsearch 后,可以使用 Grafana 构建直观的态势感知仪表盘:
# docker-compose-grafana.yml
version: '3.8'
services:
grafana:
image: grafana/grafana:10.2.0
container_name: grafana
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=SecureGrafana123!
- GF_INSTALL_PLUGINS=grafana-worldmap-panel,grafana-piechart-panel
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./grafana/datasources:/etc/grafana/provisioning/datasources
ports:
- "3000:3000"
networks:
- monitoring
volumes:
grafana_data:
networks:
monitoring:
driver: bridge
Grafana 数据源配置:
# grafana/datasources/elasticsearch.yml
apiVersion: 1
datasources:
- name: Elasticsearch-Zeek
type: elasticsearch
access: proxy
url: http://elasticsearch:9200
database: "zeek-conn-*"
basicAuth: true
basicAuthUser: elastic
basicAuthPassword: YourSecurePassword123!
jsonData:
timeField: "@timestamp"
esVersion: "8.0.0"
logMessageField: message
logLevelField: severity
建议在 Grafana 中创建以下仪表盘面板:
- 网络流量总览:实时带宽使用曲线(入站/出站)
- 协议分布饼图:展示 TCP、UDP、ICMP 等协议占比
- Top 20 目的 IP/端口:识别最活跃的通信目标
- 地理分布地图:GeoIP 标注外部连接的全球分布
- 异常告警时间线:按时间轴展示各类安全告警
- DNS 查询趋势:DNS 请求量变化和异常域名告警
- 连接状态统计:正常/异常/被拒连接的比例变化
安全建议
- 全面覆盖:确保关键网络节点都部署了流量采集探针,特别是互联网出口、DMZ 区域和核心交换节点。
- 基线定期更新:网络环境是动态变化的,流量基线需要定期重新计算以适应业务变化。
- 多层检测互补:结合签名检测(Suricata)、行为分析(Zeek)和统计异常检测,形成多层防御。
- 告警精细化:根据资产重要性设置差异化的告警策略,减少误报对运营团队的干扰。
- 数据保留策略:原始流量数据存储成本高昂,合理规划存储架构,核心元数据长期保留,原始 PCAP 按需回溯。
- 与威胁情报联动:将外部威胁情报(IOC)与流量数据关联,实现已知威胁的快速发现。
总结
网络流量异常检测和态势感知是现代安全运营中心(SOC)的核心能力。通过本文的学习,我们从态势感知的概念和体系架构出发,理解了流量基线建立的方法论,掌握了统计分析和机器学习两类异常检测技术。在实战层面,Zeek 作为强大的网络安全监控框架,为我们提供了丰富的流量解析和自定义分析能力;Grafana 则将抽象的数据转化为直观的可视化态势图。将这些组件有机整合,就能构建一套从数据采集、分析检测到可视化展示的完整态势感知平台。建议读者从单节点部署开始实践,逐步扩展到分布式架构,不断完善检测规则和可视化面板,打造适合自身环境的安全监控体系。