Skip to content

日志分析与 SIEM 平台搭建 — ELK Stack 实战

Published:
12 min read

安全事件的发现与溯源离不开日志。日志记录了系统中发生的每一个关键事件,是安全分析师的”黑匣子”。然而,面对海量日志数据,人工逐条审查既不现实也不高效。SIEM(安全信息和事件管理)系统应运而生,它将日志收集、存储、分析和告警集于一体。本文将深入讲解日志分析基础、SIEM 核心概念,并通过 ELK Stack(Elasticsearch + Logstash + Kibana)的完整部署实战,带你从零搭建一套功能完备的日志分析平台。

日志的重要性与类型

日志是系统运行过程中自动产生的时间序列记录,在安全领域承担着多重角色:事件检测、入侵调查、合规审计和取证分析。没有完善的日志体系,安全团队就如同蒙着眼睛战斗。

常见日志类型

系统日志(System Logs):记录操作系统层面的事件,如服务启停、用户登录、系统错误。Linux 下典型的有 /var/log/syslog/var/log/messages;Windows 下则是事件查看器中的系统日志。

安全日志(Security Logs):专门记录与安全相关的事件,如认证成功/失败、权限变更、策略修改。Linux 下为 /var/log/auth.log/var/log/secure;Windows 安全日志记录登录事件(Event ID 4624/4625)、账户管理等关键操作。

应用日志(Application Logs):各类应用程序产生的日志,如 Web 服务器访问日志(Nginx/Apache access.log)、数据库查询日志、中间件运行日志。格式各异,是 Web 安全分析的重要数据源。

网络日志(Network Logs):防火墙日志、IDS/IPS 告警日志、DNS 查询日志、NetFlow 流量记录。这些日志对于检测网络层面的异常行为至关重要。

审计日志(Audit Logs):记录用户对关键资源的操作行为,满足合规要求(如 PCI DSS、等保2.0)。Linux 的 auditd 和 Windows 的高级审核策略都提供细粒度的审计能力。

SIEM 概念与核心功能

SIEM(Security Information and Event Management)是将安全信息管理(SIM)和安全事件管理(SEM)融合在一起的综合平台。其核心功能包括:

ELK Stack 组件详解

ELK Stack 是开源日志分析领域最流行的技术栈,由三个核心组件构成:

Elasticsearch

Elasticsearch 是基于 Apache Lucene 的分布式搜索和分析引擎。它是 ELK 的核心存储和检索层,具备以下特点:

Logstash

Logstash 是服务端数据处理管道,负责日志的采集、转换和输出。它的处理流程分为三个阶段:

Kibana

Kibana 是 ELK 的可视化前端,提供:

Docker Compose 部署 ELK

以下是一套生产就绪的 Docker Compose 配置,用于快速部署完整的 ELK Stack:

# docker-compose.yml
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
    container_name: elasticsearch
    environment:
      - node.name=es-node-1
      - cluster.name=security-elk
      - discovery.type=single-node
      - xpack.security.enabled=true
      - ELASTIC_PASSWORD=YourSecurePassword123!
      - xpack.security.http.ssl.enabled=false
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - elk-network
    healthcheck:
      test: ["CMD-SHELL", "curl -s -u elastic:YourSecurePassword123! http://localhost:9200/_cluster/health | grep -q '\"status\":\"green\"\\|\"status\":\"yellow\"'"]
      interval: 30s
      timeout: 10s
      retries: 5
    deploy:
      resources:
        limits:
          memory: 4G

  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    container_name: logstash
    depends_on:
      elasticsearch:
        condition: service_healthy
    volumes:
      - ./logstash/pipeline/:/usr/share/logstash/pipeline/:ro
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"     # Beats input
      - "5514:5514/udp" # Syslog input
      - "9600:9600"     # Monitoring API
    environment:
      - "LS_JAVA_OPTS=-Xms1g -Xmx1g"
    networks:
      - elk-network
    deploy:
      resources:
        limits:
          memory: 2G

  kibana:
    image: docker.elastic.co/kibana/kibana:8.12.0
    container_name: kibana
    depends_on:
      elasticsearch:
        condition: service_healthy
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=YourSecurePassword123!
      - xpack.security.enabled=true
    ports:
      - "5601:5601"
    networks:
      - elk-network
    deploy:
      resources:
        limits:
          memory: 1G

