Skip to content

网络流量异常检测与态势感知平台构建

Published:
16 min read

在日趋复杂的网络安全环境下,仅依靠传统的边界防御和签名检测已远远不够。网络态势感知(Cyber Situational Awareness)代表了安全防御体系从被动响应向主动感知的演进,它通过持续监控网络流量、建立行为基线、检测异常偏移,帮助安全团队在攻击造成实质损害之前发现威胁。本文将深入讲解网络态势感知的概念与架构,介绍流量异常检测的核心技术,并通过 Zeek 网络安全监控框架和 Grafana 可视化的实战操作,带你构建一套基础的态势感知平台。

网络态势感知概念与体系架构

什么是网络态势感知

网络态势感知是指对网络环境中各类安全要素的感知、理解和预测能力。它包含三个层次:

体系架构

一套完整的态势感知平台通常包含以下层次:

┌──────────────────────────────────────────────────────────┐
│                    展示层 (Grafana/Kibana)                │
│              仪表盘 · 态势大屏 · 告警中心 · 报告           │
├──────────────────────────────────────────────────────────┤
│                    分析层 (分析引擎)                       │
│         关联分析 · 异常检测 · 威胁建模 · 机器学习          │
├──────────────────────────────────────────────────────────┤
│                    存储层 (Elasticsearch/ClickHouse)      │
│              流量日志 · 安全事件 · 威胁情报 · 资产信息      │
├──────────────────────────────────────────────────────────┤
│                    采集层 (Zeek/Suricata/Filebeat)        │
│           网络流量 · 系统日志 · 安全设备日志 · 应用日志     │
├──────────────────────────────────────────────────────────┤
│                    数据源 (网络环境)                       │
│           交换机镜像 · TAP设备 · Agent · 网络探针          │
└──────────────────────────────────────────────────────────┘

流量基线建立方法

异常检测的前提是知道”正常”是什么样子。流量基线(Traffic Baseline)是对网络正常运行状态下各项指标的统计描述。

基线关键指标

建立流量基线通常需要关注以下维度:

基线建立脚本

import json
from collections import defaultdict
from datetime import datetime

def parse_conn_log(filepath):
    """解析 Zeek conn.log 文件建立流量基线"""
    baseline = {
        'total_connections': 0,
        'protocols': defaultdict(int),
        'dest_ports': defaultdict(int),
        'hourly_connections': defaultdict(int),
        'bytes_sent': [],
        'bytes_received': [],
        'duration': [],
        'unique_dest_ips': set(),
        'unique_src_ips': set(),
    }

    with open(filepath, 'r') as f:
        for line in f:
            if line.startswith('#'):
                continue
            fields = line.strip().split('\t')
            if len(fields) < 20:
                continue

            try:
                ts = float(fields[0])
                src_ip = fields[2]
                src_port = fields[3]
                dst_ip = fields[4]
                dst_port = fields[5]
                proto = fields[6]
                duration = fields[8]
                orig_bytes = fields[9]
                resp_bytes = fields[10]
                conn_state = fields[11]

                # 统计连接数
                baseline['total_connections'] += 1

                # 协议分布
                baseline['protocols'][proto] += 1

                # 目的端口分布
                if dst_port != '-':
                    baseline['dest_ports'][dst_port] += 1

                # 按小时统计
                hour = datetime.utcfromtimestamp(ts).hour
                baseline['hourly_connections'][hour] += 1

                # 字节数统计
                if orig_bytes != '-' and orig_bytes != '0':
                    baseline['bytes_sent'].append(int(orig_bytes))
                if resp_bytes != '-' and resp_bytes != '0':
                    baseline['bytes_received'].append(int(resp_bytes))

                # 连接持续时间
                if duration != '-':
                    baseline['duration'].append(float(duration))

                # 唯一 IP 统计
                baseline['unique_dest_ips'].add(dst_ip)
                baseline['unique_src_ips'].add(src_ip)

            except (ValueError, IndexError):
                continue

    # 计算统计摘要
    summary = {
        'total_connections': baseline['total_connections'],
        'unique_src_ips': len(baseline['unique_src_ips']),
        'unique_dest_ips': len(baseline['unique_dest_ips']),
        'protocol_distribution': dict(baseline['protocols']),
        'top_dest_ports': dict(sorted(baseline['dest_ports'].items(),
                                       key=lambda x: x[1], reverse=True)[:20]),
        'hourly_pattern': dict(baseline['hourly_connections']),
        'avg_bytes_sent': sum(baseline['bytes_sent']) / len(baseline['bytes_sent']) if baseline['bytes_sent'] else 0,
        'avg_bytes_received': sum(baseline['bytes_received']) / len(baseline['bytes_received']) if baseline['bytes_received'] else 0,
        'avg_duration': sum(baseline['duration']) / len(baseline['duration']) if baseline['duration'] else 0,
    }

    return summary

