博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apache Flume 1.7.0 自定义输入输出
阅读量:6292 次
发布时间:2019-06-22

本文共 5302 字,大约阅读时间需要 17 分钟。

自定义http source

config

a1.sources.r1.type=http
a1.sources.r1.bind=localhost
a1.sources.r1.port=8081
a1.sources.r1.channels=c1
#自定义source Handler
a1.sources.r1.handler = org.apache.flume.sw.source.http.JSONHandler
a1.sources.r1.handler.configHome = /home/www/logs/datareport

handler

public class JSONHandler implements HTTPSourceHandler {      private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class);    public static final String PARA_SIGN = "sign";  public static final String PARA_PROJECT_ID = "projectId";  public static final String PARA_REPORT_MSG = "reportMsg";    private final Type mapType = new TypeToken
>() {}.getType(); private final Gson gson; //可以获取外部参数 private Context context = null; public JSONHandler() { gson = new GsonBuilder().disableHtmlEscaping().create(); } /** * {
@inheritDoc} */ @Override public List
getEvents(HttpServletRequest request) throws Exception { BufferedReader reader = request.getReader(); String charset = request.getCharacterEncoding(); //UTF-8 is default for JSON. If no charset is specified, UTF-8 is to //be assumed. if (charset == null) { LOG.debug("Charset is null, default charset of UTF-8 will be used."); charset = "UTF-8"; } else if (!(charset.equalsIgnoreCase("utf-8") || charset.equalsIgnoreCase("utf-16") || charset.equalsIgnoreCase("utf-32"))) { LOG.error("Unsupported character set in request {}. " + "JSON handler supports UTF-8, " + "UTF-16 and UTF-32 only.", charset); throw new UnsupportedCharsetException("JSON handler supports UTF-8, " + "UTF-16 and UTF-32 only."); } /* * Gson throws Exception if the data is not parseable to JSON. * Need not catch it since the source will catch it and return error. */ LinkedHashMap
map = new LinkedHashMap
(); try { map = gson.fromJson(reader, mapType); } catch (JsonSyntaxException ex) { throw new HTTPBadRequestException("Request has invalid JSON Syntax.", ex); } String configHome = this.context.getString("configHome"); LOG.info(configHome); String projectId = map.get(PARA_PROJECT_ID).toString(); String reportMsg = map.get(PARA_REPORT_MSG).toString(); Map
headers = new HashMap
(); headers.put(PARA_PROJECT_ID, projectId); headers.put(PARA_SIGN, ""); JSONEvent jsonEvent = new JSONEvent(); jsonEvent.setHeaders(headers); jsonEvent.setBody(reportMsg.getBytes()); return getSimpleEvents(jsonEvent); } @Override public void configure(Context context) { this.context = context; } private List
getSimpleEvents(Event e) { List
newEvents = new ArrayList
(1); newEvents.add(EventBuilder.withBody(e.getBody(), e.getHeaders())); return newEvents; }}

自定义Sink

config

#自定义Sink
a1.sinks.k1.type = org.apache.flume.sw.sink.RollingFileSink
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.rollInterval = 15
a1.sinks.k1.sink.directory = D:/var/log/flume
#自定义pathManager类型
a1.sinks.k1.sink.pathManager = CUSTOM
#文件创建频率 (null or yyyyMMddHHmmss), 默认值null->不创建
a1.sinks.k1.sink.pathManager.dirNameFormatter = yyyyMMdd
a1.sinks.k1.sink.pathManager.prefix = log_
a1.sinks.k1.sink.pathManager.extension = txt

自定义RollingFileSink

// When the configured path manager type is CUSTOM, instantiate the custom
// SimplePathManager directly; any other type is resolved through PathManagerFactory.
// The constant-first comparison keeps the check safe if pathManagerType is null.
if ("CUSTOM".equals(pathManagerType)) {
  pathController = new SimplePathManager(pathManagerContext);
} else {
  pathController = PathManagerFactory.getInstance(pathManagerType, pathManagerContext);
}

自定义pathManager类型

public class SimplePathManager extends DefaultPathManager {  private static final Logger logger = LoggerFactory      .getLogger(SimplePathManager.class);  private final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMddHHmmss");  private DateTimeFormatter dirNameFormatter = null;  private String lastRoll;  public SimplePathManager(Context context) {    super(context);        String dirNameFormatterStr = context.getString("dirNameFormatter");    if(dirNameFormatterStr == null || "null".equals(dirNameFormatterStr)){      dirNameFormatter = null;    } else {      dirNameFormatter = DateTimeFormat.forPattern(dirNameFormatterStr);    }      }  @Override  public File nextFile() {    LocalDateTime now = LocalDateTime.now();    StringBuilder sb = new StringBuilder();    String date = formatter.print(now);    if (!date.equals(lastRoll)) {      getFileIndex().set(0);      lastRoll = date;    }    sb.append(getPrefix()).append(date).append("-");    sb.append(getFileIndex().incrementAndGet());    if (getExtension().length() > 0) {      sb.append(".").append(getExtension());    }        File dir = dirNameFormatter != null ? new File(getBaseDirectory(), dirNameFormatter.print(now)) :       getBaseDirectory();        try {      FileUtils.forceMkdir(dir);      currentFile = new File(dir, sb.toString());    } catch (IOException e) {      currentFile = new File(getBaseDirectory(), sb.toString());      logger.error(e.toString(), e);    }          return currentFile;  }  public static class Builder implements PathManager.Builder {    @Override    public PathManager build(Context context) {      return new SimplePathManager(context);    }  }}

 

转载于:https://www.cnblogs.com/chenpi/p/7218912.html

你可能感兴趣的文章
正文提取算法
查看>>
轻松学PHP
查看>>
Linux中的网络监控命令
查看>>
this的用法
查看>>
windows下安装redis
查看>>
CentOS7 yum 安装git
查看>>
启动日志中频繁出现以下信息
查看>>
httpd – 对Apache的DFOREGROUND感到困惑
查看>>
分布式锁的一点理解
查看>>
idea的maven项目,install下载重复下载本地库中已有的jar包,而且下载后jar包都是lastupdated问题...
查看>>
2019测试指南-web应用程序安全测试(二)指纹Web服务器
查看>>
树莓派3链接wifi
查看>>
js面向对象编程
查看>>
Ruby中类 模块 单例方法 总结
查看>>
jQuery的validate插件
查看>>
5-4 8 管道符 作业控制 shell变量 环境变量配置
查看>>
Enumberable
查看>>
开发者论坛一周精粹(第五十四期) 求购备案服务号1枚!
查看>>
validate表单验证及自定义方法
查看>>
javascript 中出现missing ) after argument list的错误
查看>>