Flume自定义组件

flume source种类

flume官方支持很多源,比如:netcat source,avro source,Kafka source,exec source,taildir source或者自定义source等等。

source文档arrow-up-right

flume官方支持多种目标源,比如:hdfs sink,hive sink,logger sink,kafka sink等等,同样也支持自定义sink。

sink文档arrow-up-right

自定义拦截器(Interceptor)

  使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

  1. 创建一个 Maven 项目,并引入以下依赖

  1. 定义 CustomInterceptor 类并实现 Interceptor 接口。之后将 Maven 项目打成 jar 包上传到 /flume/lib 目录下。

  1. 创建 flume1.conf

    配置一个 Netcat Source,一个 Sink Group(2 个 Avro Sink),并配置相应 ChannelSelector 和 Interceptor。

  1. 创建 flume2.conf

配置一个 Avro Source 和 logger Sink。

  1. 创建 flume3.conf

配置一个 Avro Source 和 logger Sink。

  1. 开启监控

自定义 Source

  使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可以从 flume 配置文件中配置。

  1. 定义 MySource 类,继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。之后将 Maven 项目打成 jar 包上传到 /flume/lib 目录下。

  1. 创建配置文件 mySource.conf。

  1. 开启监控

  1. 查看结果

自定义 Sink

  使用 Flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 Flume 配置文件中配置。

  1. 定义 MySink 类,继承 AbstractSink 类并实现 Configurable 接口。之后将 Maven 项目打成 jar 包上传到 /flume/lib 目录下。

  1. 创建配置文件 mySink.conf

  1. 开启任务

Last updated