一个RocketMQ文件存储的简单实现

无尘79148781

1.RocketMQ文件简介

RocketMQ具有其强大的存储能力和强大的消息索引能力,从众多消息中间件产品中脱颖而出,其原理很值得学习。

RocketMQ存储用的是本地文件存储系统,效率高也可靠。储文件主要分为CommitLog,Comsumequeue,Index 三类文件,





文件类型

说明

CommitLog

消息存储文件,所有消息主题的消息都存储在 CommitLog 文件中。 CommitLog单个文件大小默认1G,文件文件名是起始偏移量,总共20位,左边补零,起始偏移量是0。

比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824

ConsumeQueue

消息消费的逻辑队列,作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。

ConsumeQueue对应每个Topic和QueuId下面的文件。单个文件大小约5.72M,每个文件由30W条数据组成,每个文件默认大小为600万个字节,当一个ConsumeQueue类型的文件写满了,则写入下一个文件;

IndexFile

消息索引文件,主要存储消息 Key 与 Offset 的对应关系。


消息消费队列是RocketMQ专门为消息订阅构建的索引文件,提高根据主题与消息队 列检索消息的速度

config 文件夹中

config 文件夹中 存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。

topics.json : topic 配置属性

subscriptionGroup.json :消息消费组配置信息。

delayOffset.json :延时消息队列拉取进度。

consumerOffset.json :集群消费模式消息消进度。

consumerFilter.json :主题消息过滤信息。



存储目录:


优点:对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。

缺点:写虽然完全是顺序写,但是读却变成了完全的随机读。读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度


2.RocketMQ文件结构说明

ConsumeQueue

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示:

根据topic和queueId来组织文件,如果TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。




8byte(commitlog offset)

4byte (msgLength)

8byte (tagCode)


ConfigoffsetTable.offset(json中保存)

和commitLog的offset不是一回事,这个offset是ConsumeQueue文件的(已经消费的)下标/行数,可以直接定位到ConsumeQueue并找到commitlogOffset从而找到消息体原文

这个offset是消息消费进度的核心

{

"offsetTable":{

"zxp_test_topic@zxp_test_group2":{0:16,1:17,2:23,3:43

},

"TopicTest@please_rename_unique_group_name_4":{0:250,1:250,2:250,3:250

},

"%RETRY%zxp_test_group2@zxp_test_group2":{0:3

}

"order_topic@zxp_test_group3":{0:0,1:3,2:3,3:3

}

}

}

物理队列只有一个,采用固定大小的文件顺序存储消息。逻辑队列有多个,每个逻辑队列对应一个ConsumeQueue索引文件

OffsetStore分为以下2种,分别存储在客户端和服务器端:

本地文件类型

BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在Consumer本地,因为每条消息会被消费组内所有的消费者消费,同消费组的消费者相互独立,消费进度要单独存储,会以文本文件的形式存储在客户端,对应的数据结构为LocalFileOffsetStore

Broker代存储类型

在集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的,另外,消费者发生异常或重启为了保证可以从上一次消费的地方继续进行消费,这时的offset是统一保存到broker服务端的。对应的数据结构为RemoteBrokerOffsetStore。

Commitlog

消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。

文件的默认位置如下,仍然可通过配置文件修改:

${user.home} store${commitlog}${fileName}

CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。

message1

totalSize

queueId

queueOffset

PhysicalOffset

body

topic

其它

message2

totalSize

queueId

queueOffset

PhysicalOffset

body

topic

其它

message3

totalSize

queueId

queueOffset

PhysicalOffset

body

topic

其它


IndexFile

用于为生成的索引文件提供访问服务,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引;

indexFile存放的位置:${rocketmq.home}/store/index/indexFile(年月日时分秒等组成文件名)



我们发送的消息体中,包含 Message Key 或 Unique Key ,那么就会给它们每一个都构建索引。

    根据消息 Key 计算 Hash 槽的位置

    根据 Hash 槽的数量和 Index 索引来计算 Index 条目的起始位置

将当前 Index 条目的索引值,写在 Hash 槽 absSlotPos 位置上;将 Index 条目的具体信息 (hashcode/消息偏移量/时间差值/hash槽的值) ,从起始偏移量 absIndexPos 开始,顺序按字节写入。


由于出现了多个偏移量的概念,所以我总结一下:

    CommitLog中的offset(消息体偏移量)  体现在commitlog文件名称中,对应这个CommitLog文件所有消息在整个topic的队列中起始偏移量(方便通过ConsumeQueue.commitlogOffset找到当前要消费的消息存在于哪个commitlog文件)

    ConsumeQueue中的commitlogOffset(消息体偏移量)  定位了当前这条消息在commitlog中的偏移量

    offsettable.offset(下标)  定位了当前已经消费的ConsumeQueue的下标是哪条消息


3.MappedByteBuffer简介

以前我们操作大文件都是用BufferedInputStream、BufferedOutputStream等带缓冲的IO流处理。

MappedByteBuffer是Java提供的基于操作系统虚拟内存映射(MMAP)技术的文件读写API,采用direct buffer的方式读写文件内容,底层不再通过read、write、seek等系统调用实现文件的读写,所以效率非常高。主要用于操作大文件,如上百M、上GB的大文件


