博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
<Spark Streaming><Flume><Integration>
阅读量:5054 次
发布时间:2019-06-12

本文共 2160 字,大约阅读时间需要 7 分钟。

Overview

  • Flume:一个分布式的,可靠的,可用的服务,用于有效地收集、聚合、移动大规模日志数据
  • 我们搭建一个flume + Spark Streaming的平台来从Flume获取数据,并处理它。
  • 有两种方法实现:使用flume-style的push-based方法,或者使用自定义的sink来实现pull-based方法。

Approach 1: Flume-style Push-based Approach

  • flume被设计用来在Flume agents之间推信息,在这种方式下,Spark Streaming安装一个receiver that acts like an Avro agent for Flume, to which Flume can push the data.

General Requirement

  • 当你启动flume + spark streaming应用时,该机器上必须运行一个Spark workers。
  • flume可以向该机器的某一个port push数据。
  • 基于这种push机制,streaming应用必须有一个receiver scheduled and listening on the chosen port.

Configuring Flume

  • 配置flume以向Avro sink发送数据
  • agent.sinks = avroSinkagent.sinks.avroSink.type = avroagent.sinks.avroSink.channel = memoryChannelagent.sinks.avroSink.hostname = 
    agent.sinks.avroSink.port =

Configuring Spark Streaming Application

  1. Linking: 在maven项目中配置依赖
org.apache.spark
spark-streaming-flume-sink_2.10
2.1.0

  2. Programming:import FlumeUtils, 创建input DStream

import org.apache.spark.streaming.flume._ val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
  • 注意:应该与cluster中的resourceManager使用同一个hostname,这样的话资源分配可以匹配names,并在正确的机器上launch receiver
  • 一个简单的Spark Streaming统计Flume event个数的demo代码:
  • object FlumeEventCount {  def main(args: Array[String]) {    if (args.length < 2) {      System.err.println(        "Usage: FlumeEventCount 
    ") System.exit(1) } StreamingExamples.setStreamingLogLevels() val Array(host, IntParam(port)) = args val batchInterval = Milliseconds(2000) // Create the context and set the batch size val sparkConf = new SparkConf().setAppName("FlumeEventCount") val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() ssc.start() ssc.awaitTermination() }}

     

转载于:https://www.cnblogs.com/wttttt/p/6841864.html

你可能感兴趣的文章
YARN学习总结
查看>>
C#基础温习(2):温习控制台程序(二)
查看>>
一些文章
查看>>
注解@ResponseBody的作用
查看>>
java main函数不执行?
查看>>
iOS 更好用的打Log方式-显示文件名、行数
查看>>
从MS SQL删除大数据说开去
查看>>
NOVO SOP (SOP简介及历史)
查看>>
windows7+docker添加php扩展
查看>>
V2019 Super DSP3 Odometer Correction Vehicle List
查看>>
Python 3.X 练习集100题 05
查看>>
今时不同往日:VS2010十大绝技让VS6叹服
查看>>
设计器 和后台代码的转换 快捷键
查看>>
在线视频播放软件
查看>>
用代码生成器生成的DAL数据访问操作类 基本满足需求了
查看>>
28初识线程
查看>>
Monkey测试结果分析
查看>>
Sublime Text 3 设置
查看>>
X64操作系统组件Jmail无法正常服务问题
查看>>
div 居中
查看>>