Building a Distributed Log Collection System with ELK + Kafka

Contents

一、Drawbacks of Traditional Log Collection

二、How the ELK Collection Pipeline Works

三、Setting Up the ELK Stack

四、The Code

五、Verifying the Result


一、Drawbacks of Traditional Log Collection

Most of the time we rely on logs to figure out where a program went wrong, so that we can apply the right fix. But in a cluster with hundreds or even thousands of servers, how do we find the relevant entry when something fails? Searching the log files one by one is simply not workable.

二、How the ELK Collection Pipeline Works

Elasticsearch, Logstash, and Kibana together form a distributed log collection system, and Kibana's visualization layer makes the collected data easy to search and analyze.


An AOP aspect intercepts each request. Inside the aspect, an asynchronous thread publishes the log message to Kafka (single node or cluster). Logstash consumes the logs from Kafka, filters the messages, and forwards them to Elasticsearch, where Kibana provides the interface for searching and analyzing them.

三、Setting Up the ELK Stack

ZooKeeper setup: 搭建ZooKeeper3.7.0集群(传统方式&Docker方式)_熟透的蜗牛的博客-CSDN博客

Kafka setup: Kafka集群的安装(传统方式&Docker方式)&Springboot整合Kafka_熟透的蜗牛的博客-CSDN博客

Elasticsearch setup: Elasticsearch的安装(传统方式&docker方式)&整合Springboot_熟透的蜗牛的博客-CSDN博客

Kibana setup: Kibana的安装&整合ElasticSearch_熟透的蜗牛的博客-CSDN博客

Logstash setup: LogStash的安装(传统方式&Docker)与使用_熟透的蜗牛的博客-CSDN博客

This article demonstrates the setup with Docker Compose, and every component runs as a single node; if you need a cluster, refer to the setup guides above.

1、Install docker-compose

#download the docker-compose binary
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

#make it executable
sudo chmod +x /usr/local/bin/docker-compose
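
You can confirm the installation before moving on:

#print the installed version
docker-compose --version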

2、Create a working directory

mkdir -p /usr/local/docker-compose/elk

3、Create the docker-compose.yml file in that directory

version: '2'
services:
  zookeeper:
    image: zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"                 
  kafka:
    image: wurstmeister/kafka:latest 
    container_name: kafka
    volumes: 
        - /etc/localtime:/etc/localtime 
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.139.160   
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000                
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    restart: always
    container_name: elasticsearch
    environment:
     - discovery.type=single-node #single-node mode; not acceptable in production
     - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
    - 9200:9200
  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.2
    restart: always
    container_name: kibana
    ports:
    - 5601:5601
    environment:
      - ELASTICSEARCH_HOSTS=http://192.168.139.160:9200
    depends_on:
      - elasticsearch
  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.2
    volumes:
        -  /data/logstash/pipeline/:/usr/share/logstash/pipeline/
        -  /data/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml 
        -  /data/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml 
    restart: always
    container_name: logstash
    ports:
    - 9600:9600
    depends_on:
      - elasticsearch
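
The compose file mounts /data/logstash/pipeline/ and two config files from the host, but their contents are not shown in this post. Below is a minimal sketch: it assumes the kafka-log topic used by the Java code in section 四, the broker and Elasticsearch addresses come from the compose file above, and the index name is my own choice, so adjust all of these to your environment.

#/data/logstash/config/logstash.yml
http.host: "0.0.0.0"

#/data/logstash/config/pipelines.yml
- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline"

#/data/logstash/pipeline/logstash.conf
input {
  kafka {
    bootstrap_servers => "192.168.139.160:9092"   #Kafka address from the compose file
    topics => ["kafka-log"]                       #topic the application sends to
    codec => "json"                               #the aspect sends JSON strings
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.139.160:9200"]
    index => "kafka-log-%{+YYYY.MM.dd}"           #index name is an assumption
  }
}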

4、Start the stack

#run this from the directory containing docker-compose.yml
[root@localhost elk]# docker-compose  up
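
To run the stack in the background instead, start it detached and check that all five containers are up:

#start in the background
docker-compose up -d

#list the containers and their state
docker-compose ps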

四、The Code

The aspect class

package com.xiaojie.elk.aop;

import com.alibaba.fastjson.JSONObject;
import com.xiaojie.elk.pojo.RequestPojo;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

/**
 * @author xiaojie
 * @version 1.0
 * @description: Logging aspect that publishes request details to Kafka
 * @date 2021/12/5 16:51
 */
@Aspect
@Component
public class AopLogAspect {
    @Value("${server.port}")
    private String serverPort;
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Declare a pointcut with an execution expression
    @Pointcut("execution(* com.xiaojie.elk.service.*.*(..))")
    private void serviceAspect() {
    }

    @Autowired
    private LogContainer logContainer;

    // Capture request details before the target method runs
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        RequestPojo requestPojo = new RequestPojo();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// date format
        requestPojo.setRequestTime(df.format(new Date()));
        requestPojo.setUrl(request.getRequestURL().toString());
        requestPojo.setMethod(request.getMethod());
        requestPojo.setSignature(joinPoint.getSignature().toString());
        requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
        // client IP address plus server port
        requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
        // hand the log message off to the asynchronous Kafka sender
        String log = JSONObject.toJSONString(requestPojo);
        logContainer.put(log);
    }

    // Log the response content after the method returns (this advice is currently commented out)
/*    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// date format
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        // client IP address plus server port
        jsonObject.put("ip_address", getIpAddr(request) + ":" + serverPort);
        respJSONObject.put("response", jsonObject);
        logContainer.put(respJSONObject.toJSONString());
    }*/

    /**
     * Exception advice: records the request details together with the thrown exception
     *
     * @param joinPoint
     * @param e
     */
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void serviceAspect(JoinPoint joinPoint, Exception e) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// date format
        RequestPojo requestPojo = new RequestPojo();
        requestPojo.setRequestTime(df.format(new Date()));
        requestPojo.setUrl(request.getRequestURL().toString());
        requestPojo.setMethod(request.getMethod());
        requestPojo.setSignature(joinPoint.getSignature().toString());
        requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
        // client IP address plus server port
        requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
        requestPojo.setError(e.toString());
        // hand the log message off to the asynchronous Kafka sender
        String log = JSONObject.toJSONString(requestPojo);
        logContainer.put(log);
    }

    public static String getIpAddr(HttpServletRequest request) {
        //X-Forwarded-For (XFF) is the HTTP request header used to identify the originating client IP of a request that passed through an HTTP proxy or load balancer.
        String ipAddress = request.getHeader("x-forwarded-for");
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getRemoteAddr();
            if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
                //fall back to the IP configured on the local network interface
                InetAddress inet = null;
                try {
                    inet = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                ipAddress = inet.getHostAddress();
            }
        }
        //if the request passed through multiple proxies, the first IP is the real client IP; multiple IPs are separated by ','
        if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15
            if (ipAddress.indexOf(",") > 0) {
                ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
            }
        }
        return ipAddress;
    }
}
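
The aspect above fills a RequestPojo and serializes it to JSON, but the class itself is not shown in this post. A minimal sketch consistent with the setters used by the aspect (the field names are inferred from those calls, and Lombok's @Data is an assumption used for brevity) could look like this:

package com.xiaojie.elk.pojo;

import lombok.Data;

/**
 * Request log payload; fields mirror the setters called in AopLogAspect.
 */
@Data
public class RequestPojo {
    private String requestTime; // time the request was received
    private String url;         // full request URL
    private String method;      // HTTP method
    private String signature;   // signature of the intercepted method
    private String args;        // method arguments
    private String address;     // client IP plus server port
    private String error;       // exception text, set only by the @AfterThrowing advice
}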

The asynchronous sender thread

package com.xiaojie.elk.aop;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * @author xiaojie
 * @version 1.0
 * @description: Sends log messages to Kafka on an asynchronous background thread
 * @date 2021/12/5 16:50
 */
@Component
public class LogContainer {
    private static final BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @PostConstruct
    public void init() {
        // start the sender thread only after kafkaTemplate has been injected
        new LogThreadKafka().start();
    }

    /**
     * Queue a log message for asynchronous delivery
     *
     * @param log
     */
    public void put(String log) {
        logDeque.offer(log);
    }

    class LogThreadKafka extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    // take() blocks until a message is available, avoiding a busy loop
                    String log = logDeque.take();
                    if (!StringUtils.isEmpty(log)) {
                        // deliver the message to the kafka-log topic
                        kafkaTemplate.send("kafka-log", log);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
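
The KafkaTemplate injected above also needs producer configuration. A minimal application.yml sketch (the broker address matches the compose file; the port value is only an example for the ${server.port} read by the aspect) might be:

spring:
  kafka:
    bootstrap-servers: 192.168.139.160:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
server:
  port: 8080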

五、Verifying the Result
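
Start the Spring Boot application and call any method under com.xiaojie.elk.service so the aspect fires. You can then check that the logs reached Elasticsearch; the index name below follows the Logstash pipeline sketch in section 三 and is therefore an assumption.

#an index such as kafka-log-2021.12.05 should appear in the list
curl http://192.168.139.160:9200/_cat/indices?v

Then open Kibana at http://192.168.139.160:5601, create an index pattern matching kafka-log-*, and browse the collected request logs in Discover.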


Full code: spring-boot: Springboot整合redis、消息中间件等相关代码 (elk module)

