JAVA实现Flume+Kafka+SparkStreaming最简单的统计⽤户⾏
为⽇志案例
前⾔:
我就是⼀个⼩⽩,学spark过程中,遇到很多坑,写个博客记录下来,如果有初学的同学,可以按照我的思路实现⼀遍。
然后再按照⾃⼰需求,进⾏改造。
1、准备⼯作
奥斯卡李冰冰阿⾥云服务器或者Linux 虚拟机 (⾄少8G内存,作者使⽤的是 阿⾥云 CentOS 8.2 64位 2 核 8 GiB)
Windows 环境下 装了IDEA的电脑
Linux 安装了 java8 环境
阿⾥云开放了可能⽤到的端⼝安全组策略
2、配置服务
2.1 logback 整合 Flume
创建 Springboot WEB 项⽬
引⼊下列依赖, Springboot start Web 省略
<dependencies>
<dependency>
房祖名被判几年<groupId&ambytes.logback</groupId>
<artifactId>logback-flume-appender_2.11</artifactId>
<version>0.0.9</version>
</dependency>
<!-- mvnrepository/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
配置 l , 替换 阿⾥云公⽹IP ,4141 端⼝可替换, 要与 flume 监听的⼀致
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.ConsoleAppender">
<!-- encoders are by default assigned the type
ch.qos.der.PatternLayoutEncoder -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="flumeTest" class="ambytes.logback.flume.FlumeLogstashV1Appender"> <flumeAgents>
闫凤娇是谁阿⾥云公⽹IP:4141
</flumeAgents>
<flumeProperties>
connect-timeout=4000;
request-timeout=8000
</flumeProperties>
<batchSize>100</batchSize>
柳岩的乳<reportingWindow>1000</reportingWindow>
<additionalAvroHeaders>
myHeader = myValue
</additionalAvroHeaders>
<application>Gloria's Application</application>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{yyyy-MM-dd} %d{HH:mm:ss} %msg%n</pattern>
</layout>
</appender>
<root level="WARN">
</root>
<logger name="FlumeLogger" level="INFO" addtivity="false">
<appender-ref ref="flumeTest"/>
</logger>
</configuration>
创建业务 controller 调⽤ logger
package com.gloria.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.*;
@Controller
public class LoggerController {
public static Logger logger = Logger(LoggerController.class);
@ResponseBody
@RequestMapping(value = "/api/Logger", method = RequestMethod.GET)
public String apiLogger(LoggerEntity entity) {
//打印⾏为⽇志内容并被flume采集到kafka上
logger.String());
return "succ";
}
}
}
创建⾏为⽇志实体类
package com.gloria.Logger;
import org.apachemons.lang3.StringUtils; import org.straints.NotBlank;
public class LoggerEntity {
/*
省略 get/set ,tostring ,构造⽅法
*/
int accessType = 0; // 访问类型 0是点击 1是搜索 String userId = ""; // ⽤户主键
String bussinessId = ""; // 点击商品的业务外键
String bussinessType = ""; // 业务类型
String networkType = ""; // ⽹络类型
String latitude = "";// 纬度
String longitude = "";// 经度
String city = ""; // 登录城市
李荣浩女朋友陆瑶String gender = ""; // 性别
String visitTime = ""; // 浏览时间
String searchContent = ""; // 搜索内容
}
2.2 Flume 采集框架
2.2.1 下载并解压Flume
(1)可以直接点击链接进⾏下载,然后传给linux 服务器 ,
(2)也可以使⽤命令,个⼈习惯放在 /opt ⽬录下, (PS:推荐使⽤这种⽅式,很快)
# 安装在 /opt ⽬录下并解压
cd /opt
wget mirror.bit.edu/apache/flume/1.9.0/apache-flume-1.9.
tar -xzvf apache-flume-1.9.
2.2.2 配置 Flume
1. 修改配置⽂件, 指定 source 为 avro , chanel 为 memory , sink 为 kafka ,命名的配置⽂件
cd /opt/apache-flume-1.9.0-bin/conf
f
2. 配置⽂件内容 source: 监听本地4141端⼝,与logback中配置的需要对应⼀致 chanel : 不设置持久化,在内存中传递 sink:
Kafka 采集到消息中间件上
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
avro-memory-kafka.pe = avro
avro-memory-kafka.sources.avro-source.bind= 0.0.0.0
avro-memory-kafka.sources.avro-source.port= 4141
avro-memory-kafka.pe = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 0.0.0.0:9092
avro-memory-kafka.sinks.pic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.quiredAcks = 1
-pe = memory
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
flume 根据不同的 sources 选项,此处配置了 ack 机制和 批次处理⼤⼩ 以及 主题名称
第⼀种选择是把acks参数设置为0,意思就是我的KafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪
怕Partition Leader上落到磁盘,我就不管他了,直接就认为这个消息发送成功了。
第⼆种选择是设置 acks = 1,意思就是说只要Partition Leader接收到消息⽽且写⼊本地磁盘了,就认为成功了,不管他其他
的Follower有没有同步过去这条消息了。(默认情况)
最后⼀种情况,就是设置acks=all,这个意思就是说,Partition Leader接收到消息之后,还必须要求ISR列表⾥跟Leader保持
同步的那些Follower都要把消息同步过去,才能认为这条消息是写⼊成功了。
3. 启动 flume 服务
#进⼊ flume 根⽬录
cd /opt/apache-flume-1.9.0-bin
#后台启动,启动⽇志在当前⽬录 nohup.out 中
nohup /opt/apache-flume-1.9.0-bin/bin/flume-ng agent --name avro-memory-kafka -c conf -f /opt/apa
che-flume-1.9.0-bin/f -Dflum 2.3 Kafka 消息中间件
为什么要引⼊消息中间件?
我们很多⼈在在使⽤Flume和kafka时,都会问⼀句为什么要将Flume和Kafka集成?那⾸先就应该明⽩业务需求,⼀般使⽤
Flume+Kafka架构都是希望完成实时流式的⽇志处理,后⾯再连接上Flink/Storm/Spark Streaming等流式实时处理技术,从⽽完
王心凌范植伟成⽇志实时解析的⽬标。第⼀、如果Flume直接对接实时计算框架,当数据采集速度⼤于数据处理速度,很容易发⽣数据堆积或者数
据丢失,⽽kafka可以当做⼀个消息缓存队列,从⼴义上理解,把它当做⼀个数据库,可以存放⼀段时间的数据。第⼆、Kafka属于中
间件,⼀个明显的优势就是使各层解耦,使得出错时不会⼲扰其他组件。
因此数据从数据源到flume再到Kafka时,数据⼀⽅⾯可以同步到HDFS做离线计算,另⼀⽅⾯可以做实时计算,可实现数据多分发。
2.3.1 消息中间件技术选型
发布评论