

I had already deployed Apache DolphinScheduler on a virtual machine and wanted to try creating a Flink node in DolphinScheduler, then using Flink to consume Kafka data.
Apache DolphinScheduler is deployed in standalone mode. For the detailed steps, see the official documentation: DolphinScheduler | Documentation Center (https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/installation/standalone).

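Configure Flink
Environment variables
1. Edit the environment variables: open the file with `vim ~/.bashrc` and add the Flink path, for example `export FLINK_HOME=/data/flink-1.18.1`.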

2. Apply the environment variables:

```bash
# Apply the environment variables
source ~/.bashrc
# Check the environment variable
echo $FLINK_HOME
```
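Modify the configuration files
Because everything runs inside a virtual machine, the configuration files need to be changed so that the host machine outside the VM can reach services on the VM's network.
Kafka (config/server.properties):

```properties
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
# Change 192.168.146.132 to your VM's IP
advertised.listeners=PLAINTEXT://192.168.146.132:9092
```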
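The two Kafka settings play different roles: `listeners` is the address the broker binds to (0.0.0.0 means all interfaces), while `advertised.listeners` is the address the broker hands back to clients after they bootstrap, and clients then connect to that address. It therefore has to be an IP the host machine can actually reach. The sketch below illustrates that rule; the class `ListenerCheck` and its loopback/0.0.0.0 heuristic are our own illustration, not part of Kafka, and it only inspects the first listener entry.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: checks whether a broker config advertises a host that
// other machines could connect to. A heuristic sketch, not Kafka's own validation.
class ListenerCheck {

    // Extracts the host from the first entry, e.g. "PLAINTEXT://192.168.146.132:9092"
    static String firstHost(String listeners) {
        String entry = listeners.split(",")[0].trim();
        int schemeEnd = entry.indexOf("://");
        String hostPort = schemeEnd >= 0 ? entry.substring(schemeEnd + 3) : entry;
        int colon = hostPort.lastIndexOf(':');
        return colon >= 0 ? hostPort.substring(0, colon) : hostPort;
    }

    static boolean advertisesRoutableHost(Properties config) {
        // Kafka falls back to "listeners" when "advertised.listeners" is not set
        String advertised = config.getProperty("advertised.listeners", config.getProperty("listeners"));
        if (advertised == null) {
            return false;
        }
        String host = firstHost(advertised);
        // Empty, wildcard and loopback hosts are not usable from another machine
        return !host.isEmpty() && !host.equals("0.0.0.0")
                && !host.equals("localhost") && !host.startsWith("127.");
    }

    public static void main(String[] args) throws IOException {
        Properties config = new Properties();
        config.load(new StringReader(
                "broker.id=0\n"
                + "listeners=PLAINTEXT://0.0.0.0:9092\n"
                + "advertised.listeners=PLAINTEXT://192.168.146.132:9092\n"));
        System.out.println("bind host: " + firstHost(config.getProperty("listeners")));
        System.out.println("advertised host: " + firstHost(config.getProperty("advertised.listeners")));
        System.out.println("usable from other machines: " + advertisesRoutableHost(config));
    }
}
```

Dropping the `advertised.listeners` line from the sample config makes the check fall back to `listeners` and report the wildcard host as unusable.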

Flink (conf/flink-conf.yaml):

```yaml
jobmanager.rpc.address: 0.0.0.0
jobmanager.bind-host: 0.0.0.0
jobmanager.cpu.cores: 1
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 0.0.0.0
taskmanager.memory.process.size: 2048m
taskmanager.cpu.cores: 1
```


Looking at the dolphinscheduler-daemon.sh file shows that DolphinScheduler loads its environment variables from dolphinscheduler_env.sh in the bin/env directory.

Edit dolphinscheduler_env.sh and add the Java and Flink paths:

```bash
# Change these to your own Java and Flink paths
export JAVA_HOME=/data/jdk-11.0.29
export FLINK_HOME=/data/flink-1.18.1
```
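A wrong path in dolphinscheduler_env.sh (or a wrongly cased variable name, since shell variables are case-sensitive and the name must be exactly `FLINK_HOME`) usually only shows up later as a failed Flink task. The following is a small sanity-check sketch, not part of DolphinScheduler; the class name `EnvCheck` and the assumption that a valid install contains `bin/java` under `JAVA_HOME` and `bin/flink` under `FLINK_HOME` are ours. Run it from a shell where the env file has been sourced.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reports problems with JAVA_HOME / FLINK_HOME in an environment map.
class EnvCheck {

    static List<String> problems(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        check(env, "JAVA_HOME", "bin/java", result);
        check(env, "FLINK_HOME", "bin/flink", result);
        return result;
    }

    // Adds a message if the variable is unset or the expected executable is missing
    private static void check(Map<String, String> env, String var, String relative, List<String> out) {
        String home = env.get(var);
        if (home == null || home.isEmpty()) {
            out.add(var + " is not set");
            return;
        }
        Path executable = Paths.get(home, relative);
        if (!Files.isRegularFile(executable)) {
            out.add(var + "=" + home + " has no " + relative);
        }
    }

    public static void main(String[] args) {
        List<String> found = problems(System.getenv());
        if (found.isEmpty()) {
            System.out.println("JAVA_HOME and FLINK_HOME look fine");
        } else {
            found.forEach(p -> System.out.println("Problem: " + p));
            System.exit(1);
        }
    }
}
```

For example, `source bin/env/dolphinscheduler_env.sh` followed by `java EnvCheck.java` (single-file launch, JDK 11+) prints any missing variable or executable.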

Disable the firewall and start the applications
Start the applications: ZooKeeper, Kafka, Flink and Apache DolphinScheduler.
```bash
# Disable the firewall
sudo systemctl stop firewalld

# In the Flink root directory, start the Flink cluster
bin/start-cluster.sh

# In the Kafka root directory, start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
# Start the Kafka server
bin/kafka-server-start.sh config/server.properties &
# Create a Kafka topic
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
# Send messages with the console producer (interactive)
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
# Consume messages with the console consumer (run in another terminal)
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

# In the DolphinScheduler root directory, start the Standalone Server
bash ./bin/dolphinscheduler-daemon.sh start standalone-server
```
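Test
Check that Flink and Apache DolphinScheduler can be reached. By default, the Flink web UI listens on port 8081 and the DolphinScheduler standalone UI is served at http://<VM IP>:12345/dolphinscheduler/ui.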
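Before opening the browser, it can help to confirm from the host machine that the relevant ports on the VM accept TCP connections at all, which separates firewall or binding problems from application problems. This is a minimal sketch; the class name `PortCheck`, the 2-second timeout and the port list (Kafka 9092, Flink web UI 8081, DolphinScheduler UI 12345, i.e. the defaults) are our assumptions. A successful connect only shows the port is open, not that the service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether host:port accepts a TCP connection.
class PortCheck {

    // Returns true if a TCP connection succeeds within timeoutMs milliseconds
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unresolvable
        }
    }

    public static void main(String[] args) {
        // Default to the VM IP used in this article; pass your own as the first argument
        String host = args.length > 0 ? args[0] : "192.168.146.132";
        String[] names = {"Kafka", "Flink web UI", "DolphinScheduler UI"};
        int[] ports = {9092, 8081, 12345};
        for (int i = 0; i < ports.length; i++) {
            boolean ok = isReachable(host, ports[i], 2000);
            System.out.printf("%-20s %s:%d -> %s%n", names[i], host, ports[i], ok ? "reachable" : "NOT reachable");
        }
    }
}
```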


Write a sample
Use Flink to consume Kafka data, build the shaded jar (`mvn clean package`), upload it to Apache DolphinScheduler, and start the Flink task there:
pom.xml
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.18.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.6.0</kafka.version>
    </properties>

    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Connector base dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Kafka connector (key change: the externalized connector built for Flink 1.18) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- Logging dependency -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>false</enabled></snapshots>
        </repository>
        <repository>
            <id>apache-releases</id>
            <url>https://repository.apache.org/content/repositories/releases/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <!-- Build a fat jar during "package" so the connector ships with the job -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- Drop signature files so the shaded jar is not rejected -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
FlinkKafkaConsumerExample.java
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;

public class FlinkKafkaConsumerExample {

    // Note: static fields are per JVM. The sink updates them inside the TaskManager,
    // so the stop check in main() only sees the change when the job runs in the same
    // JVM as main() (e.g. local execution); on a standalone cluster it never fires.
    private static volatile int messageCount = 0;
    private static volatile boolean shouldStop = false;

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker address
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // consumer group
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Create the Kafka consumer (FlinkKafkaConsumer is deprecated in favor of KafkaSource,
        // but is still available in flink-connector-kafka 3.1.0-1.18)
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        kafkaConsumer.setStartFromEarliest(); // start consuming from the earliest message
        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Process the data: split into words and keep a running count per word
        DataStream<Tuple2<String, Integer>> counts = stream
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);

        counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) {
                System.out.println(value); // written to the TaskManager's stdout
                messageCount++; // counts sink records (one per word), not Kafka messages
                // Check whether the stop condition has been reached
                if (messageCount >= 2 && !shouldStop) {
                    System.out.println("Processed 2 messages, stopping job.");
                    shouldStop = true; // signal that the job should stop
                }
            }
        });

        // Submit the job asynchronously, then cancel it once the stop flag is set
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                JobClient jobClient = env.executeAsync("Flink Kafka WordCount");
                System.out.println("Job ID: " + jobClient.getJobID());
                // Poll the stop flag every 100 ms
                while (!shouldStop) {
                    Thread.sleep(100);
                }
                System.out.println("Cancelling the job...");
                jobClient.cancel().get(); // cancel the job and wait for the request to complete
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Block the main thread until the monitoring task finishes
        future.join();
    }

    // Tokenizer splits each input line into (word, 1) pairs
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
```
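`keyBy(value -> value.f0).sum(1)` is a rolling aggregation: for every word the Tokenizer emits, Flink updates that word's count and forwards the new (word, count) tuple, so the sink receives one record per word rather than one per Kafka message. The plain-Java sketch below imitates that behavior for a single-parallelism run so the printed sequence is easier to predict; the class `RunningWordCount` is our own illustration (a HashMap standing in for Flink's keyed state), and the "(word,count)" strings follow the format of Flink's `Tuple2.toString()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of Tokenizer + keyBy(f0).sum(1) without Flink.
class RunningWordCount {

    static List<String> process(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // stands in for per-key state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            // Same tokenization as the Tokenizer class: lowercase, split on non-word chars
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    int count = state.merge(token, 1, Integer::sum);
                    emitted.add("(" + token + "," + count + ")"); // one output per input word
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        process(Arrays.asList("Hello world", "hello Flink")).forEach(System.out::println);
    }
}
```

Running `main` prints `(hello,1)`, `(world,1)`, `(hello,2)`, `(flink,1)`. It also shows why the sample's stop condition (`messageCount >= 2`) can already trigger after a single Kafka message such as "Hello world".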




Check the startup log in the Apache DolphinScheduler task instance.

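Start the console producer in the VM and type some strings; the messages produced to Kafka then show up in Flink's output (the sink prints to stdout, which the Flink web UI shows in the TaskManager's Stdout tab).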


Original article: https://blog.csdn.net/Analyze_ing/article/details/156940553





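Join the community
There are many ways to follow the community.
Likewise, there are many ways to contribute to Apache DolphinScheduler, which fall into two main categories: code and non-code contributions.
📂 Non-code contributions include:
improving and translating documentation; translating technical and hands-on articles; submitting practical or in-depth articles; becoming an evangelist; community management and answering questions; giving talks at conferences; test feedback; user feedback, and so on.
👩‍💻 Code contributions include:
finding bugs; writing fixes; developing new features; submitting code contributions; taking part in code reviews, and so on.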