volumes:
  es_data:
    driver: local

networks:
  elk-network:
    driver: bridge

启动部署:

# 创建必要的目录结构
mkdir -p logstash/pipeline logstash/config

# 启动 ELK Stack
docker compose up -d

# 检查服务状态
docker compose ps

# 验证 Elasticsearch 运行状态
curl -u elastic:YourSecurePassword123! http://localhost:9200/_cluster/health?pretty

# 设置 kibana_system 用户密码
curl -u elastic:YourSecurePassword123! -X POST "http://localhost:9200/_security/user/kibana_system/_password" \
  -H "Content-Type: application/json" \
  -d '{"password":"YourSecurePassword123!"}'

Logstash 配置实战

解析 Syslog 日志

# logstash/pipeline/01-syslog.conf
input {
  udp {
    port => 5514
    type => "syslog"
  }
  tcp {
    port => 5514
    type => "syslog"
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:source_host} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:log_message}"
      }
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      target => "@timestamp"
    }
    # 检测SSH暴力破解
    if [program] == "sshd" and [log_message] =~ /Failed password/ {
      grok {
        match => {
          "log_message" => "Failed password for (?:invalid user )?%{USERNAME:auth_user} from %{IP:attacker_ip} port %{NUMBER:src_port}"
        }
      }
      mutate {
        add_tag => ["ssh_brute_force"]
        add_field => { "alert_type" => "SSH_BRUTEFORCE" }
      }
    }
  }
}

output {
  if [type] == "syslog" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      user => "elastic"
      password => "YourSecurePassword123!"
      index => "syslog-%{+YYYY.MM.dd}"
    }
  }
}

解析 Nginx 访问日志

# logstash/pipeline/02-nginx.conf
input {
  beats {
    port => 5044
  }
}

filter {
  if [fields][log_type] == "nginx_access" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:user_name} \[%{HTTPDATE:access_time}\] "%{WORD:http_method} %{URIPATHPARAM:request_uri} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:body_bytes} "%{DATA:referrer}" "%{DATA:user_agent}"'
      }
    }
    date {
      match => [ "access_time", "dd/MMM/yyyy:HH:mm:ss Z" ]
      target => "@timestamp"
    }
    geoip {
      source => "client_ip"
      target => "geoip"
    }
    useragent {
      source => "user_agent"
      target => "ua"
    }
    # 检测可疑请求
    if [request_uri] =~ /(\.\.\/|union\s+select|<script|\/etc\/passwd|cmd\.exe)/i {
      mutate {
        add_tag => ["web_attack_attempt"]
        add_field => { "alert_type" => "WEB_ATTACK" }
      }
    }
    # 标记异常状态码
    if [response_code] =~ /^(403|404|500|502|503)$/ {
      mutate {
        add_tag => ["http_error"]
      }
    }
  }
}

output {
  if [fields][log_type] == "nginx_access" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      user => "elastic"
      password => "YourSecurePassword123!"
      index => "nginx-access-%{+YYYY.MM.dd}"
    }
  }
}

解析 Windows 安全日志

# logstash/pipeline/03-windows-security.conf
input {
  beats {
    port => 5044
  }
}

