Login
网站首页 > 文章中心 > 其它

Redis◆Hbase◆RocketMQ_实际使用问题案例分享

作者:小编 更新时间:2023-09-23 17:25:28 浏览量:58人看过

需求

将Hbase数据,解析后推送到RocketMQ.

redis使用list数据类型,存储了需要推送的数据的RowKey及表名.

Redis◆Hbase◆RocketMQ_实际使用问题案例分享

分析及确定方案

Redis

明确list中元素结构{"rowkey":rowkey,"table":table}解析出rowkey;

一次取多个元素加快效率;

取了之后放入重试队列,并删除原来的元素;

处理数据永远是重试队列里的,成功之后删除,失败就加上重试次数并重新放回;

明确从list中取值所使用的redis命令;范围获取LRANGE;范围删除(留下指定范围的数据)LTRIM;判断list长度LLEN;加入listRPUSH;删除LREM等等;

从Hbase获取数据失败和发送到mq失败都令重试次数加一;

每次碰到重试次数不为0的数据都休眠1s;

设置最大重试次数,达到限制后丢弃;

考虑客户redis部署方式,单机、主从、集群、哨兵等;

编写不同的操作代码,也可以利用配置文件、环境变量、工厂模式等适配各种部署模式;

Hbase

因为是不停读取数据、链接、Table不用close,可以缓存起来,没必要每次都创建;

确定批量获取数据方式为批量Get,没用scan;

了解解析方式,一些网上的解析试了之后会乱码,这边用的是它自带的CellUtil.clone相关方法;

考虑所有都没数据时休眠10s;

RocketMQ

有现成的发送代码,公司封装好的;

调整服务端的内存、线程数等参数;

实现

配置


#server configuration
server.port=8896
#log config
logging.file.path=./logs
#redis-standalone
redis.standalone.host=
redis.standalone.port=6379
redis.standalone.password=
redis.standalone.enable=true
#redis-cluster
redis.cluster.nodes=
redis.cluster.password=
redis.cluster.timeout=30000
redis.cluster.enable=false
# Zookeeper 集群地址,逗号分隔
hbase.zookeeper.quorum=
# Zookeeper 端口
hbase.zookeeper.property.clientPort=2181
# 消息目的rocketmq地址
rocketmq.server.host=
# 发送消息间隔时间,防止发送过快mq受不了
rocketmq.send.interval.millisec=10
# 每次从redis读取数据量限制.
data.access.redisDataSize=100
# 失败数据重试次数,超过的直接丢弃
data.access.retryNum=10
# 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射.xxx.xxx.xxx[topic]=redisKey
data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back


部分代码

获取配置,其余的直接@Value("${}"):


@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "data.access")
public class AccessRedisMqConfig {

/**
 * key:topic; value:redis的key
 */
private Map topicKeyMap = new HashMap<>();

/**
 * 一次从redis中读取数据量限制
 */
private long redisDataSize = 50;

/**
 * 失败数据重试次数
 */
private int retryNum = 10;

}


开启接入:


@Component
public class AdapterRunner implements ApplicationRunner {

@Resource
private DataAccessService dataAccessService;

@Override
public void run(ApplicationArguments args) {
    System.out.println("项目已启动,开始接入数据到RocketMQ......");
    dataAccessService.accessData2Mq();
}
}


其他代码其实也在分析里了.

踩坑

mq发送问题


org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)


上面分析也说了,注意发送速度,有多少资源就接入多快.还有注意相关三个端口是否开放.

总结

程序很简单,主要涉及方案的是,获取redis的list数据时,是考虑效率,及加入重试策略,保证数据不丢失等.

以上就是土嘎嘎小编为大家整理的Redis◆Hbase◆RocketMQ_实际使用问题案例分享相关主题介绍,如果您觉得小编更新的文章只要能对粉丝们有用,就是我们最大的鼓励和动力,不要忘记讲本站分享给您身边的朋友哦!!

版权声明:倡导尊重与保护知识产权。未经许可,任何人不得复制、转载、或以其他方式使用本站《原创》内容,违者将追究其法律责任。本站文章内容,部分图片来源于网络,如有侵权,请联系我们修改或者删除处理。

编辑推荐

热门文章