Flume自定义Source

模拟编写了一个Flume 1.7中TAILDIR的功能实现,通过手动控制文件的读取位置来达到对文件的读写,防止flume挂了之后重复消费的情况。
以下是代码实现,仅做参考,生产上直接用TAILDIR读取文件内容即可,若要读取一个目录下的子目录,可使用github上以实现的这个项目包:https://github.com/qwurey/flume-source-taildir-recursive

创新互联长期为1000+客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为满城企业提供专业的成都做网站、网站建设,满城网站改版等技术服务。拥有10多年丰富建站经验和众多成功案例,为您定制开发。

package com.fwmagic.flume.source;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Description:自定义Source 1、读取指定目录下的文件,如nginx的access.log
 * 2、读取文件前先判断offset文件是否存在,不存在则创建它
 * 3、每次读取完都写一个offset文件记录读取到文件的什么位置,防止重启flume时发生重复消费的情况
 * 4、如何自定义?参考ExecSource
 * 

* (1):获取自定义配置文件属性 * (2):创建线程池,用channelProcessor发送数据给channel * (3):线程池提交(启动任务) * 任务内容: * (1):读取偏移量文件,没有则创建,有则获取偏移量,将读取的指针重置到指定偏移量 * (2):读取指定的日志文件,将读取的一行内容打包成Event,用Channel发送Event * (3):获取读取内容后的偏移量,重置偏移量 * (4):stop方法调用,关闭线程池,调用super.stop方法。 * @Date:Create in 2018/8/19 */ public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable { /*监听的文件*/ private String filePath; /*记录读取偏移量的文件*/ private String posiFile; /*若读取文件暂无内容,则等待数秒*/ private Long interval; /*读写文件的字符集*/ private String charset; /*读取文件内容的线程*/ private FileRunner fileRunner; /*线程池*/ private ExecutorService executor; private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class); /** * 初始化配置文件内容 * * @param context */ @Override public void configure(Context context) { filePath = context.getString("filePath"); posiFile = context.getString("posiFile"); interval = context.getLong("interval", 2000L); charset = context.getString("charset", "UTF-8"); } @Override public synchronized void start() { //启动一个线程,用于监听对应的日志文件 //创建一个线程池 executor = Executors.newSingleThreadExecutor(); //用channelProcessor发送数据给channel ChannelProcessor channelProcessor = super.getChannelProcessor(); fileRunner = new FileRunner(filePath, posiFile, interval, charset, channelProcessor); executor.submit(fileRunner); super.start(); } @Override public synchronized void stop() { fileRunner.setFlag(Boolean.FALSE); while (!executor.isTerminated()) { logger.debug("waiting for exec executor service to stop"); try { executor.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); logger.debug("Interrupted while waiting for executor service to stop,Just exiting."); Thread.currentThread().interrupt(); } } super.stop(); } public static class FileRunner implements Runnable { private Long interval; private String charset; private Long offset = 0L; private File pFile; private RandomAccessFile raf; private ChannelProcessor channelProcessor; private Boolean flag = Boolean.TRUE; public void setFlag(Boolean flag) { this.flag = flag; } public FileRunner(String filePath, String posiFile, Long interval, String charset, ChannelProcessor channelProcessor) { this.interval = interval; this.charset = charset; this.channelProcessor = channelProcessor; //1、判断是否有偏移量文件,有则读取偏移量,没有则创建 pFile = new File(posiFile); if (!pFile.exists()) { try { pFile.createNewFile(); } catch (IOException e) { e.printStackTrace(); logger.error("create position file error!", e); } } //2、判断偏移量中的文件内容是否大于0 try { String offsetStr = FileUtils.readFileToString(pFile, this.charset); // 3、如果偏移量文件中有记录,则将内容转换为Long if (StringUtils.isNotBlank(offsetStr)) { offset = Long.parseLong(offsetStr); } // 4、如果有偏移量,则直接跳到文件的偏移量位置 raf = new RandomAccessFile(filePath, "r"); // 跳到指定的位置 raf.seek(offset); } catch (IOException e) { e.printStackTrace(); logger.error("read position file error!", e); } } @Override public void run() { //监听文件 while (flag) { // 读取文件中的内容 String line = null; try { line = raf.readLine(); if (StringUtils.isNotBlank(line)) { // 把数据打包成Event,发送到Channel line = new String(line.getBytes("ISO-8859-1"), "UTF-8"); Event event = EventBuilder.withBody(line.getBytes()); channelProcessor.processEvent(event); //更新偏移量文件,把偏移量写入文件 offset = raf.getFilePointer(); FileUtils.writeStringToFile(pFile, offset.toString()); } else { try { Thread.sleep(interval); } catch (InterruptedException e) { e.printStackTrace(); logger.error("thread sleep error", e); } } } catch (IOException e) { e.printStackTrace(); } } } } }


名称栏目:Flume自定义Source
转载源于:http://scyanting.com/article/gicijp.html