# 使用示例
baseline = parse_conn_log('/opt/zeek/logs/current/conn.log')
print(json.dumps(baseline, indent=2, default=str))

异常检测技术

统计分析方法

基于统计的异常检测通过计算观测值与基线的偏离程度来识别异常:

import numpy as np
from collections import defaultdict

class StatisticalAnomalyDetector:
    """基于统计方法的流量异常检测器"""

    def __init__(self, threshold_sigma=3):
        self.threshold = threshold_sigma
        self.baselines = {}

    def train(self, metric_name, values):
        """建立指标基线"""
        values = np.array(values, dtype=float)
        self.baselines[metric_name] = {
            'mean': np.mean(values),
            'std': np.std(values),
            'median': np.median(values),
            'q1': np.percentile(values, 25),
            'q3': np.percentile(values, 75),
        }
        iqr = self.baselines[metric_name]['q3'] - self.baselines[metric_name]['q1']
        self.baselines[metric_name]['iqr'] = iqr
        self.baselines[metric_name]['lower_fence'] = self.baselines[metric_name]['q1'] - 1.5 * iqr
        self.baselines[metric_name]['upper_fence'] = self.baselines[metric_name]['q3'] + 1.5 * iqr
        print(f"[*] 基线建立完成: {metric_name}")
        print(f"    均值={self.baselines[metric_name]['mean']:.2f}, "
              f"标准差={self.baselines[metric_name]['std']:.2f}")

    def detect(self, metric_name, value):
        """检测单个值是否异常"""
        if metric_name not in self.baselines:
            return False, "未建立基线"

        b = self.baselines[metric_name]
        z_score = abs(value - b['mean']) / b['std'] if b['std'] > 0 else 0

        if z_score > self.threshold:
            return True, f"Z-Score={z_score:.2f} 超过阈值 {self.threshold}"

        if value < b['lower_fence'] or value > b['upper_fence']:
            return True, f"值={value} 超出 IQR 围栏 [{b['lower_fence']:.2f}, {b['upper_fence']:.2f}]"

        return False, f"正常 (Z-Score={z_score:.2f})"

# 使用示例
detector = StatisticalAnomalyDetector(threshold_sigma=3)

# 训练基线(以每小时连接数为例)
hourly_connections = [1200, 1350, 1180, 1400, 1250, 1300, 1220, 1380,
                      1290, 1340, 1260, 1310, 1270, 1350, 1230, 1400,
                      1190, 1320, 1280, 1360, 1240, 1300, 1210, 1370]
detector.train("hourly_connections", hourly_connections)

# 检测异常值
test_values = [1300, 5000, 1250, 50, 1340]
for val in test_values:
    is_anomaly, reason = detector.detect("hourly_connections", val)
    status = "异常" if is_anomaly else "正常"
    print(f"  值={val}: [{status}] {reason}")

机器学习方法

对于更复杂的异常模式,可以使用无监督机器学习算法:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np

def ml_anomaly_detection(features, contamination=0.05):
    """
    使用 Isolation Forest 进行流量异常检测
    features: 特征矩阵,每行是一个时间窗口的特征向量
    contamination: 预期异常比例
    """
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)

    model = IsolationForest(
        n_estimators=200,
        contamination=contamination,
        random_state=42,
        n_jobs=-1
    )
    predictions = model.fit_predict(features_scaled)
    scores = model.decision_function(features_scaled)

    anomalies = np.where(predictions == -1)[0]
    print(f"[*] 检测到 {len(anomalies)} 个异常时间窗口(共 {len(features)} 个)")

    for idx in anomalies:
        print(f"  [!] 窗口 {idx}: 异常分数={scores[idx]:.4f}, "
              f"特征={features[idx]}")

    return predictions, scores

# 构造特征矩阵示例
# 特征: [连接数, 平均字节数, 唯一目的IP数, DNS查询数, 异常端口连接数]
normal_data = np.random.normal(loc=[1300, 5000, 150, 800, 10],
                                scale=[100, 500, 20, 80, 5],
                                size=(200, 5))
