Introduction
Store the dimension-table data in an external system such as Redis, HBase, or MySQL; whenever Flink receives a record, it queries the external store in real time.
As explained in the article "Asynchronous processing with AsyncDataStream, AsyncFunction and RichAsyncFunction", Flink should use asynchronous I/O to read from and write to external systems.
Advantages
The amount of dimension data is not limited by memory, so very large data sets can be stored.
Disadvantages
Every record triggers a network query against the external system. For frequently accessed data, the query results should therefore be cached locally, which gives much better performance (see the sketch below).
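A minimal sketch of such a cache inside the RichAsyncFunction. It reuses the utils, threadPoolExecutor and CellData names that the full example further down introduces, and additionally needs java.util.Map and java.util.concurrent.ConcurrentHashMap imports; the unbounded ConcurrentHashMap is only for illustration, a size- or time-bounded cache would be the safer choice in production:

// Extra field on the async function: a local query-result cache.
private transient Map<String, List<CellData>> cache;

@Override
public void open(Configuration parameters) throws Exception {
    // ... plus the HbaseUtils / thread-pool initialization from the full example
    cache = new ConcurrentHashMap<>();
}

@Override
public void asyncInvoke(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
    List<CellData> hit = cache.get(id);
    if (hit != null) {
        // Cache hit: answer immediately without querying HBase
        resultFuture.complete(Collections.singleton(hit));
        return;
    }
    // Cache miss: query HBase on the thread pool, then remember the result
    threadPoolExecutor.submit(() -> {
        try {
            List<CellData> res = utils.get("book", id);
            cache.put(id, res);
            resultFuture.complete(Collections.singleton(res));
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}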
Example
Note: this requires the external system's client to support asynchronous I/O; otherwise you have to implement the asynchronous behavior yourself, for example by running the blocking client calls in your own thread pool (which is what the example below does).
Using asynchronous I/O raises three issues:
- Timeout: if a query times out, treat it as a failed read/write and handle it as a failure.
- Concurrency (capacity): if too many requests are in flight at once, Flink's backpressure mechanism throttles the upstream input.
- Out-of-order results: responses may come back in a different order than the input; how to handle this depends on the use case, and Flink supports both modes: allow out-of-order results or preserve the input order (see the sketch below).
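A minimal sketch of the two modes (a fragment, assuming a DataStream<String> named input and an AsyncFunction<String, String> named fn already exist; the capacity value 100 is only illustrative):

// Preserve input order: results are emitted exactly in the order of the input
// records. The optional capacity argument (here 100) bounds the number of
// in-flight requests; a full queue triggers backpressure on the upstream.
DataStream<String> ordered = AsyncDataStream.orderedWait(
        input, fn, 5, TimeUnit.SECONDS, 100);

// Allow out-of-order results: each result is emitted as soon as its request
// completes, which lowers latency but gives no ordering guarantee.
DataStream<String> unordered = AsyncDataStream.unorderedWait(
        input, fn, 5, TimeUnit.SECONDS, 100);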
pom.xml dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
</properties>

<dependencies>
    <!-- Flink Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- Kafka dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- HBase Java client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.9</version>
    </dependency>
</dependencies>
Java code
package flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class 异步IO_hbase {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer configuration
        Properties pro = new Properties();
        pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Built-in connector: create the Flink Kafka consumer with topic, deserialization schema and Kafka properties
        FlinkKafkaConsumer<String> fkc = new FlinkKafkaConsumer<>("kafka2flink", new SimpleStringSchema(), pro);
        DataStreamSource<String> dss = env.addSource(fkc);

        // Ordered async I/O: results are emitted in input order, lookups time out after 5 seconds
        SingleOutputStreamOperator<List<CellData>> soso = AsyncDataStream.orderedWait(
                dss,
                new MyRichAsyncFunction(),
                5,
                TimeUnit.SECONDS);
        soso.print("soso--");

        env.execute();
    }

    static class MyRichAsyncFunction extends RichAsyncFunction<String, List<CellData>> {

        // ZooKeeper quorum addresses
        static final String ADDR = "hadoop1,hadoop2,hadoop3";
        // ZooKeeper client port
        static final String PORT = "2181";
        // HBase helper class
        HbaseUtils utils;
        // Thread pool used to run the blocking HBase lookups
        private ThreadPoolExecutor threadPoolExecutor;

        @Override
        public void open(Configuration parameters) throws Exception {
            utils = new HbaseUtils();
            utils.connect(ADDR, PORT);
            // Initialize the thread pool
            threadPoolExecutor = new ThreadPoolExecutor(
                    10,
                    10,
                    0,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1000));
        }

        /**
         * Called once for every incoming record.
         * @param id the input record (used as the HBase rowkey)
         * @param resultFuture used to emit the lookup result
         * @throws Exception
         */
        @Override
        public void asyncInvoke(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        List<CellData> res = utils.get("book", id);
                        // Emit the result through the ResultFuture
                        resultFuture.complete(Collections.singleton(res));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        /**
         * Called when a lookup does not complete within the timeout.
         * @param id element coming from an upstream task
         * @param resultFuture to be completed with the result data
         * @throws Exception
         */
        @Override
        public void timeout(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
            // Treat a timeout as a failed lookup and emit an empty result
            resultFuture.complete(Collections.singleton(new ArrayList<>()));
        }

        @Override
        public void close() throws Exception {
            // Close the HBase connection
            if (utils != null) {
                utils.close();
            }
            // Shut down the thread pool
            if (threadPoolExecutor != null) {
                threadPoolExecutor.shutdown();
            }
        }
    }
}
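The example above refers to two helper classes, HbaseUtils and CellData, that are not included in the original post. The following is only a minimal sketch of what they might look like (each in its own file under the flink package), reconstructed from how they are used (connect/get/close and the fields visible in the printed output); treat every other detail as an assumption:

package flink;

// Plain value object describing one HBase cell; the fields match the ones
// visible in the printed output (rowkey, family, qualifier, timestamp, value).
public class CellData {
    private final String rowkey;
    private final String family;
    private final String qualifier;
    private final long timestamp;
    private final String value;

    public CellData(String rowkey, String family, String qualifier, long timestamp, String value) {
        this.rowkey = rowkey;
        this.family = family;
        this.qualifier = qualifier;
        this.timestamp = timestamp;
        this.value = value;
    }

    @Override
    public String toString() {
        return "CellData{rowkey='" + rowkey + "', family='" + family
                + "', qualifier='" + qualifier + "', timestamp=" + timestamp
                + ", value=" + value + '}';
    }
}

package flink;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;

// Thin wrapper around the HBase client used by the async function above.
public class HbaseUtils {
    private Connection connection;

    // Open a connection using the ZooKeeper quorum and client port.
    public void connect(String addr, String port) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", addr);
        conf.set("hbase.zookeeper.property.clientPort", port);
        connection = ConnectionFactory.createConnection(conf);
    }

    // Read one row and map each of its cells to a CellData object.
    public List<CellData> get(String tableName, String rowkey) throws Exception {
        List<CellData> cells = new ArrayList<>();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result row = table.get(new Get(Bytes.toBytes(rowkey)));
            for (Cell cell : row.rawCells()) {
                cells.add(new CellData(
                        Bytes.toString(CellUtil.cloneRow(cell)),
                        Bytes.toString(CellUtil.cloneFamily(cell)),
                        Bytes.toString(CellUtil.cloneQualifier(cell)),
                        cell.getTimestamp(),
                        Bytes.toString(CellUtil.cloneValue(cell))));
            }
        }
        return cells;
    }

    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}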
Start ZooKeeper
Log in to hadoop1 and run the following command:
/program/bin/zk.sh start
Start Hadoop
Log in to hadoop1 and run the following command:
/program/bin/hadoop.sh start
Start HBase
Log in to hadoop1 and run the following command:
start-hbase.sh
Create the table
create 'book','c1'
Insert test data
put 'book','1020','c1:author','韩梅梅'
put 'book','1020','c1:price','121.90'
put 'book','1020','c1:title','scala从入门到精通'
Start Kafka
Log in to hadoop1 and run the following command:
/program/bin/kafka.sh start
Start Flink
Run the Flink program in IDEA.
Start a console producer for the topic
Log in to hadoop1 and run the following command:
/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic kafka2flink
Then enter:
1020
10211
Result
The IDEA console prints the following:
soso--> [CellData{rowkey='1020', family='c1', qualifier='author', timestamp=1669335450913, value=韩梅梅}, CellData{rowkey='1020', family='c1', qualifier='price', timestamp=1669335450913, value=121.90}, CellData{rowkey='1020', family='c1', qualifier='title', timestamp=1669335450913, value=scala从入门到精通}]
soso--> []
The first line is the lookup for rowkey 1020, which returns the three cells inserted above; the second line is empty because rowkey 10211 does not exist in the book table (a timed-out lookup would also produce an empty list, see the timeout() method).
Reference:
https://blog.csdn.net/Yuan_CSDF/article/details/117486259