一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

    read:读取本地文件内容;

    write:将读取的内容通过网络发送出去。

普通文件读写

这两个操作发生了两次系统调用,每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态,也就是消息发送过程中一共发生了 4 次用户态与内核态的上下文切换。另外还发生了 4 次数据拷贝,其中两次是 DMA 的拷贝,另外两次则是通过 CPU 拷贝的,分别是:


    DMA把数据从磁盘拷贝到内核态缓冲区;

    CPU把数据从内核态缓冲区拷贝到用户缓冲区;

    CPU把数据从用户缓冲区拷贝到内核的网络驱动的 socket 缓冲区;

    DMA把数据从网络驱动的 socket 缓冲区拷贝到网卡的缓冲区中。


mmap

系统调用函数在调用进程的虚拟地址空间中创建一个新映射。这个映射会直接把内核缓冲区里的数据映射到用户空间,这样就不用从内核空间到用户空间来回复制数据了。


应用进程调用 mmap(),DMA 把数据从磁盘拷贝到内核缓冲区里;

应用进程调用 write(),CPU直接将内核缓冲区的数据拷贝到 socket 缓冲区中;

DMA把数据从内核的 socket 缓冲区拷贝到网卡的缓冲区里。

通过上面的分析,我们可以发现,比起原始版本,mmap + write 的方式依然需要4 次用户态与内核态的上下文切换,但是少了一次内存拷贝。


FileChannel的map方法有三个参数:

    MapMode:映射模式,可取值有READ_ONLY(只读映射)、READ_WRITE(读写映射)、PRIVATE(私有映射),READ_ONLY只支持读,READ_WRITE支持读写,而PRIVATE只支持在内存中修改,不会写回磁盘;

    position和size:映射区域,可以是整个文件,也可以是文件的某一部分,单位为字节。

4.一个简单RocketMQ文件存储实现

1.简单索引文件读写

