spark streaming消费rocketmq的几种方式

spark streaming消费rocketmq的几种方式

    正在检查是否收录...

在 Spark 里接入

RocketMQ

,主要有两大类方式:


🔹 1. 基于

老的 Spark Streaming (DStream API)

RocketMQ 社区提供过

rocketmq-spark

connector(在 apache/rocketmq-externals 里),可以像 Kafka 一样创建 DStream:

方式 A:Receiver 模式

  • 使用自定义的 Receiver 从 RocketMQ 拉取消息。

  • 每条消息进入 Spark Streaming 的 ReceiverInputDStream

  • 优点:实现简单。

  • 缺点:消息会先缓存在 Spark executor 的内存里,容错依赖 Spark 的 WAL(Write Ahead Log),性能和可靠性一般。

a. 不使用WAL(Write Ahead Log)

核心思路是:

  • 在 Spark 里实现一个自定义 Receiver<T>,内部运行

    RocketMQ PushConsumer

  • PushConsumer 收到消息后,调用 store(msg) 把数据写入 Spark Streaming 的内存队列。

  • Spark Streaming 后续把这些数据打包成 RDD 处理。

核心流程:

  1. RocketMQ → PushConsumer

    • 消息推送到 Spark 进程。

  2. Spark Receiver → store()

    • Receiver 缓存消息,存到 Spark executor 的 BlockManager。

  3. Spark Streaming Job

    • 定时将数据生成 RDD 进行处理。

示例代码

1️⃣ Order 类

 1 import java.io.Serializable;  2  3 public class Order implements Serializable {  4 private String orderNo;  5 private Long cost;  6  7 public Order(String orderNo, Long cost) {  8 this.orderNo = orderNo;  9 this.cost = cost; 10  } 11 12 public String getOrderNo() { return orderNo; } 13 public Long getCost() { return cost; } 14 15  @Override 16 public String toString() { 17 return "Order{" + 18 "orderNo='" + orderNo + '\'' + 19 ", cost=" + cost + 20 '}'; 21  } 22 }