# 注入异常数据
anomaly_data = np.array([
    [8000, 50000, 500, 3000, 100],  # DDoS 特征
    [1300, 5000, 150, 5000, 10],    # DNS 隧道特征
    [50, 100000, 5, 50, 200],       # 数据泄露特征
])

all_data = np.vstack([normal_data, anomaly_data])
predictions, scores = ml_anomaly_detection(all_data, contamination=0.02)

Zeek 网络安全监控

Zeek(原名 Bro)是一个强大的开源网络分析框架,能够深度解析网络流量并生成结构化日志。与传统 IDS 不同,Zeek 侧重于网络活动的全面记录和自定义分析。

安装和配置

# Ubuntu/Debian 安装 Zeek
sudo apt update
sudo apt install -y cmake make gcc g++ flex bison libpcap-dev libssl-dev \
    python3 python3-dev swig zlib1g-dev

# 使用官方仓库安装(推荐)
echo 'deb http://download.opensuse.org/repositories/security:/zeek/xUbuntu_22.04/ /' | \
    sudo tee /etc/apt/sources.list.d/security:zeek.list
curl -fsSL https://download.opensuse.org/repositories/security:zeek/xUbuntu_22.04/Release.key | \
    gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null
sudo apt update
sudo apt install -y zeek

# 配置 Zeek 监控接口
sudo vim /opt/zeek/etc/node.cfg

Zeek 节点配置示例:

# /opt/zeek/etc/node.cfg
[zeek]
type=standalone
host=localhost
interface=eth0

# 如果是集群模式:
# [manager]
# type=manager
# host=localhost
#
# [proxy-1]
# type=proxy
# host=localhost
#
# [worker-1]
# type=worker
# host=localhost
# interface=eth0
#
# [worker-2]
# type=worker
# host=localhost
# interface=eth1
# 配置 Zeek 网络范围
# /opt/zeek/etc/networks.cfg
# 定义内部网络范围
10.0.0.0/8         Private IP space
172.16.0.0/12      Private IP space
192.168.0.0/16     Private IP space

# 部署并启动 Zeek
sudo /opt/zeek/bin/zeekctl deploy

# 查看运行状态
sudo /opt/zeek/bin/zeekctl status

# Zeek 生成的日志位于 /opt/zeek/logs/current/
ls /opt/zeek/logs/current/
# conn.log   dns.log   http.log   ssl.log   files.log   notice.log ...

conn.log 分析实战

conn.log 是 Zeek 最核心的日志,记录了所有网络连接的元数据:

# 查看连接日志结构
head -20 /opt/zeek/logs/current/conn.log

# 统计最活跃的外部目的IP(可能的C2通信)
cat /opt/zeek/logs/current/conn.log | zeek-cut id.resp_h | \
    sort | uniq -c | sort -rn | head -20

# 查找长时间连接(可能的持久化C2信道)
cat /opt/zeek/logs/current/conn.log | zeek-cut id.orig_h id.resp_h id.resp_p duration | \
    awk '$4 > 3600' | sort -t$'\t' -k4 -rn | head -20

# 查找大量数据外传(数据泄露检测)
cat /opt/zeek/logs/current/conn.log | zeek-cut id.orig_h id.resp_h orig_bytes | \
    awk '$3 > 100000000' | sort -t$'\t' -k3 -rn

# 统计DNS查询中的长域名(可能的DNS隧道)
cat /opt/zeek/logs/current/dns.log | zeek-cut query | \
    awk '{ if (length($0) > 50) print length($0), $0 }' | sort -rn | head -20

# 检测非标准端口的HTTP流量
cat /opt/zeek/logs/current/http.log | zeek-cut id.resp_p host uri | \
    awk '$1 != 80 && $1 != 443 && $1 != 8080'

自定义 Zeek 脚本

Zeek 内置了强大的脚本语言,可以编写自定义检测逻辑:

# /opt/zeek/share/zeek/site/detect_anomalies.zeek

@load base/frameworks/notice

module AnomalyDetection;

export {
    redef enum Notice::Type += {
        ## 检测到DNS隧道可疑行为
        DNS_Tunnel_Detected,
        ## 检测到大量数据外传
        Large_Data_Exfiltration,
        ## 检测到异常端口扫描
        Port_Scan_Detected,
        ## 检测到长时间持续连接
        Long_Connection_Detected,
    };
}