public class FileWrite {public static void main(String[] args) throws IOException {FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/111.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);fileChannel.close();for(int i =0;i<10;i++){mappedByteBuffer.position(i*20);ByteBufferb = ByteBuffer.allocate(20);b.putLong(100);//8byte(commitlog offset)b.putInt(1000);//4byte (msgLength)b.putLong(20);//8byte (tagCode)b.flip();mappedByteBuffer.put(b);}mappedByteBuffer.force();}}public class FileRead {public static void main(String[] args) throws IOException {FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/111.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);fileChannel.close();for(int i =0;i<10;i++){mappedByteBuffer.position(i*20);long commitlogOffset = mappedByteBuffer.getLong();long msgLen = mappedByteBuffer.getInt();long tagCode = mappedByteBuffer.getLong();System.out.println("文件读取:commitlogOffset:"+commitlogOffset+",msgLen:"+msgLen+",tagCode:"+tagCode);}}}

运行结果:







2.基于conuumeQueue和CommitLog的读写

public class CommitLogWriteTest {private static Long commitLogOffset = 0L;//8byte(commitlog offset)private static Long allTotalSize = 0L;public static void main(String[] args) throws IOException {List<ConsumerQueueData> list = createCommitLog();createConsumerQueue(list);}private static List<ConsumerQueueData> createCommitLog() throws IOException {FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);fileChannel.close();List<ConsumerQueueData> list = new ArrayList<>();Random random = new Random();int count = 0;for (int i = 0; i < 100; i++) {long commitLogOffset = allTotalSize;String topic = "Topic-test";String msgId = UUID.randomUUID().toString();String msgBody = "消息内容" + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48));//long queueOffset =i;//索引偏移量String transactionId = UUID.randomUUID().toString();/* 数据格式,位置固定int totalSize;//消息长度String msgId;String topic;long queueOffset;//索引偏移量long bodySize;//消息长度byte[] body;//消息内容String transactionId;long commitLogOffset;//从第一个文件开始算的偏移量*/int totalSize = 8 //totalSize长度+ 64//msgId长度+ 64 //topic长度+ 8 //索引偏移量长度+ 8 //消息长度长度+ msgBody.getBytes(StandardCharsets.UTF_8).length //消息内容长度+ 64//transactionId长度+ 64//commitLogOffset长度;;allTotalSize = totalSize + allTotalSize;ByteBuffer b = ByteBuffer.allocate(totalSize);// //如果3个消息长度分别是100,200,350,则偏移量分别是0,100,300mappedByteBuffer.position(Integer.valueOf(commitLogOffset+""));b.putLong(totalSize);//totalSizeb.put(getBytes(msgId, 64));//msgIdb.put(getBytes(topic, 64));//topic,定长64b.putLong(queueOffset);//索引偏移量b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySizeb.put(msgBody.getBytes(StandardCharsets.UTF_8));//bodyb.put(getBytes(transactionId, 64));b.putLong(commitLogOffset);//bodySizeb.flip();mappedByteBuffer.put(b);System.out.println("写入消息,第:" + i + "次");System.out.println("totalSize:" + totalSize);System.out.println("msgId:" + msgId);System.out.println("topic:" + topic);System.out.println("msgBody:" + msgBody);System.out.println("transactionId:" + transactionId);System.out.println("commitLogOffset:" + commitLogOffset);ConsumerQueueData consumerQueueData = new ConsumerQueueData();consumerQueueData.setOffset(commitLogOffset);consumerQueueData.setMsgLength(totalSize);consumerQueueData.setTagCode(100L);list.add(consumerQueueData);count ++;}mappedByteBuffer.force();System.out.println("commitLog数据保存完成,totalSize:" + count);return list;}private static void createConsumerQueue(List<ConsumerQueueData> list) throws IOException {FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);fileChannel.close();int count = 0;for (int i = 0; i < list.size(); i++) {ConsumerQueueData consumerQueueData = list.get(i);mappedByteBuffer.position(i * 20);ByteBuffer b = ByteBuffer.allocate(20);b.putLong(consumerQueueData.getOffset());//8byte(commitlog offset)b.putInt(consumerQueueData.getMsgLength());//4byte (msgLength)b.putLong(consumerQueueData.getTagCode());//8byte (tagCode)b.flip();mappedByteBuffer.put(b);count++;System.out.println("createConsumerQueue:" + JSON.toJSONString(consumerQueueData));}System.out.println("ConsumerQueue数据保存完成count:" + count);mappedByteBuffer.force();}//将变长字符串定长byte[],方便读取private static byte[] getBytes(String s, int length) {int fixLength = length - s.getBytes().length;if (s.getBytes().length < length) {byte[] S_bytes = new byte[length];System.arraycopy(s.getBytes(), 0, S_bytes, 0, s.getBytes().length);for (int x = length - fixLength; x < length; x++) {S_bytes[x] = 0x00;}return S_bytes;}return s.getBytes(StandardCharsets.UTF_8);}}

运行结果:(数据有100条,没展示全部)




public class CommitLogReadTest {static FileChannel commitLogfileChannel = null;public static void main(String[] args) throws IOException {FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);fileChannel.close();mappedByteBuffer.position(0);//根据索引下标读取索引,实际情况是用户消费的最新点位,存在在broker的偏移量文件中int index = 0 ;for(int i =index;i<100;i++){mappedByteBuffer.position(i*20);long commitlogOffset = mappedByteBuffer.getLong();// System.out.println(commitlogOffset);long msgLen = mappedByteBuffer.getInt();Long tag = mappedByteBuffer.getLong();//System.out.println("======读取到consumerQueue,commitlogOffset:"+commitlogOffset+",msgLen :"+msgLen+"===");//根据偏移量读取CcommitLogreadCommitLog(Integer.valueOf(commitlogOffset+""));}}public static MappedByteBuffer initFileChannel() throws IOException {MappedByteBuffer mappedByteBuffer = null;if(mappedByteBuffer == null){commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);commitLogfileChannel.close();}return mappedByteBuffer;}/*** 根据偏移量读取CcommitLog* */public static void readCommitLog(int offset) throws IOException {/*b.putLong(totalSize);//totalSizeb.put(getBytes(msgId, 64));//msgIdb.put(getBytes(topic, 64));//topic,定长64b.putLong(queueOffset);//索引偏移量b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySizeb.put(msgBody.getBytes(StandardCharsets.UTF_8));//bodyb.put(getBytes(transactionId, 64));b.putLong(commitLogOffset);//commitLogOffset*/System.out.println("=================commitlog读取偏移量为"+offset+"的消息===================");MappedByteBuffermappedByteBuffer = initFileChannel();mappedByteBuffer.position(offset);long totalSize = mappedByteBuffer.getLong();//消息长度byte[] msgIdByte = new byte[64];//uuid 固定是64mappedByteBuffer.get(msgIdByte);byte[] topicByte = new byte[64];// 固定是64mappedByteBuffer.get(topicByte);long queueOffset = mappedByteBuffer.getLong();Long bodySize = mappedByteBuffer.getLong();byte[] bodyByte = new byte[Integer.parseInt(bodySize+"")];//bodySize 长度不固定mappedByteBuffer.get(bodyByte);byte[] transactionIdByte = new byte[64];//uuid 固定是64mappedByteBuffer.get(transactionIdByte);long commitLogOffset = mappedByteBuffer.getLong();//偏移量System.out.println("totalSize:"+totalSize);System.out.println("msgId:"+new String(msgIdByte));System.out.println("topic:"+new String(topicByte));System.out.println("queueOffset:"+queueOffset);System.out.println("bodySize:"+bodySize);System.out.println("body:"+new String(bodyByte));System.out.println("transactionId:"+new String(transactionIdByte));System.out.println("commitLogOffset:"+commitLogOffset);}}

运行结果:(数据有100条,没展示全部)




总结:

文章基于Java NIO的MappedByteBuffer实现了一个简单RocketMQ存储文件CommotLog存储,ConsumeQueue索引文件的构建,以及按索引下标读取CommotLog的实现,希望能加深大家对RocketMQ文件存储的了解。

版权声明:一个RocketMQ文件存储的简单实现内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,请联系 删除。本文链接:https://www.qi520.com/n/17656.html