Author: Gong Binglai, JD Logistics
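1. Overview of Master-Slave Replication
RocketMQ Broker master-slave replication consists of two parts: replication of CommitLog messages and replication of Broker metadata.
CommitLog replication happens at write time: once a message has been written to the Broker Master, a separate thread writes it to the slave. Both synchronous and asynchronous writes are supported.
Broker metadata is replicated by a separate thread on the slave, which fetches it from the master Broker every 10s, updates the slave's configuration, and persists it to the corresponding configuration files.
One important characteristic of RocketMQ master-slave synchronization is that it does not provide master-slave failover: when the master node goes down, the slave does not take over message sending, although it can still serve message reads.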
2. CommitLog Message Replication
2.1 Overview
The CommitLog master-slave replication flow is as follows:
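1. The Producer sends a message to the Broker Master; the Broker stores the message and calls handleHA to perform master-slave synchronization;
2. For synchronous replication, see Section 2.6;
3. For asynchronous replication, the flow is as follows:
   1. The Broker Master starts and listens on the designated port;
   2. The Broker Slave starts and actively connects to the Broker Master, establishing a TCP connection through Java NIO;
   3. The Broker Slave pulls messages from the master at 5s intervals. On the first pull it first reads the maximum offset of its local CommitLog files and pulls from that offset;
   4. The Broker Master parses the request and returns data to the Broker Slave;
   5. After receiving a batch of messages, the Broker Slave writes them to its local CommitLog files, reports its pull progress to the Master, and updates the offset for the next pull.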
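We first walk through the overall flow of asynchronous replication and then look at synchronous replication. The entry point for asynchronous replication is HAService.start():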
```java
public void start() throws Exception {
    // Broker Master starts up, accepts slave requests, and handles them
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    // Start the synchronous-replication thread
    this.groupTransferService.start();
    // Broker Slave starts up
    this.haClient.start();
}
```
Each of these steps is described in detail below.
2.2 HAService Master Startup
```java
public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
```
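The beginAccept method creates the ServerSocketChannel and the Selector, enables TCP reuseAddress, binds the listening port, switches the channel to non-blocking mode, and registers for OP_ACCEPT (connection events). Note that this is implemented with plain Java NIO rather than the Netty framework.
The acceptSocketService.start() startup code is as follows: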
```java
while (!this.isStopped()) {
    try {
        // Wait for events
        this.selector.select(1000);
        Set<SelectionKey> selected = this.selector.selectedKeys();
        if (selected != null) {
            for (SelectionKey k : selected) {
                // Handle the OP_ACCEPT event and create an HAConnection
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                    if (sc != null) {
                        HAConnection conn = new HAConnection(HAService.this, sc);
                        // Mainly starts the readSocketService and writeSocketService threads
                        conn.start();
                        HAService.this.addConnection(conn);
                    }
                }
            }
            selected.clear();
        }
    } catch (Exception e) {
        log.error(this.getServiceName() + " service has exception.", e);
    }
}
```
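On each iteration the selector waits up to 1s for connection-ready events. When a connection is ready, ServerSocketChannel.accept() creates a SocketChannel, which is the channel used to exchange data with the slave. An HAConnection object is then created for each connection; it is responsible for the master-slave data synchronization logic. HAConnection.start is as follows: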
```java
public void start() {
    this.readSocketService.start();
    this.writeSocketService.start();
}
```
2.3 HAClient Startup
```java
while (!this.isStopped()) {
    try {
        // Connect to the Broker Master, implemented with Java NIO
        if (this.connectMaster()) {
            // Report the offset together with the heartbeat
            if (this.isTimeToReportOffset()) {
                // Report the offset
                boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                if (!result) {
                    this.closeMaster();
                }
            }
            this.selector.select(1000);
            // Handle network read events, i.e. the message data sent back by the Master
            boolean ok = this.processReadEvent();
            if (!ok) {
                this.closeMaster();
            }
            if (!reportSlaveMaxOffsetPlus()) {
                continue;
            }
            long interval = HAService.this.getDefaultMessageStore().getSystemClock().now()
                - this.lastWriteTimestamp;
            if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                .getHaHousekeepingInterval()) {
                log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                    + "] expired, " + interval);
                this.closeMaster();
                log.warn("HAClient, master not response some time, so close connection");
            }
        } else {
            this.waitForRunning(1000 * 5);
        }
    } catch (Exception e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.waitForRunning(1000 * 5);
    }
}
```
2.3.1 Establishing the Master-Slave Connection
If socketChannel is null, the slave tries to connect to the Master; if the Master address is null, the method returns false.
```java
private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // Register for read events to receive data returned by the Broker Master
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        // Get the current offset
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}
```
The Broker Slave connects to the Broker Master through NIO, as follows:
```java
SocketChannel sc = null;
sc = SocketChannel.open();
sc.configureBlocking(true);
sc.socket().setSoLinger(false, -1);
sc.socket().setTcpNoDelay(true);
sc.socket().setReceiveBufferSize(1024 * 64);
sc.socket().setSendBufferSize(1024 * 64);
sc.socket().connect(remote, timeoutMillis);
sc.configureBlocking(false);
```
The slave's current offset is obtained through the following call chain:
```java
// DefaultMessageStore
public long getMaxPhyOffset() {
    return this.commitLog.getMaxOffset();
}

// CommitLog
public long getMaxOffset() {
    return this.mappedFileQueue.getMaxOffset();
}

// MappedFileQueue
public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}
```
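As you can see, the slave's offset is ultimately derived from the last MappedFile: the file's starting offset plus its read position.
2.3.2 Deciding When to Report the Offset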
```java
private boolean isTimeToReportOffset() {
    // Current time minus the time of the last write
    long interval =
        HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
    boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
        .getHaSendHeartbeatInterval();
    return needHeart;
}
```
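The rule is: when the current time minus the last write time exceeds haSendHeartbeatInterval, the slave sends a heartbeat and reports its offset. haSendHeartbeatInterval defaults to 5s and is configurable.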
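The check boils down to one comparison. Here is a minimal sketch of it; the class and method names are hypothetical and not part of RocketMQ, and the 5s value is the default quoted above.

```java
final class HeartbeatCheck {
    // Hypothetical helper mirroring the rule in isTimeToReportOffset:
    // report once strictly more than intervalMillis has passed since the last write.
    static boolean isTimeToReport(long nowMillis, long lastWriteMillis, long intervalMillis) {
        return nowMillis - lastWriteMillis > intervalMillis;
    }

    public static void main(String[] args) {
        long interval = 5_000L; // default haSendHeartbeatInterval according to the text
        // 6000 ms elapsed, which exceeds the interval
        System.out.println(isTimeToReport(10_000L, 4_000L, interval));
        // exactly 5000 ms elapsed: the comparison is strict, so no report yet
        System.out.println(isTimeToReport(10_000L, 5_000L, interval));
    }
}
```

Because the comparison is strict, a slave that wrote exactly one interval ago waits for the next loop iteration before reporting.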
2.3.3 Reporting the Offset
```java
private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    // Try to send at most three times, as long as reportOffset still has bytes remaining
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }
    return !this.reportOffset.hasRemaining();
}
```
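Here too the request is sent over NIO: the 8-byte offset is written to the SocketChannel, with at most three write attempts.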
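To make the wire format concrete, here is a small sketch of how the 8-byte report could be packed and unpacked with a ByteBuffer. OffsetReportCodec and its methods are hypothetical names used only for illustration; the real code reuses a single reportOffset buffer as shown above.

```java
import java.nio.ByteBuffer;

final class OffsetReportCodec {
    static final int REPORT_SIZE = 8; // one long: the slave's max physical offset

    // Packs the offset into a buffer that is ready to be written to a channel.
    static ByteBuffer encode(long maxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(REPORT_SIZE);
        buf.putLong(maxOffset);
        buf.flip(); // position = 0, limit = 8
        return buf;
    }

    // Reads the offset back without moving the buffer position (absolute read).
    static long decode(ByteBuffer buf) {
        return buf.getLong(buf.position());
    }

    public static void main(String[] args) {
        ByteBuffer report = encode(1_048_576L);
        System.out.println(report.remaining() + " bytes, offset " + decode(report));
    }
}
```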
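2.4 Request Handling on the Broker Master
When the master-slave connection is established, an HAConnection object is created. It contains two important thread service classes: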
```java
// Handles writes: sends CommitLog data to the slave
private WriteSocketService writeSocketService;
// Handles reads: reads the offset reported by the slave, which determines where the
// Broker Master reads the CommitLog from
private ReadSocketService readSocketService;
```
2.4.1 ReadSocketService Receives Read Requests
The readSocketService.run method is as follows:
```java
while (!this.isStopped()) {
    try {
        this.selector.select(1000);
        // Handle read events
        boolean ok = this.processReadEvent();
        if (!ok) {
            HAConnection.log.error("processReadEvent error");
            break;
        }
        long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
            - this.lastReadTimestamp;
        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
            .getHaHousekeepingInterval()) {
            log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr
                + "] expired, " + interval);
            break;
        }
    } catch (Exception e) {
        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
        break;
    }
}
```
The logic of processReadEvent is as follows:
```java
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
    readSizeZeroTimes = 0;
    this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
    if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
        int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
        // Get the offset requested by the slave
        long readOffset = this.byteBufferRead.getLong(pos - 8);
        this.processPostion = pos;
        HAConnection.this.slaveAckOffset = readOffset;
        if (HAConnection.this.slaveRequestOffset < 0) {
            HAConnection.this.slaveRequestOffset = readOffset;
            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
        }
        // For synchronous replication: advance push2SlaveMaxOffset if the acked offset is
        // larger, and wake up the master's GroupTransferService
        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
    }
}
```
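As you can see, processReadEvent is simple: it parses the offset from the ByteBuffer, records it in HAConnection.this.slaveAckOffset, and on the first report also sets HAConnection.this.slaveRequestOffset.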
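The position arithmetic is easy to misread, so here is a standalone sketch of just that part. AckOffsetParser is a hypothetical name, and the sketch assumes, as the excerpt does, that reports are 8-byte aligned from the start of the buffer. When several reports arrive in one read, only the most recent complete one is used, and a trailing partial report is ignored until the rest arrives.

```java
import java.nio.ByteBuffer;

final class AckOffsetParser {
    // Returns the latest complete 8-byte offset in readBuffer, or -1 if fewer than
    // 8 unprocessed bytes are available. readBuffer is in "write mode": position()
    // marks the end of the bytes received so far.
    static long latestOffset(ByteBuffer readBuffer, int processPosition) {
        if (readBuffer.position() - processPosition < 8) {
            return -1L;
        }
        // Round the write position down to a multiple of 8 to skip a partial report.
        int pos = readBuffer.position() - (readBuffer.position() % 8);
        return readBuffer.getLong(pos - 8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putLong(100L);            // first report
        buf.putLong(200L);            // second report
        buf.put(new byte[] {0, 0, 0}); // first 3 bytes of a third, incomplete report
        System.out.println(latestOffset(buf, 0)); // the second report wins
    }
}
```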
2.4.2 WriteSocketService Handles Writes
The Broker Master reads the CommitLog and sends it to the slave through HAConnection.WriteSocketService. The main logic of its run method is as follows:
```java
this.selector.select(1000);
// nextTransferFromWhere: the start position of the next CommitLog transfer
if (-1 == this.nextTransferFromWhere) {
    if (0 == HAConnection.this.slaveRequestOffset) {
        long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
        masterOffset =
            masterOffset
                - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                .getMapedFileSizeCommitLog());
        if (masterOffset < 0) {
            masterOffset = 0;
        }
        this.nextTransferFromWhere = masterOffset;
    } else {
        this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
    }
    log.info("master transfer data from " + this.nextTransferFromWhere + " to slave["
        + HAConnection.this.clientAddr + "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// Fetch the CommitLog data
SelectMappedBufferResult selectResult =
    HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
    int size = selectResult.getSize();
    if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
        size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
    }
    long thisOffset = this.nextTransferFromWhere;
    this.nextTransferFromWhere += size;
    selectResult.getByteBuffer().limit(size);
    this.selectMappedBufferResult = selectResult;
    // Build Header
    this.byteBufferHeader.position(0);
    this.byteBufferHeader.limit(headerSize);
    this.byteBufferHeader.putLong(thisOffset);
    this.byteBufferHeader.putInt(size);
    this.byteBufferHeader.flip();
    // Send the CommitLog data over NIO
    this.lastWriteOver = this.transferData();
} else {
    // No CommitLog data available: wait 100ms
    HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
```
This involves two main steps: fetching the CommitLog data and sending the CommitLog data.
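Two small calculations in the run method deserve a closer look: where the first transfer starts, and how much is sent per batch. The sketch below isolates them. TransferStart and its method names are hypothetical, and the file and batch sizes are illustrative values, not RocketMQ defaults.

```java
final class TransferStart {
    // If the slave reported offset 0 (an empty slave), start from the beginning of the
    // master's last CommitLog file instead of offset 0; otherwise start where the slave asked.
    static long initialTransferOffset(long slaveRequestOffset, long masterMaxOffset, int mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long aligned = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(aligned, 0L);
    }

    // Caps the number of bytes sent in a single transfer.
    static int batchSize(int availableBytes, int haTransferBatchSize) {
        return Math.min(availableBytes, haTransferBatchSize);
    }

    public static void main(String[] args) {
        int fileSize = 1024; // illustrative file size
        System.out.println(initialTransferOffset(0L, 2500L, fileSize));   // start of the last file
        System.out.println(initialTransferOffset(700L, 2500L, fileSize)); // slave's own offset
        System.out.println(batchSize(100_000, 32_768));                   // capped at the batch size
    }
}
```

One consequence of the alignment is that a brand-new slave does not receive the master's full history: it only starts from the master's most recent CommitLog file.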
2.4.2.1 Fetching CommitLog Data
```java
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }
    return null;
}

public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();
    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            int size = readPosition - pos;
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }
    return null;
}
```
As you can see, the data is ultimately read from the MappedFile according to the offset.
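The mapping from a global CommitLog offset to a position inside one file is plain modular arithmetic. Here is a minimal sketch with hypothetical names and an illustrative file size; unlike selectMappedBuffer, which returns null when nothing is readable, the sketch returns -1.

```java
final class MappedFileLocator {
    // Position of a global CommitLog offset inside the file that contains it.
    static int positionInFile(long offset, int mappedFileSize) {
        return (int) (offset % mappedFileSize);
    }

    // Number of readable bytes from pos up to the file's read position,
    // or -1 if pos lies outside the readable range.
    static int readableBytes(int pos, int readPosition) {
        return (pos >= 0 && pos < readPosition) ? readPosition - pos : -1;
    }

    public static void main(String[] args) {
        int fileSize = 1024;                       // illustrative file size
        int pos = positionInFile(2500L, fileSize); // 2500 = 2 * 1024 + 452
        System.out.println(pos + " -> " + readableBytes(pos, 1000) + " readable bytes");
    }
}
```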
2.4.2.2 Sending CommitLog Data
The data consists of two parts, a header and a body, and is again sent over NIO. The main code is as follows:
```java
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();

// Write Header (renamed from writeSize so it does not clash with the variable in the loop below)
int headerWriteSize = this.socketChannel.write(this.byteBufferHeader);

// Write Body
if (!this.byteBufferHeader.hasRemaining()) {
    while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
        int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            this.lastWriteTimestamp =
                HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        } else if (writeSize == 0) {
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        } else {
            throw new Exception("ha master write body error < 0");
        }
    }
}
```
Once the master has sent the CommitLog data, the Broker Slave, which is listening for read events, receives the data and writes it to its own CommitLog.
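The frame layout implied by the header code is an 8-byte physical offset, a 4-byte body size, and then the body. The sketch below encodes and decodes such a frame in memory. HaFrame is a hypothetical helper, not a RocketMQ class, and the 12-byte header size is inferred from the putLong and putInt calls above rather than stated in the text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class HaFrame {
    static final int HEADER_SIZE = 8 + 4; // offset (long) + body size (int), inferred from the header code

    static ByteBuffer encode(long physicalOffset, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.putLong(physicalOffset);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip();
        return buf;
    }

    // Physical offset of the frame starting at the buffer's current position.
    static long decodeOffset(ByteBuffer buf) {
        return buf.getLong(buf.position());
    }

    // Returns the body and advances past the frame, or returns null (leaving the
    // buffer untouched) if a complete frame has not arrived yet.
    static byte[] decodeBody(ByteBuffer buf) {
        if (buf.remaining() < HEADER_SIZE) {
            return null;
        }
        int start = buf.position();
        int bodySize = buf.getInt(start + 8);
        if (buf.remaining() < HEADER_SIZE + bodySize) {
            return null;
        }
        buf.position(start + HEADER_SIZE);
        byte[] body = new byte[bodySize];
        buf.get(body);
        return body;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode(4096L, "hello".getBytes(StandardCharsets.UTF_8));
        long offset = decodeOffset(frame);
        byte[] body = decodeBody(frame);
        System.out.println(offset + " " + new String(body, StandardCharsets.UTF_8));
    }
}
```

Returning null for an incomplete frame reflects the fact that a single TCP read may deliver only part of a frame, so the receiver has to wait for more bytes before dispatching.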
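2.5 HAClient processReadEvent
After the master-slave connection is established, the slave registers for read events in order to read the CommitLog data returned by the Broker Master. The corresponding method is HAClient.processReadEvent: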
```java
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    readSizeZeroTimes = 0;
    boolean result = this.dispatchReadRequest();
    if (!result) {
        log.error("HAClient, dispatchReadRequest error");
        return false;
    }
}
```
The dispatchReadRequest method is as follows:
```java
// Read the returned body data
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
this.byteBufferRead.get(bodyData);

HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

this.byteBufferRead.position(readSocketPos);
this.dispatchPostion += msgHeaderSize + bodySize;

// Report the slave's offset
if (!reportSlaveMaxOffsetPlus()) {
    return false;
}
```
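The core logic consists of the following three steps, as the code above shows:
1. Read the message body out of byteBufferRead;
2. Append the body to the local CommitLog through appendToCommitLog;
3. Report the slave's new offset to the master through reportSlaveMaxOffsetPlus.
appendToCommitLog is implemented as follows: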
```java
public boolean appendToCommitLog(long startOffset, byte[] data) {
    // Write the data to the CommitLog, the same way ordinary messages are stored
    boolean result = this.commitLog.appendData(startOffset, data);
    // Wake up reputMessageService to build the consumeQueue and index
    this.reputMessageService.wakeup();
    return result;
}
```
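2.6 Synchronous Replication
The sections above covered the Broker's asynchronous replication; now let's look at how synchronous replication is implemented. The overall flow is roughly as follows: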
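1. The producer sends a message to the broker, and the broker stores it by writing it to the commitLog;
2. The broker master's message-writing thread wakes up the WriteSocketService thread, which queries the commitLog data and sends it to the slave. When WriteSocketService finds no commitLog data, it waits 100ms, so the writer wakes it as soon as new data is written to the commitLog;
3. The broker master creates a GroupCommitRequest and waits synchronously for replication to complete;
4. The slave receives the new commitLog data, writes it to its commitLog, and returns its new slave offset to the master;
5. The master updates push2SlaveMaxOffset and checks whether it is greater than or equal to the offset of the replication request. If so, replication is considered complete, commitLog.handleHA returns successfully, and the message is reported as saved.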
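The last step relies on push2SlaveMaxOffset only ever moving forward, even when several connections report concurrently. Below is a minimal sketch of such a monotonic update using compare-and-set. SlaveAckTracker and its method names are hypothetical, and the CAS loop is my assumption about a reasonable implementation rather than a quotation of HAService.

```java
import java.util.concurrent.atomic.AtomicLong;

final class SlaveAckTracker {
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);

    // Advances the tracked offset if ackOffset is larger; returns true when it advanced.
    // A stale (smaller or equal) ack never moves the value backwards.
    boolean onSlaveAck(long ackOffset) {
        long current = push2SlaveMaxOffset.get();
        while (ackOffset > current) {
            if (push2SlaveMaxOffset.compareAndSet(current, ackOffset)) {
                return true;
            }
            current = push2SlaveMaxOffset.get(); // lost a race: re-read and retry
        }
        return false;
    }

    // Replication of a write ending at nextOffset is complete once the slave has acked it.
    boolean isReplicated(long nextOffset) {
        return push2SlaveMaxOffset.get() >= nextOffset;
    }

    public static void main(String[] args) {
        SlaveAckTracker tracker = new SlaveAckTracker();
        tracker.onSlaveAck(100L);
        tracker.onSlaveAck(50L); // stale ack, ignored
        System.out.println(tracker.isReplicated(100L) + " " + tracker.isReplicated(101L));
    }
}
```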
The code entry point for this flow is the CommitLog.handleHA method.
```java
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Only when this broker is a master configured for synchronous replication
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        // Get the HAService
        HAService service = this.defaultMessageStore.getHaService();
        // Read MessageConst.PROPERTY_WAIT_STORE_MSG_OK from the message; by default the
        // caller waits for master-slave replication to complete
        if (messageExt.isWaitStoreMsgOK()) {
            /**
             * Check whether the slave is available. The rule is
             * (master offset - push2SlaveMaxOffset < 1024 * 1024 * 256): if the slave has fallen
             * too far behind, it is considered unavailable and the producer is told so.
             * Here result = mappedFile.appendMessage(msg, this.appendMessageCallback);
             */
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                // Build the GroupCommitRequest with
                // nextOffset = result.getWroteOffset() + result.getWroteBytes(),
                // i.e. the offset the slave has to reach
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                /**
                 * Calls this.groupTransferService.putRequest(request), which puts the request
                 * into the requestsWrite list.
                 * HAService holds a reference to GroupTransferService groupTransferService.
                 */
                service.putRequest(request);
                /**
                 * Wakes up WriteSocketService, which queries the commitLog data and sends it to the slave.
                 * When WriteSocketService finds no commitLog data, it waits 100ms:
                 * HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                 * so it is woken up here whenever new data is written to the commitLog.
                 */
                service.getWaitNotifyObject().wakeupAll();
                // Wait for synchronous replication to complete. The condition is:
                // HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                // If synchronous replication failed, set the status in putMessageResult to
                // "flush slave timeout"
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: "
                        + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
}
```
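The branching in handleHA amounts to a small decision table. The sketch below captures only the two conditions mentioned in the comments above, namely the 256 MB fall-behind limit and whether the wait finished in time. SyncReplicationOutcome and its Status enum are hypothetical stand-ins for illustration; they are not PutMessageStatus itself, and the real isSlaveOK may check more than the offset gap.

```java
final class SyncReplicationOutcome {
    // Illustrative stand-in for the statuses named in handleHA.
    enum Status { PUT_OK, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    // Limit quoted in the handleHA comment: 1024 * 1024 * 256 bytes.
    static final long MAX_FALL_BEHIND_BYTES = 1024L * 1024 * 256;

    // The slave counts as usable only while it is less than the limit behind the master.
    static boolean isSlaveOk(long masterPutWhere, long push2SlaveMaxOffset) {
        return masterPutWhere - push2SlaveMaxOffset < MAX_FALL_BEHIND_BYTES;
    }

    static Status decide(long masterPutWhere, long push2SlaveMaxOffset, boolean replicatedInTime) {
        if (!isSlaveOk(masterPutWhere, push2SlaveMaxOffset)) {
            return Status.SLAVE_NOT_AVAILABLE; // do not even wait
        }
        return replicatedInTime ? Status.PUT_OK : Status.FLUSH_SLAVE_TIMEOUT;
    }

    public static void main(String[] args) {
        System.out.println(decide(1_000L, 900L, true));
        System.out.println(decide(1_000L, 900L, false));
        System.out.println(decide(300L * 1024 * 1024, 0L, true));
    }
}
```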
2.6.1 GroupTransferService Startup
When HAService starts, it also starts the GroupTransferService thread. The code is as follows:
```java
public void run() {
    while (!this.isStopped()) {
        this.waitForRunning(10);
        this.doWaitTransfer();
    }
}

private void doWaitTransfer() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                /**
                 * req.getNextOffset: result.getWroteOffset() + result.getWroteBytes()
                 * push2SlaveMaxOffset: the largest offset the slave has reported back
                 */
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                // Loop up to 5 times, waiting at most 5s in total, because the slave
                // heartbeat interval defaults to 5s
                for (int i = 0; !transferOK && i < 5; i++) {
                    this.notifyTransferObject.waitForRunning(1000);
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }
                if (!transferOK) {
                    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                }
                // Pass the result to the waiting handleHA call so that it can continue
                req.wakeupCustomer(transferOK);
            }
            this.requestsRead.clear();
        }
    }
}
```
wakeupCustomer:
```java
public void wakeupCustomer(final boolean flushOK) {
    this.flushOK = flushOK;
    this.countDownLatch.countDown();
}
```
2.6.2 Waking Up WriteSocketService
service.getWaitNotifyObject().wakeupAll();
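This call wakes up WriteSocketService, which queries the commitLog data and sends it to the slave. When WriteSocketService finds no commitLog data, it waits 100ms via HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); so whenever new data is written to the commitLog, WriteSocketService is woken up, queries the commitLog data, and sends it to the slave.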
2.6.3 Waiting Synchronously Until Replication Completes
```java
// Call site in handleHA
boolean flushOK =
    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

// GroupCommitRequest
public boolean waitForFlush(long timeout) {
    try {
        // Wait for synchronous replication to complete
        this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        return this.flushOK;
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
        return false;
    }
}
```
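The handshake between waitForFlush and wakeupCustomer is a standard one-shot CountDownLatch pattern. The sketch below is a self-contained version with hypothetical names (ReplicationRequest, complete, await). It is a simplification, not the GroupCommitRequest class, and restoring the interrupt flag is my own choice rather than something the excerpt does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ReplicationRequest {
    private final long nextOffset;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean ok = false;

    ReplicationRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    // Called by the transfer thread once replication has succeeded or timed out.
    void complete(boolean success) {
        this.ok = success;
        this.latch.countDown();
    }

    // Called by the writer thread; returns true only if complete(true) arrived in time.
    boolean await(long timeoutMillis) {
        try {
            boolean signalled = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return signalled && ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ReplicationRequest done = new ReplicationRequest(1024L);
        done.complete(true);
        System.out.println(done.await(1000L));   // already completed, returns immediately

        ReplicationRequest pending = new ReplicationRequest(2048L);
        System.out.println(pending.await(50L));  // nobody completes it, so the wait times out
    }
}
```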
3. Metadata Replication
Broker metadata replication mainly covers topicConfig, consumerOffset, delayOffset, and subscriptionGroup.
The slave broker replicates the metadata every 10s on a separate thread. The code entry point is BrokerController.start -> SlaveSynchronize.syncAll:
```java
// BrokerController: scheduled after an initial delay of 3s, then every 10s
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            // Run master-slave synchronization every 10s
            BrokerController.this.slaveSynchronize.syncAll();
        } catch (Throwable e) {
            log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
        }
    }
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);

// SlaveSynchronize
public void syncAll() {
    this.syncTopicConfig();
    this.syncConsumerOffset();
    this.syncDelayOffset();
    this.syncSubscriptionGroupConfig();
}
```
3.1、syncTopicConfig
```java
// Fetch the TopicConfig information from the Master; this ends up calling
// AdminBrokerProcessor.getAllTopicConfig
TopicConfigSerializeWrapper topicWrapper =
    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if (!this.brokerController.getTopicConfigManager().getDataVersion()
    .equals(topicWrapper.getDataVersion())) {

    this.brokerController.getTopicConfigManager().getDataVersion()
        .assignNewOne(topicWrapper.getDataVersion());
    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
    this.brokerController.getTopicConfigManager().getTopicConfigTable()
        .putAll(topicWrapper.getTopicConfigTable());
    // Persist the topicConfig; the corresponding file is topics.json
    this.brokerController.getTopicConfigManager().persist();
    log.info("Update slave topic config from master, {}", masterAddrBak);
}
```
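The pattern in syncTopicConfig is "replace the local table only when the master's data version differs". Here is a minimal sketch of that idea, using a plain long as the version and a string map as the table. VersionedTable is hypothetical; the real code compares DataVersion objects and also persists the table to disk.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class VersionedTable {
    private long version = 0L;
    private final Map<String, String> table = new ConcurrentHashMap<>();

    // Replaces the local table with the master's copy when the versions differ.
    // Returns true if the local copy changed (the point where persist() would run).
    boolean syncFrom(long masterVersion, Map<String, String> masterTable) {
        if (version == masterVersion) {
            return false; // already up to date: skip the copy and the disk write
        }
        version = masterVersion;
        table.clear();
        table.putAll(masterTable);
        return true;
    }

    Map<String, String> snapshot() {
        return new HashMap<>(table);
    }

    public static void main(String[] args) {
        VersionedTable slave = new VersionedTable();
        Map<String, String> master = Collections.singletonMap("TopicA", "8 queues");
        System.out.println(slave.syncFrom(1L, master)); // version changed, table replaced
        System.out.println(slave.syncFrom(1L, master)); // same version, nothing to do
        System.out.println(slave.snapshot());
    }
}
```

The version check keeps the 10s polling cheap: most rounds find nothing new and skip both the table copy and the write to disk.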
3.2、syncConsumerOffset
```java
// Fetch the ConsumerOffset from the master Broker
ConsumerOffsetSerializeWrapper offsetWrapper =
    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
// Update the slave's offsetTable
this.brokerController.getConsumerOffsetManager().getOffsetTable()
    .putAll(offsetWrapper.getOffsetTable());
// Persist it to the slave's consumerOffset.json file
this.brokerController.getConsumerOffsetManager().persist();
```
3.3、syncDelayOffset
```java
String delayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
String fileName = StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
    .getMessageStoreConfig().getStorePathRootDir());
MixAll.string2File(delayOffset, fileName);
```
3.4、syncSubscriptionGroupConfig
```java
SubscriptionGroupWrapper subscriptionWrapper = this.brokerController.getBrokerOuterAPI()
    .getAllSubscriptionGroupConfig(masterAddrBak);
SubscriptionGroupManager subscriptionGroupManager = this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());
subscriptionGroupManager.getSubscriptionGroupTable().clear();
subscriptionGroupManager.getSubscriptionGroupTable().putAll(subscriptionWrapper.getSubscriptionGroupTable());
subscriptionGroupManager.persist();
```
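4. Reflections and Takeaways
After this walkthrough we have a basic understanding of how RocketMQ master-slave replication works. Several of its ideas are worth borrowing in future designs: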
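1. When designing a feature, manage metadata and application data separately;
2. In master-slave replication the basic idea is always that the slave requests data from the master and includes its offset in the request; the master then looks up the data and returns it, and the slave applies it. MySQL and Redis master-slave replication work in essentially the same way;
3. Master-slave replication can be asynchronous or synchronous, selectable through configuration; which one to use depends on the actual business scenario;
4. Keep the replication threads separate from the message-writing threads and the main thread wherever possible.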
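Given limited time and energy, there are bound to be oversights and points not fully considered. If you spot any problems, feel free to get in touch and discuss.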