基于Spring Boot和Flume构建日志收集和分析系统

随着企业系统规模的不断扩大,系统的日志越来越庞大,如果没有一个可靠的日志收集和分析系统,就很难有效地监控和维护系统。本文将介绍如何基于spring bootflume构建一个高效的日志收集和分析系统。

前置条件

在开始之前,需要安装和设置以下软件:

JDK 8 或以上版本Maven 3.3 或以上版本Apache Flume 1.9.0 或以上版本Elasticsearch 7.6.2 或以上版本Kibana 7.6.2 或以上版本Spring Boot应用配置

首先,我们需要创建一个Spring Boot应用,并添加所需的依赖:

org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-log4j2

登录后复制

在application.properties文件中,添加以下配置:

# 应用端口号server.port=8080# log4j2配置logging.config=classpath:log4j2.xml# flume配置flume.agentName=myflumeflume.sourceType=avroflume.clientType=load-balancingflume.hosts=localhost:41414# elasticsearch配置spring.elasticsearch.rest.uris=http://localhost:9200

登录后复制

以上配置中,我们指定了应用程序的端口号、log4j2配置文件、Flume的相关配置和Elasticsearch的访问URI。

日志收集器

为了将应用程序日志发送到Flume,我们需要创建一个自定义的log4j2 Appender。

@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)public class FlumeAppender extends AbstractAppender {    private static final ObjectMapper MAPPER = new ObjectMapper();    private final FlumeClient client;    private final String sourceType;    protected FlumeAppender(String name, Filter filter, Layout extends Serializable> layout,                            FlumeClient client, String sourceType) {        super(name, filter, layout, true);        this.client = client;        this.sourceType = sourceType;    }    @PluginFactory    public static FlumeAppender createAppender(@PluginAttr("name") String name,                                               @PluginElement("Filters") Filter filter,                                               @PluginElement("Layout") Layout extends Serializable> layout,                                               @PluginAttr("sourceType") String sourceType,                                               @PluginAttr("hosts") String hosts) {        if (name == null) {            LOGGER.error("FlumeAppender missing name");            return null;        }        if (client == null) {            LOGGER.error("FlumeAppender missing client");            return null;        }        return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType);    }    private static FlumeClient createClient(String hosts) {        LoadBalancingRpcClient rpcClient = new LoadBalancingRpcClient();        String[] hostArray = hosts.split(",");        for (String host : hostArray) {            String[] hostParts = host.split(":");            rpcClient.addHost(new InetSocketAddress(hostParts[0], Integer.parseInt(hostParts[1])));        }        Properties props = new Properties();        props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance");        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, hosts);        props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000");        AvroEventSerializer serializer = new AvroEventSerializer();        serializer.configure(props, false);        return new FlumeClient(rpcClient, serializer);    }    @Override    public void append(LogEvent event) {        try {            byte[] body = ((StringLayout) this.getLayout()).toByteArray(event);            Map headers = new HashMap();            headers.put("timestamp", Long.toString(event.getTimeMillis()));            headers.put("source", "log4j");            headers.put("sourceType", sourceType);            Event flumeEvent = EventBuilder.withBody(body, headers);            client.sendEvent(flumeEvent);        } catch (Exception e) {            LOGGER.error("Failed to send event to Flume", e);        }    }}

登录后复制

以上代码中,我们实现了一个log4j2 Appender,它会将日志事件打包成一个Flume Event,并发送到Flume服务器。

创建一个log4j2配置文件,配置FlumeAppender。


登录后复制

在这个log4j2配置文件中,我们定义了一个FlumeAppender,并在Root Logger中引用它。

Flume配置

我们需要配置Flume,在Flume Agent中接收从应用程序发送的日志消息,并将它们发送到Elasticsearch。

创建一个Flume配置文件,如下所示。

# Define the agent name and the agent sources and sinksmyflume.sources = mysourcemyflume.sinks = mysinkmyflume.channels = channel1# Define the sourcemyflume.sources.mysource.type = avromyflume.sources.mysource.bind = 0.0.0.0myflume.sources.mysource.port = 41414# Define the channelmyflume.channels.channel1.type = memorymyflume.channels.channel1.capacity = 10000myflume.channels.channel1.transactionCapacity = 1000# Define the sinkmyflume.sinks.mysink.type = org.elasticsearch.hadoop.flume.ElasticsearchSinkmyflume.sinks.mysink.hostNames = localhost:9200myflume.sinks.mysink.indexName = ${type}-%{+YYYY.MM.dd}myflume.sinks.mysink.batchSize = 1000myflume.sinks.mysink.typeName = ${type}# Link the source and sink with the channelmyflume.sources.mysource.channels = channel1myflume.sinks.mysink.channel = channel1

登录后复制

在Flume配置文件中,我们定义了一个agent,一个source和一个sink。source是一个avro类型,绑定到41414端口上,channel1是一个memory类型,capacity为10000,transactionCapacity为1000。sink是一个ElasticsearchSink类型,在本地主机的9200端口上创建一个名为type的索引,在1000个事件达到时批量提交到Elasticsearch。

Elasticsearch和Kibana配置

最后,我们需要配置Elasticsearch和Kibana。在Elasticsearch中,我们需要创建一个与Flume配置文件中定义的索引名称匹配的索引。

在Kibana中,我们需要创建一个索引模式。在Kibana的主菜单中,选择”Management”,然后选择”Kibana”。在Kibana索引模式中,选择”Create Index Pattern”。输入Flume配置文件中定义的索引名称,并按照提示进行配置。

我们还需要为Kibana创建一个Dashboard,以便查看应用程序的日志消息。在Kibana的主菜单中,选择”Dashboard”,然后选择”Create Dashboard”。在”Visualizations”选项卡中,选择”Add a visualization”。选择”Data Table”,然后配置所需的字段和可视化选项。

结论

在本文中,我们介绍了如何使用Spring Boot和Flume构建一个高效的日志收集和分析系统。我们实现了一个自定义的log4j2 Appender,将应用程序的日志事件发送到Flume服务器,并使用Elasticsearch和Kibana进行日志分析和可视化。希望本文能够对你构建自己的日志收集和分析系统有所帮助。

以上就是基于Spring Boot和Flume构建日志收集和分析系统的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2625084.html

(0)
上一篇 2025年3月7日 00:32:58
下一篇 2025年3月7日 00:33:07

AD推荐 黄金广告位招租... 更多推荐

相关推荐

发表回复

登录后才能评论