尊龙凯时网址

阿里rocketmq试用记录 简单的spring集成 -尊龙凯时网址

2023-08-17,,

csdn学院招募微信小程序讲师啦

    

程序猿全指南,让【移动开发】更简单!
      
【观点】移动原生app开发 pk html 5开发

   

云端应用征文大赛,秀绝招,赢无人机!


    



标签:





5029人阅读
评论(0)

举报








分类:







目录(?)[ ]

    rocketmq试用简单的spring集成
      rocketmq
      核心原理
        1 数据结构
        2 刷盘策略
        3 内存机制
        4 工作模式

      环境安装

        1 java环境安装
        2 rocketmq安装

      测试网络拓扑
      启停操作
      运维指令
      基本测试
      宕机实验
      遗留问题
      rocketmqspring源码下载
      参考文献

经过2天的试用初步了解了一下rocketmq的基本用法,搜索了一下度娘,没有找到spring的例子,所以简单搞了一点代码感受一下。

1.rocketmq

rocketmq的前身是metaq,当metaq3.0发布时,产品名称改为rocketmq,有以下特点:
1) 能够保证严格的消息顺序
2) 提供丰富的消息拉取模式
3) 高效的订阅者水平扩展能力
4)实时的消息订阅机制
5)亿级消息堆积能力

2.核心原理

2.1. 数据结构

(1)所有数据单独储存到commit log ,完全顺序写,随机读

(2)对最终用户展现的队列实际只储存消息在commit log 的位置信息,并且串行方式刷盘

(3)按照messageid查询消息

(4)根据查询的key的hashcode%slotnum得到具体的槽位置

(5)根据slotvalue(slot对应位置的值)查找到索引项列表的最后一项

(6)遍历索引项列表返回查询时间范围内的结果集

2.2. 刷盘策略

rocketmq中的所有消息都是持久化的,先写入系统pagecache,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,可以直接从内存读取

使用简单的符号标识不同的标题,将某些文字标记为粗体或者斜体,创建一个链接等,详细语法参考帮助?。

本编辑器支持 markdown extra ,  扩展了很多好用的功能。具体请参考[github][2].

2.3. 内存机制

2.4. 工作模式

3. 环境安装

3.1. java环境安装

安装

rpm -ivh jdk-7u80-linux-x64.rpm
1
1

环境变量

java_home=/usr/java/jdk1.7.0_80
classpath=.:$java_home/lib.tools.jar
path=$java_home/bin:$path
export java_home classpath path
export rocketmq_home=/usr/local/service/alibaba-rocketmq
1 2 3 4 5 1 2 3 4 5

3.2. rocketmq安装

https://github.com/alibaba/rocketmq/releases下载3.2.6,解压

4. 测试网络拓扑

因为手里没有其他服务器,105那台缺少一个slave,在同步双写模式下,发送消息会返回 slave_not_available,不过消息已经发送成功,只是slave没有写成功。

5. 启停操作

这里只给出一个基本的示例,各个模式的启停在本文最后的参考文献中会有详细的说明。这里不再赘述。

启动nameserver

nohup ./mqnamesrv &
1
1

停止nameserver

./mqshutdown namesrv
1
1

启动broker(单master)(多master,多master slave)对应的(异步复制,同步双写)

nohup sh mqbroker -n 192.168.146.109:9876 -c $rocketmq_home/conf/2m-noslave/broker-a.properties &
1
1

停止broker

./mqshutdown broker
1
1

6. 运维指令

查看集群情况

./mqadmin clusterlist -n 127.0.0.1:9876
1
1

查看broker状态

./mqadmin brokerstatus -n 127.0.0.1:9876 -b 192.168.146.105:10911
1
1

查看topic列表

./mqadmin topiclist -n 127.0.0.1:9876
1
1

查看topic状态

./mqadmin topicstatus -n 127.0.0.1:9876 -t pushtopic
1
1

查看topic路由

./mqadmin topicroute  -n 127.0.0.1:9876 -t pushtopic
1
1

7. 基本测试

基本测试采用java直接编码的方式生产和消费消息,例子来源于参考文献的《rocketmq开发教程》。本文最后的代码示例,采用了spring的形式。

producer