2️⃣ 自定义 RocketMQReceiver

 1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  5 import org.apache.rocketmq.common.message.MessageExt;  6 import org.apache.spark.storage.StorageLevel;  7 import org.apache.spark.streaming.receiver.Receiver;  8  9 import java.io.ByteArrayInputStream; 10 import java.io.ObjectInputStream; 11 import java.util.List; 12 13 public class RocketMQReceiver extends Receiver<Order> { 14 private final String namesrvAddr; 15 private final String topic; 16 private final String group; 17 18 private transient DefaultMQPushConsumer consumer; 19 20 public RocketMQReceiver(String namesrvAddr, String topic, String group) { 21 super(StorageLevel.MEMORY_AND_DISK_2()); 22 this.namesrvAddr = namesrvAddr; 23 this.topic = topic; 24 this.group = group; 25  } 26 27  @Override 28 public void onStart() { 29 new Thread(this::initConsumer).start(); 30  } 31 32 private void initConsumer() { 33 try { 34 consumer = new DefaultMQPushConsumer(group); 35  consumer.setNamesrvAddr(namesrvAddr); 36 consumer.subscribe(topic, "*"); 37 38 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { 39 for (MessageExt msg : msgs) { 40 Order order = deserialize(msg.getBody()); 41 if (order != null) { 42 store(order); // 推送到 Spark 43  } 44  } 45 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 46  }); 47 48  consumer.start(); 49 System.out.println("RocketMQReceiver started."); 50 } catch (Exception e) { 51 restart("Error starting RocketMQReceiver", e); 52  } 53  } 54 55  @Override 56 public void onStop() { 57 if (consumer != null) { 58  consumer.shutdown(); 59  } 60  } 61 62 private Order deserialize(byte[] body) { 63 try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) { 64 return (Order) ois.readObject(); 65 } catch (Exception e) { 66 return null; 67  } 68  } 69 }

3️⃣ Spark Streaming 主程序

 1 import org.apache.spark.SparkConf;  2 import org.apache.spark.streaming.Durations;  3 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;  4 import org.apache.spark.streaming.api.java.JavaStreamingContext;  5  6 public class RocketMQStreamingApp {  7 public static void main(String[] args) throws Exception {  8 SparkConf conf = new SparkConf().setAppName("RocketMQReceiverExample").setMaster("local[2]");  9 JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5)); 10 11 // 创建 Receiver 12 JavaReceiverInputDStream<Order> stream = 13 ssc.receiverStream(new RocketMQReceiver("localhost:9876", "OrderTopic", "spark_group")); 14 15 // 简单处理:打印订单 16 stream.foreachRDD(rdd -> { 17 rdd.foreach(order -> System.out.println("Got order: " + order)); 18  }); 19 20  ssc.start(); 21  ssc.awaitTermination(); 22  } 23 }

b. 

使用WAL(Write Ahead Log)

在 Spark Streaming 里,开启 WAL 很简单:

  1. 设置 checkpoint 目录

    (必须是 HDFS 或可靠存储);

  2. Receiver 要用 StorageLevel.MEMORY_AND_DISK_SER_2()(支持 WAL 持久化);

  3. Spark 自动把每条接收到的数据先写到 WAL,再交给 BlockManager。

🚀 完整示例代码(带 WAL)

1️⃣ Order 类(和之前相同)

 1 import java.io.Serializable;  2  3 public class Order implements Serializable {  4 private String orderNo;  5 private Long cost;  6  7 public Order(String orderNo, Long cost) {  8 this.orderNo = orderNo;  9 this.cost = cost; 10  } 11 12 public String getOrderNo() { return orderNo; } 13 public Long getCost() { return cost; } 14 15  @Override 16 public String toString() { 17 return "Order{" + 18 "orderNo='" + orderNo + '\'' + 19 ", cost=" + cost + 20 '}'; 21  } 22 }

2️⃣ RocketMQReceiver(改用支持 WAL 的 StorageLevel)

 1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  5 import org.apache.rocketmq.common.message.MessageExt;  6 import org.apache.spark.storage.StorageLevel;  7 import org.apache.spark.streaming.receiver.Receiver;  8  9 import java.io.ByteArrayInputStream; 10 import java.io.ObjectInputStream; 11 12 public class RocketMQReceiver extends Receiver<Order> { 13 private final String namesrvAddr; 14 private final String topic; 15 private final String group; 16 17 private transient DefaultMQPushConsumer consumer; 18 19 public RocketMQReceiver(String namesrvAddr, String topic, String group) { 20 // 使用支持 WAL 的存储级别 21 super(StorageLevel.MEMORY_AND_DISK_SER_2()); 22 this.namesrvAddr = namesrvAddr; 23 this.topic = topic; 24 this.group = group; 25  } 26 27  @Override 28 public void onStart() { 29 new Thread(this::initConsumer).start(); 30  } 31 32 private void initConsumer() { 33 try { 34 consumer = new DefaultMQPushConsumer(group); 35  consumer.setNamesrvAddr(namesrvAddr); 36 consumer.subscribe(topic, "*"); 37 38 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { 39 for (MessageExt msg : msgs) { 40 Order order = deserialize(msg.getBody()); 41 if (order != null) { 42 // Spark 会先写 WAL,再写 BlockManager 43  store(order); 44  } 45  } 46 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 47  }); 48 49  consumer.start(); 50 System.out.println("RocketMQReceiver started with WAL."); 51 } catch (Exception e) { 52 restart("Error starting RocketMQReceiver", e); 53  } 54  } 55 56  @Override 57 public void onStop() { 58 if (consumer != null) { 59  consumer.shutdown(); 60  } 61  } 62 63 private Order deserialize(byte[] body) { 64 try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) { 65 return (Order) ois.readObject(); 66 } catch (Exception e) { 67 return null; 68  } 69  } 70 }

3️⃣ 主程序(开启 WAL 需要 checkpoint)

 1 import org.apache.spark.SparkConf;  2 import org.apache.spark.streaming.Durations;  3 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;  4 import org.apache.spark.streaming.api.java.JavaStreamingContext;  5  6 public class RocketMQStreamingApp {  7 public static void main(String[] args) throws Exception {  8 SparkConf conf = new SparkConf()  9 .setAppName("RocketMQReceiverWithWAL") 10 .setMaster("local[2]"); 11 12 // 每 5 秒一个 batch 13 JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5)); 14 15 // 设置 checkpoint 目录(必须是可靠存储,如 HDFS) 16 ssc.checkpoint("hdfs://namenode:8020/spark-checkpoints/rocketmq"); 17 18 // 创建带 WAL 的 Receiver 19 JavaReceiverInputDStream<Order> stream = 20 ssc.receiverStream(new RocketMQReceiver("localhost:9876", "OrderTopic", "spark_group")); 21 22 // 简单处理:打印订单 23 stream.foreachRDD(rdd -> { 24 rdd.foreach(order -> System.out.println("Got order (with WAL): " + order)); 25  }); 26 27  ssc.start(); 28  ssc.awaitTermination(); 29  } 30 }

注意事项

  1. checkpoint 必须是 HDFS/S3/OSS

    等分布式存储,本地路径只适合测试。

  2. WAL 会写日志文件,保证

    至少一次(at-least-once)

    语义,但仍可能有重复消息,需要业务端去重。

  3. Receiver 模式 + WAL 性能比 Direct 模式差(多一次磁盘 IO)。

  4. 若想要

    exactly-once

    ,通常推荐 Structured Streaming(自动 checkpoint + sink 支持事务)。

⚠️ Receiver 模式特点

优点

  • 实现简单:直接用 RocketMQ PushConsumer 推消息到 Spark。

  • 不需要手动管理 offset。

缺点

  • Spark Receiver 先把数据存到内存(BlockManager),如果 Spark 崩溃,数据可能丢失。

  • 容错要依赖

    WAL(Write Ahead Log)

    ,但 WAL 会写 HDFS,性能比 Direct 模式差。

  • 难以保证严格

    exactly-once

 

方式 B:Direct 模式

  • 类似 Kafka Direct Stream,Spark Streaming 直接从 RocketMQ 拉取数据,不依赖 Spark Receiver。

  • 消息 offset 由用户管理,可以手动提交(通常写到 Zookeeper 或外部存储)。

  • 优点:性能更好,保证数据至少一次处理。

  • 缺点:需要自己管理 offset 提交,开发复杂一些。

而Direct 模式的特点是:

  • 不依赖 Spark Receiver(没有 WAL 开销)。

  • Spark Driver 直接从 RocketMQ 拉取消息。

  • 消费的 offset 由

    Spark Driver 维护

    ,通常要手动存储到外部(比如 HDFS、MySQL、Zookeeper)以便恢复。

实现思路

  1. 准备 RocketMQ Consumer API

    • Spark 没有内置 RocketMQ Direct API(像 Kafka 那样),需要借助

      rocketmq-spark connector

      或者自定义 Consumer。

    • 原理和 Kafka DirectStream 一样:

      • 在每个 micro-batch 触发时,去 RocketMQ 拉取一段消息(指定起始 offset、结束 offset)。

      • 转换成 RDD,交给 Spark 执行。

  2. 关键点

    • 手动管理 offset

      :RocketMQ 不会自动帮 Spark 提交,需要你把 offset 存到外部存储(比如 HDFS/ZK/MySQL)。

    • 并行度

      :可以按 Topic 的 Queue(partition 类似)拆分 RDD,分发到不同 task。

    • 容错

      :作业失败后,重新从存储的 offset 位置恢复。

🚀 代码示例(Direct 模式伪实现)

假设有 OrderTopic,包含多个 MessageQueue:

 1 package com.example;  2  3 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;  4 import org.apache.rocketmq.client.consumer.PullResult;  5 import org.apache.rocketmq.client.consumer.PullStatus;  6 import org.apache.rocketmq.common.message.MessageExt;  7 import org.apache.rocketmq.common.message.MessageQueue;  8 import org.apache.spark.SparkConf;  9 import org.apache.spark.api.java.JavaRDD; 10 import org.apache.spark.api.java.JavaSparkContext; 11 import org.apache.spark.streaming.Durations; 12 import org.apache.spark.streaming.api.java.JavaInputDStream; 13 import org.apache.spark.streaming.api.java.JavaStreamingContext; 14 15 import java.util.*; 16 17 public class RocketMQDirectStreamExample { 18 public static void main(String[] args) throws Exception { 19 SparkConf conf = new SparkConf() 20 .setAppName("RocketMQDirectStreamExample") 21 .setMaster("local[2]"); 22 23 JavaSparkContext sc = new JavaSparkContext(conf); 24 JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5)); 25 26 // RocketMQ Consumer 27 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("spark_consumer_group"); 28 consumer.setNamesrvAddr("localhost:9876"); 29  consumer.start(); 30 31 // 获取 Topic 下所有 Queue 32 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("OrderTopic"); 33 34 // 手动维护每个 queue 的 offset 35 Map<MessageQueue, Long> offsetTable = new HashMap<>(); 36 for (MessageQueue mq : mqs) { 37 offsetTable.put(mq, 0L); // 可以从外部存储恢复 38  } 39 40 // 每个 micro-batch 拉取数据 41 ssc.foreachRDD(time -> { 42 List<Order> orders = new ArrayList<>(); 43 44 for (MessageQueue mq : mqs) { 45 long offset = offsetTable.get(mq); 46 47 // 拉取消息 48 PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32); 49 50 if (pullResult.getPullStatus() == PullStatus.FOUND) { 51 for (MessageExt msg : pullResult.getMsgFoundList()) { 52 // 反序列化消息体 53 Order order = deserialize(msg.getBody()); 54 if (order != null) { 55  orders.add(order); 56  } 57  } 58 // 更新 offset 59  offsetTable.put(mq, pullResult.getNextBeginOffset()); 60  } 61  } 62 63 // 转换为 RDD 64 JavaRDD<Order> rdd = sc.parallelize(orders); 65 rdd.foreach(o -> System.out.println("Got Order: " + o)); 66 67 // TODO: 把 offsetTable 持久化到外部存储(保证容错) 68  }); 69 70  ssc.start(); 71  ssc.awaitTermination(); 72  } 73 74 private static Order deserialize(byte[] body) { 75 try (java.io.ObjectInputStream ois = 76 new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(body))) { 77 return (Order) ois.readObject(); 78 } catch (Exception e) { 79 return null; 80  } 81  } 82 }

关键点说明

  1. offset 管理

    • offsetTable 记录每个 MessageQueue 的消费位置。

    • 每个 batch 消费后更新,并写入外部存储(比如 MySQL/HDFS)。

    • 程序重启时先从外部恢复 offset。

  2. 并行度

    • 可以用 sc.parallelize(orders),但更高效的是:每个 MessageQueue 映射成一个 RDD 分片,利用 Spark 分布式并行消费。

  3. 语义保证

    • 默认 at-least-once,可能会有重复消费,需要结合业务去重。

    • 如果 offset 和结果同时事务写入,可以做到 effectively-once。

📌 总结:

  • Receiver 模式

    → 简单但性能差。

  • Direct 模式

    → 手动拉取 offset,性能高,但需要自己管理 offset。

  • Structured Streaming

    → 推荐的现代方案,自动 offset 管理,SQL API,更容易保证 exactly-once。


🔹 2. 基于

Structured Streaming (DataFrame/Dataset API)

Structured Streaming 是 Spark 2.x 之后推荐的流处理 API。RocketMQ Connector 也支持 Structured Streaming:

方式 A:作为 Source

直接通过:

1 spark.readStream() .format("org.apache.rocketmq.spark") .option("namesrvAddr", "localhost:9876") .option("consumerGroup", "test_group") .option("topics", "OrderTopic") .load();

得到一个 DataFrame,包含:

  • key

  • body

  • topic

  • tags

  • offset

  • timestamp 等字段。

offset 自动 checkpoint

,不需要手动提交。


方式 B:作为 Sink

Structured Streaming 也能把结果写回 RocketMQ:

1 df.writeStream() .format("org.apache.rocketmq.spark") .option("namesrvAddr", "localhost:9876") .option("producerGroup", "result_group") .option("topic", "ResultTopic") .start();

这样 Spark 的计算结果会被写到另一个 RocketMQ topic。


🔹 3. 自己实现 Consumer → Spark

如果不想用官方 connector,也可以自己写:

  1. 在 Spark 里启动一个

    RocketMQ Java Consumer

  2. 消费消息后,把数据写到 Spark Streaming 的队列(例如 queueStream)。

  3. Spark Streaming 从这个队列里生成 DStream 进行计算。

👉 这种方式灵活,但 offset 管理和 exactly-once 语义都要自己处理,一般不推荐,除非你有特殊需求(比如自定义序列化/解码)。


🔎 总结

  • 老的 Spark Streaming (DStream)

    • Receiver 模式(简单,但容错差)。

    • Direct 模式(性能好,可手动提交 offset)。

  • Structured Streaming

    • 推荐方式,作为 RocketMQ Source(offset 自动管理,SQL API 简洁)。

    • 可以写回 RocketMQ。

  • 自研 Consumer + queueStream

    • 灵活,但 offset、容错全靠自己。

 

转发请注明出处:https://www.cnblogs.com/fnlingnzb-learner/p/19073518

  • 本文作者:WAP站长网
  • 本文链接: https://wapzz.net/post-27816.html
  • 版权声明:本博客所有文章除特别声明外,均默认采用 CC BY-NC-SA 4.0 许可协议。
本站部分内容来源于网络转载,仅供学习交流使用。如涉及版权问题,请及时联系我们,我们将第一时间处理。
文章很赞!支持一下吧 还没有人为TA充电
为TA充电
还没有人为TA充电
0
0
  • 支付宝打赏
    支付宝扫一扫
  • 微信打赏
    微信扫一扫
感谢支持
文章很赞!支持一下吧
关于作者
2.8W+
9
1
2
WAP站长官方

稳定币落地前夜:财政司司长牵头,100+CEO与这家AI公司搞大事

上一篇

《n8n 入门:从 0 到 1 学会自动化工作流》--分享1

下一篇
评论区
内容为空

这一切,似未曾拥有

  • 复制图片
按住ctrl可打开默认菜单