Flink dimension table join / lookup dictionary - hot-storage dimension table - querying HBase with AsyncDataStream and RichAsyncFunction

Continued from: Flink dimension table join / lookup dictionary

Introduction

Store the dimension table data in an external system such as Redis, HBase, or MySQL; each time Flink receives a record, it queries the external store in real time.

As described in the earlier article on asynchronous processing with AsyncDataStream, AsyncFunction and RichAsyncFunction, Flink should use asynchronous I/O when reading from or writing to external systems.

Advantages

The size of the dimension data is not limited by memory, so very large data sets can be handled.

Disadvantages

Every lookup goes over the network to the external store, so for frequently accessed keys the query results should be cached locally to get better performance (a sketch follows below).
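
A minimal sketch of such a cache, assuming it is added to the MyRichAsyncFunction shown in the example below; the LRU eviction via java.util.LinkedHashMap and the size of 10000 entries are illustrative choices, not part of the original code:

// inside MyRichAsyncFunction: a per-subtask LRU cache keyed by row key
private Map<String, List<CellData>> cache;

@Override
public void open(Configuration parameters) throws Exception {
    // access-ordered LinkedHashMap that evicts the least recently used entry beyond 10000 keys
    cache = Collections.synchronizedMap(new LinkedHashMap<String, List<CellData>>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, List<CellData>> eldest) {
            return size() > 10000;
        }
    });
    // ... the existing open() logic (HBase connection, thread pool) stays unchanged
}

// at the start of asyncInvoke(): answer from the cache and skip the HBase call on a hit
List<CellData> cached = cache.get(id);
if (cached != null) {
    resultFuture.complete(Collections.singleton(cached));
    return;
}
// ... on a miss, submit the lookup to the thread pool as before and cache.put(id, res) on success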

Example

Note: this requires the external system's client to support asynchronous I/O; otherwise you have to implement the asynchrony yourself (which is what this example does for the synchronous hbase-client, by running each lookup in a thread pool).

Using asynchronous I/O raises three issues (see the sketch after this list):

  • Timeout: if a query times out, it is treated as a failed read/write and must be handled as a failure.
  • Concurrency: if too many requests are in flight, Flink's backpressure mechanism kicks in to throttle the upstream.
  • Out-of-order results: how to handle out-of-order results depends on the use case; Flink supports two modes: allow unordered results, or preserve order.
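
A minimal sketch (not the original program, which calls orderedWait without an explicit capacity) showing where the timeout, the concurrency limit and the ordering mode are configured on AsyncDataStream; dss and MyRichAsyncFunction are the stream and function defined in the full example below:

// preserve the input order: results are emitted in the same order as the incoming records
SingleOutputStreamOperator<List<CellData>> ordered = AsyncDataStream.orderedWait(
        dss,                          // input stream
        new MyRichAsyncFunction(),    // async lookup function
        5, TimeUnit.SECONDS,          // timeout: requests not finished in time trigger timeout()
        100);                         // capacity: max in-flight requests before backpressure

// emit results as soon as they complete, trading order for lower latency
SingleOutputStreamOperator<List<CellData>> unordered = AsyncDataStream.unorderedWait(
        dss, new MyRichAsyncFunction(), 5, TimeUnit.SECONDS, 100);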

pom.xml dependencies

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
</properties>

<dependencies>
    <!--java-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- kafka Dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- HBase Java client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.9</version>
    </dependency>
</dependencies>

Java code

package flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class 异步IO_hbase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer configuration
        Properties pro = new Properties();
        pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092,hadoop2:9092,hadoop3:9092");
        pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumer-group");
        pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        // Built-in connector: create the FlinkKafkaConsumer with topic, deserialization schema, and Kafka properties
        FlinkKafkaConsumer<String> fkc = new FlinkKafkaConsumer<>("kafka2flink", new SimpleStringSchema(), pro);
        DataStreamSource<String> dss = env.addSource(fkc);

        SingleOutputStreamOperator<List<CellData>> soso = AsyncDataStream.orderedWait(
                dss,
                new MyRichAsyncFunction() ,
                5,
                TimeUnit.SECONDS);

        soso.print("soso--");
        env.execute();
    }
    static class MyRichAsyncFunction extends RichAsyncFunction<String,List<CellData>> {
        // ZooKeeper quorum hosts
        static final String ADDR = "hadoop1,hadoop2,hadoop3";
        // ZooKeeper client port
        static final String PORT = "2181";

        // HBase helper class (from the previous post in this series)
        HbaseUtils utils;
        // thread pool used to run the blocking HBase lookups
        private ThreadPoolExecutor threadPoolExecutor;
        @Override
        public void open(Configuration parameters) throws Exception {
            utils=new HbaseUtils();
            utils.connect(ADDR,PORT);
            // initialize the thread pool
            threadPoolExecutor =  new ThreadPoolExecutor(
                    10,
                    10,
                    0,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1000));
        }
        /**
         * Called once for each incoming record.
         * @param id the input record, used as the HBase row key
         * @param resultFuture used to emit the lookup result
         * @throws Exception
         */
        @Override
        public void asyncInvoke(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // blocking lookup against the "book" table, keyed by the incoming record
                        List<CellData> res = utils.get("book", id);

                        // emit the lookup result via the ResultFuture
                        resultFuture.complete(Collections.singleton(res));
                    } catch (Exception e) {
                        // fail the request instead of swallowing the error and leaving it hanging
                        resultFuture.completeExceptionally(e);
                    }
                }
            });
        }

        /**
         * Called when the asynchronous request times out.
         * @param id element coming from an upstream task
         * @param resultFuture to be completed with the result data
         * @throws Exception
         */
        @Override
        public void timeout(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
            // treat a timed-out lookup as a failure and emit an empty result
            resultFuture.complete(Collections.singleton(new ArrayList<>()));
        }

        @Override
        public void close() throws Exception {
            // close the HBase connection
            if(utils!=null) {
                utils.close();
            }
            // shut down the thread pool
            if(threadPoolExecutor!=null){
                threadPoolExecutor.shutdown();
            }
        }
    }
}
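
The program above references two helper classes, CellData and HbaseUtils, which come from the previous post in this series and are not repeated here. The sketch below is a hypothetical reconstruction based on the hbase-client 2.x API and on the fields visible in the console output further down; the field names, method signatures and missing-row handling are assumptions, not the original code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.List;

public class HbaseUtils {
    private Connection connection;

    // open a connection against the given ZooKeeper quorum and client port
    public void connect(String quorum, String port) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", quorum);
        conf.set("hbase.zookeeper.property.clientPort", port);
        connection = ConnectionFactory.createConnection(conf);
    }

    // read a single row by row key and map each cell to a CellData
    public List<CellData> get(String tableName, String rowkey) throws Exception {
        List<CellData> cells = new ArrayList<>();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result row = table.get(new Get(Bytes.toBytes(rowkey)));
            if (!row.isEmpty()) {
                for (Cell cell : row.listCells()) {
                    cells.add(new CellData(
                            rowkey,
                            Bytes.toString(CellUtil.cloneFamily(cell)),
                            Bytes.toString(CellUtil.cloneQualifier(cell)),
                            cell.getTimestamp(),
                            Bytes.toString(CellUtil.cloneValue(cell))));
                }
            }
        }
        return cells;
    }

    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}

// hypothetical POJO matching the fields printed in the result below (one HBase cell per instance)
class CellData {
    private final String rowkey;
    private final String family;
    private final String qualifier;
    private final long timestamp;
    private final String value;

    public CellData(String rowkey, String family, String qualifier, long timestamp, String value) {
        this.rowkey = rowkey;
        this.family = family;
        this.qualifier = qualifier;
        this.timestamp = timestamp;
        this.value = value;
    }

    @Override
    public String toString() {
        return "CellData{rowkey='" + rowkey + "', family='" + family + "', qualifier='" + qualifier
                + "', timestamp=" + timestamp + ", value=" + value + "}";
    }
}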

Start ZooKeeper

Log in to hadoop1 and run the following command:

/program/bin/zk.sh start

Start Hadoop

Log in to hadoop1 and run the following command:

/program/bin/hadoop.sh start

Start HBase

Log in to hadoop1 and run the following command:

start-hbase.sh
Create the table:

create 'book','c1'

Add test data:

put 'book','1020','c1:author','韩梅梅'
put 'book','1020','c1:price','121.90'
put 'book','1020','c1:title','scala从入门到精通'
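
To confirm the test data is in place, the row can be read back in the HBase shell (this verification step is not part of the original post):

get 'book','1020'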

Start Kafka

Log in to hadoop1 and run the following command:

/program/bin/kafka.sh start
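
If the kafka2flink topic does not already exist (many installations auto-create topics, but not all), it can be created first; the script path mirrors the producer command further below, and one partition with one replica is an arbitrary choice for this illustration:

/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --bootstrap-server hadoop1:9092 --replication-factor 1 --partitions 1 --topic kafka2flink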

Run the Flink program in IDEA

Start a console producer for the topic

Log in to hadoop1 and run the following command:

/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic kafka2flink

Then enter:

1020
10211

Execution result

The IDEA console prints the following (row key 1020 exists in HBase, so its three cells are returned; 10211 was never inserted, so an empty list comes back):

soso--> [CellData{rowkey='1020', family='c1', qualifier='author', timestamp=1669335450913, value=韩梅梅}, CellData{rowkey='1020', family='c1', qualifier='price', timestamp=1669335450913, value=121.90}, CellData{rowkey='1020', family='c1', qualifier='title', timestamp=1669335450913, value=scala从入门到精通}]
soso--> []

Reference:
https://blog.csdn.net/Yuan_CSDF/article/details/117486259


Original article: https://www.malaoshi.top/show_1IX4nPUb1tKI.html