filter {
  if [fields][log_type] == "windows_security" {
    # 标记关键安全事件
    if [winlog][event_id] == 4625 {
      mutate {
        add_tag => ["login_failure"]
        add_field => { "alert_type" => "WINDOWS_LOGIN_FAILURE" }
      }
    }
    if [winlog][event_id] == 4624 and [winlog][event_data][LogonType] == "10" {
      mutate {
        add_tag => ["rdp_login"]
        add_field => { "alert_type" => "RDP_LOGIN" }
      }
    }
    if [winlog][event_id] == 4720 {
      mutate {
        add_tag => ["user_created"]
        add_field => { "alert_type" => "NEW_USER_CREATED" }
      }
    }
    if [winlog][event_id] == 4732 {
      mutate {
        add_tag => ["group_membership_changed"]
        add_field => { "alert_type" => "PRIVILEGE_CHANGE" }
      }
    }
  }
}

output {
  if [fields][log_type] == "windows_security" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      user => "elastic"
      password => "YourSecurePassword123!"
      index => "windows-security-%{+YYYY.MM.dd}"
    }
  }
}

Kibana 可视化与告警配置

部署完成后,访问 http://localhost:5601 进入 Kibana 界面,进行以下配置:

# 创建索引模式(通过 Kibana API)
curl -u elastic:YourSecurePassword123! -X POST "http://localhost:5601/api/data_views/data_view" \
  -H "kbn-xsrf: true" \
  -H "Content-Type: application/json" \
  -d '{
    "data_view": {
      "title": "syslog-*",
      "timeFieldName": "@timestamp"
    }
  }'

# 创建告警规则 — SSH暴力破解检测(5分钟内同一IP失败超过10次)
curl -u elastic:YourSecurePassword123! -X POST "http://localhost:5601/api/alerting/rule" \
  -H "kbn-xsrf: true" \
  -H "Content-Type: application/json" \
  -d '{
    "rule_type_id": ".es-query",
    "name": "SSH Brute Force Detection",
    "consumer": "alerts",
    "schedule": { "interval": "1m" },
    "params": {
      "index": ["syslog-*"],
      "timeField": "@timestamp",
      "esQuery": "{\"query\":{\"bool\":{\"must\":[{\"match\":{\"alert_type\":\"SSH_BRUTEFORCE\"}}],\"filter\":[{\"range\":{\"@timestamp\":{\"gte\":\"now-5m\"}}}]}},\"aggs\":{\"by_ip\":{\"terms\":{\"field\":\"attacker_ip.keyword\",\"min_doc_count\":10}}}}",
      "threshold": [10],
      "thresholdComparator": ">="
    },
    "actions": []
  }'

在 Kibana Dashboard 中,建议创建以下可视化面板:

安全建议

  1. 日志完整性保护:使用远程日志服务器或不可变存储确保日志不被攻击者篡改或删除。
  2. 日志保留策略:根据合规要求制定日志保留期限(通常不少于 180 天),使用 ILM(Index Lifecycle Management)自动管理索引生命周期。
  3. 敏感信息脱敏:在日志采集过程中对密码、身份证号等敏感信息进行脱敏处理。
  4. ELK 平台自身安全:启用 X-Pack 安全特性,配置 TLS/SSL 加密通信,实施最小权限的 RBAC 策略。
  5. 日志告警分级:合理设置告警阈值和优先级,避免告警疲劳(Alert Fatigue)。
  6. 定期检查日志覆盖范围:确保关键系统和应用都接入了集中日志平台,消除监控盲区。

总结

日志分析是安全运营的基石,而 SIEM 平台则是将海量日志转化为可操作安全情报的核心工具。通过本文的 ELK Stack 部署实战,我们从日志类型认知出发,深入了解了 SIEM 的核心功能,并完成了 Elasticsearch + Logstash + Kibana 的容器化部署。通过编写针对 Syslog、Nginx 和 Windows 安全日志的 Logstash 解析配置,实现了多源日志的标准化处理和安全事件标记。配合 Kibana 的可视化和告警能力,一套基础但功能完善的安全日志分析平台就此建成。建议读者在此基础上进一步探索 Elastic SIEM 模块和 Elastic Agent,打造更强大的安全监控体系。