package com.jd.wxz;
import com.alibaba.rocketmq.client.producer.defaultmqproducer;
import com.alibaba.rocketmq.client.producer.sendresult;
import com.alibaba.rocketmq.common.message.message; public class producer {
public static void main(string[] args){
defaultmqproducer producer = new defaultmqproducer("producer");
producer.setnamesrvaddr("192.168.146.109:9876");
try {
producer.start(); message msg = new message("pushtopic",
"push",
"1",
"just for test.".getbytes()); sendresult result = producer.send(msg);
system.out.println("id:" result.getmsgid()
" result:" result.getsendstatus()); msg = new message("pushtopic",
"push",
"2",
"just for test.".getbytes()); result = producer.send(msg);
system.out.println("id:" result.getmsgid()
" result:" result.getsendstatus()); msg = new message("pulltopic",
"pull",
"1",
"just for test.".getbytes()); result = producer.send(msg);
system.out.println("id:" result.getmsgid()
" result:" result.getsendstatus());
} catch (exception e) {
e.printstacktrace();
}finally{
producer.shutdown();
}
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46

consumer


package com.sean; import java.util.list; import com.alibaba.rocketmq.client.consumer.defaultmqpushconsumer;
import com.alibaba.rocketmq.client.consumer.listener.consumeconcurrentlycontext;
import com.alibaba.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import com.alibaba.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import com.alibaba.rocketmq.common.consumer.consumefromwhere;
import com.alibaba.rocketmq.common.message.message;
import com.alibaba.rocketmq.common.message.messageext; public class consumer {
public static void main(string[] args){
defaultmqpushconsumer consumer =
new defaultmqpushconsumer("pushconsumer");
consumer.setnamesrvaddr("192.168.146.109:9876");
try {
//订阅pushtopic下tag为push的消息
consumer.subscribe("pushtopic", "push");
//程序第一次启动从消息队列头取数据
consumer.setconsumefromwhere(
consumefromwhere.consume_from_first_offset);
consumer.registermessagelistener(
new messagelistenerconcurrently() {
public consumeconcurrentlystatus consumemessage(
list list,
consumeconcurrentlycontext context) {
message msg = list.get(0);
system.out.println(msg.tostring());
return consumeconcurrentlystatus.consume_success;
}
}
);
consumer.start();
} catch (exception e) {
e.printstacktrace();
}
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41

运行结果:

服务端监控:

8.宕机实验

9.遗留问题

1)  关闭master 自动切换到slave无法实现,官方资料上没有明确指明,第三方文档里有(见文献3)。
2) 在开发机服务器上运行os.sh进行优化,导致网络无法连接,运维帮忙重启才恢复。

10.rocketmq spring源码下载

戳我

11.参考文献

1) 《rocketmq入门(上)》

2) 《rocketmq入门(下)》

3) 《rokectmq开发教程》

4) 《阿里rocketmq quict start》

5) 《rocketmq与kafka对比(18项差异)》

6) 《rocketmq命令整理》

7) 《rocketmq原理简介》


    


0


0







 



.blog-ass-articl dd {
color: #369;
width: 99%; /*修改行*/
float: left;
overflow: hidden;
font: normal normal 12px/23px "simsun";
height: 23px;
margin: 0;
padding: 0 0 0 10px;
margin-right: 30px;
background: no-repeat 0 10px;
}

参考知识库

linux知识库

9149关注|3461收录

java se知识库

20836关注|468收录

java ee知识库

13999关注|1215收录

java 知识库

22124关注|1436收录

软件测试知识库

3312关注|310收录

算法与数据结构知识库

12521关注|2320收录

更多资料请参考:

 
猜你在找









查看评论

  暂无评论

发表评论
用 户 名:
liuguoyun_123456
评论内容:

html/xmlobjective-cdelphirubyphpc#c javascriptvisual basicpythonjavacsssql其它

* 以上用户言论只代表其个人观点,不代表csdn网站的观点或立场



    

.tag_list
{
background: none repeat scroll 0 0 #ffffff;
border: 1px solid #d7cbc1;
color: #000000;
font-size: 12px;
line-height: 20px;
list-style: none outside none;
margin: 10px 2% 0 1%;
padding: 1px;
}
.tag_list h5
{
background: none repeat scroll 0 0 #e0dbd3;
color: #47381c;
font-size: 12px;
height: 24px;
line-height: 24px;
padding: 0 5px;
margin: 0;
}
.tag_list h5 a
{
color: #47381c;
}
.classify
{
margin: 10px 0;
padding: 4px 12px 8px;
}
.classify a
{
margin-right: 20px;
white-space: nowrap;
}




全部主题

hadoop

aws

移动游戏

java

android

ios

swift

智能硬件

docker

openstack

vpn

spark

erp

ie10

eclipse

crm

javascript

数据库

ubuntu

nfc

wap

jquery

bi

html5

spring

apache

.net

api

html

sdk

iis

fedora

xml

lbs

unity

splashtop

uml

components

windows mobile

rails

qemu

kde

cassandra

cloudstack

ftc

coremail

ophone

couchbase

云计算

ios6

rackspace

web app

springside

maemo

compuware

大数据

aptech

perl

tornado

ruby

hibernate

thinkphp

hbase

pure

solr

angular

cloud foundry

redis

scala

django

bootstrap

#popup_mask
{
position: absolute;
width: 100%;
height: 100%;
background: #000;
z-index: 9999;
left: 0px;
top: 0px;
opacity: 0.3;
filter: alpha(opacity=30);
display: none;
}

                    


阿里rocketmq试用记录 简单的spring集成的相关教程结束。

网站地图