博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
KakfaSpout自定义scheme
阅读量:4613 次
发布时间:2019-06-09

本文共 3959 字,大约阅读时间需要 13 分钟。

一.Mapper和Scheme

scheme:将kafka传到spout里的数据格式进行转化. record->tuple

mapper:将storm传到kafka的数据格式进行转化.tuple->record

二.为什么要自定义消息格式

在很多需求里, 从kafka传递过来的数据并不是单纯的string, 可以是任意对象.当我们需要根据对象的某个属性进行分组时, 默认的new Fields("bytes")就不太适合.但是消息传递的形式还是string.我们可以在传入kafka之前使用fastJson的转化方法将实体对象转化成jsonString.

到了scheme在转换成实体类对象.

三.怎么更改scheme

构建kafkaSpout时我们要配置很多参数, 可以看一下kafkaConfig代码.

public final BrokerHosts hosts; //用以获取Kafka broker和partition的信息public final String topic;//从哪个topic读取消息public final String clientId; // SimpleConsumer所用的client idpublic int fetchSizeBytes = 1024 * 1024; //发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小public int socketTimeoutMs = 10000;//与Kafka broker的连接的socket超时时间public int fetchMaxWait = 10000;   //当服务器没有新消息时,消费者会等待这些时间public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte[],该如何反序列化public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offsetpublic long maxOffsetBehind = Long.MAX_VALUE;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使startOffsetTime

 

可以看到, 所有的配置项都是public, 所以当我们实例化一个spoutConfig之后, 可以通过直接引用的方式进行更改属性值.

我们可以看构建kafkaspout的代码:

ZkHosts zkHosts = new ZkHosts(zkHost);// zk对地址有唯一性标识String zkRoot = "/" + topic;String id = UUID.randomUUID().toString();// 构建spoutConfigSpoutConfig spoutConf = new SpoutConfig(zkHosts, topic, zkRoot, id);spoutConf.scheme = new SchemeAsMultiScheme(new SensorDataScheme());spoutConf.startOffsetTime = OffsetRequest.LatestTime();KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);

四.怎么自定义scheme

我们有这样一个需求,有一个实体类如下:

public class SensorData implements Serializable {    // 设备Id;    private String deviceId;    // 型号id    private String dmPropertiesId;     // 通道名称;    private String channelName;    // 采集的温度值    private double deviceTemp;    // 采集的时间;    private Date date;}

数据进来kafka到storm消费时, 根据deviceId进行分组.当然, 我们在写入的时候对数据json化, 使用fastjson把实体对象变成字符串, 而不是直接传实体类对象进入kafka(亲测会报错, 无法进行转换).最终数据会在scheme的declare的方法里处理.

Scheme接口:

public interface Scheme extends Serializable {    List deserialize(ByteBuffer ser);    public Fields getOutputFields();}

可以看到有两个需要实现的方法, 一个是传过来的byte数据进行转化, 一个是传入下一层bolt的时候以什么字段分组. 跟踪kafka的源码我们可以看到, 他的declare方法最终会调用scheme的方法来确认字段名.

看一下scheme的整体代码:

package dm.scheme;import java.nio.ByteBuffer;import java.nio.charset.Charset;import java.nio.charset.StandardCharsets;import java.util.List;import org.apache.storm.kafka.StringScheme;import org.apache.storm.spout.Scheme;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;import org.apache.storm.utils.Utils;import com.alibaba.fastjson.JSON;import dm.entity.SensorData;/** *  * KafkaRecord 映射 tuple 转化类; *  * @author chenwen * */public class SensorDataScheme implements Scheme {    /**     *      */    private static final long serialVersionUID = 1L;    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;    /**     *      * 反序列化     */    @Override    public List deserialize(ByteBuffer byteBuffer) {        // 将kafka消息转化成jsonString        String sensorDataJson = StringScheme.deserializeString(byteBuffer);        SensorData sensorData = JSON.parseObject(sensorDataJson, SensorData.class);        String id = sensorData.getDeviceId();        return new Values(id, sensorData);    }    public static String deserializeString(ByteBuffer byteBuffer) {        if (byteBuffer.hasArray()) {            int base = byteBuffer.arrayOffset();            return new String(byteBuffer.array(), base + byteBuffer.position(), byteBuffer.remaining());        } else {            return new String(Utils.toByteArray(byteBuffer), UTF8_CHARSET);        }    }    @Override    public Fields getOutputFields() {        return new Fields("deviceId", "sensorData"); // 返回字段及其名称;    }}

 

转载于:https://www.cnblogs.com/unclecc/p/10001463.html

你可能感兴趣的文章
pip安装问题
查看>>
Mysql 出现Table‘xxx’is read only问题
查看>>
欧几里得&扩展欧几里得算法
查看>>
Block使用的注意事项
查看>>
HDU1700:Points on Cycle
查看>>
8个实用的Linux下Bash命令提示行(转)
查看>>
Python常用模块-shutil高级文件处理模块
查看>>
css hack
查看>>
POJ-1273Drainage Ditches(网络流入门题,最大流)
查看>>
桥接模式
查看>>
thrift 学习
查看>>
9. iptables 配置
查看>>
vc多线程编程
查看>>
TCP与UDP的异同(服务端接收数据,客户端发送数据)
查看>>
OkHttp 官方wiki 使用案例 MD
查看>>
【版本】API NDK 系统 分辨率 统计
查看>>
.net选择数据库表\列导出数据
查看>>
MVVM(Model-View-View-Model)简单分析(及代码示例)
查看>>
Mac 下 android/iOS https抓包
查看>>
安装VMwareTool
查看>>