# DNS 隧道检测 — 域名长度异常
event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count)
{
    if ( |query| > 60 )
    {
        NOTICE([
            $note=DNS_Tunnel_Detected,
            $conn=c,
            $msg=fmt("可疑DNS查询 — 域名长度 %d: %s", |query|, query),
            $sub=query,
            $identifier=cat(c$id$orig_h, query)
        ]);
    }
}

# 大数据传输检测
event connection_state_remove(c: connection)
{
    if ( c$conn?$orig_bytes && c$conn$orig_bytes > 104857600 )  # 100MB
    {
        NOTICE([
            $note=Large_Data_Exfiltration,
            $conn=c,
            $msg=fmt("大量数据外传: %s -> %s, 发送 %d 字节",
                     c$id$orig_h, c$id$resp_h, c$conn$orig_bytes),
            $identifier=cat(c$id$orig_h, c$id$resp_h)
        ]);
    }
}

# 长连接检测(超过4小时)
event connection_state_remove(c: connection)
{
    if ( c$conn?$duration && c$conn$duration > 4hr )
    {
        NOTICE([
            $note=Long_Connection_Detected,
            $conn=c,
            $msg=fmt("长时间连接: %s -> %s:%s, 持续 %s",
                     c$id$orig_h, c$id$resp_h, c$id$resp_p, c$conn$duration),
            $identifier=cat(c$id$orig_h, c$id$resp_h)
        ]);
    }
}

启用自定义脚本:

# 在 /opt/zeek/share/zeek/site/local.zeek 中添加
echo '@load detect_anomalies.zeek' >> /opt/zeek/share/zeek/site/local.zeek

# 重新部署 Zeek
sudo /opt/zeek/bin/zeekctl deploy

Grafana 可视化仪表盘

将 Zeek 日志导入 Elasticsearch 后,可以使用 Grafana 构建直观的态势感知仪表盘:

# docker-compose-grafana.yml
version: '3.8'

services:
  grafana:
    image: grafana/grafana:10.2.0
    container_name: grafana
    environment:
      - GF_SECURITY_ADMIN_USER=admin
      - GF_SECURITY_ADMIN_PASSWORD=SecureGrafana123!
      - GF_INSTALL_PLUGINS=grafana-worldmap-panel,grafana-piechart-panel
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
    ports:
      - "3000:3000"
    networks:
      - monitoring

volumes:
  grafana_data:

networks:
  monitoring:
    driver: bridge

Grafana 数据源配置:

# grafana/datasources/elasticsearch.yml
apiVersion: 1

datasources:
  - name: Elasticsearch-Zeek
    type: elasticsearch
    access: proxy
    url: http://elasticsearch:9200
    database: "zeek-conn-*"
    basicAuth: true
    basicAuthUser: elastic
    basicAuthPassword: YourSecurePassword123!
    jsonData:
      timeField: "@timestamp"
      esVersion: "8.0.0"
      logMessageField: message
      logLevelField: severity

建议在 Grafana 中创建以下仪表盘面板:

安全建议

  1. 全面覆盖:确保关键网络节点都部署了流量采集探针,特别是互联网出口、DMZ 区域和核心交换节点。
  2. 基线定期更新:网络环境是动态变化的,流量基线需要定期重新计算以适应业务变化。
  3. 多层检测互补:结合签名检测(Suricata)、行为分析(Zeek)和统计异常检测,形成多层防御。
  4. 告警精细化:根据资产重要性设置差异化的告警策略,减少误报对运营团队的干扰。
  5. 数据保留策略:原始流量数据存储成本高昂,合理规划存储架构,核心元数据长期保留,原始 PCAP 按需回溯。
  6. 与威胁情报联动:将外部威胁情报(IOC)与流量数据关联,实现已知威胁的快速发现。

总结

网络流量异常检测和态势感知是现代安全运营中心(SOC)的核心能力。通过本文的学习,我们从态势感知的概念和体系架构出发,理解了流量基线建立的方法论,掌握了统计分析和机器学习两类异常检测技术。在实战层面,Zeek 作为强大的网络安全监控框架,为我们提供了丰富的流量解析和自定义分析能力;Grafana 则将抽象的数据转化为直观的可视化态势图。将这些组件有机整合,就能构建一套从数据采集、分析检测到可视化展示的完整态势感知平台。建议读者从单节点部署开始实践,逐步扩展到分布式架构,不断完善检测规则和可视化面板,打造适合自身环境的安全监控体系。