/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodic chore that removes replication barriers which are no longer needed from the
 * {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family of the meta table.
 */
@InterfaceAudience.Private
public class ReplicationBarrierCleaner extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class);

  /** Configuration key for the period of this chore, read as an int by the constructor. */
  private static final String REPLICATION_BARRIER_CLEANER_INTERVAL =
    "hbase.master.cleaner.replication.barrier.interval";

  // 12 hours. Regions are usually not moved, so barriers are rarely updated and a large interval
  // is sufficient.
  private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000;

  private final Connection conn;

  private final ReplicationPeerManager peerManager;

  /**
   * @param conf used to look up the chore period (falls back to the 12 hour default)
   * @param stopper passed through to {@link ScheduledChore}
   * @param conn connection used to open the meta table on every run
   * @param peerManager source of the serial peer ids of a table and of the queue storage
   */
  public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn,
    ReplicationPeerManager peerManager) {
    super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL,
      DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL));
    this.conn = conn;
    this.peerManager = peerManager;
  }

  /**
   * Scans every row of the meta table that carries the replication barrier family and deletes the
   * barriers that can be dropped. Any {@link ReplicationException} or {@link IOException} aborts
   * the current pass and is logged; a summary is logged whenever at least one row was scanned.
   */
  @Override
  protected void chore() {
    long scannedRows = 0;
    long trimmedRows = 0;
    long removedRows = 0;
    long removedBarriers = 0;
    long removedLastPushedSeqIds = 0;
    // Peer ids are cached per table; they are looked up again only when the table name changes
    // from one scanned row to the next.
    TableName currentTable = null;
    List<String> serialPeerIds = null;
    try (Table meta = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner scanner = meta.getScanner(
        new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) {
      Result row;
      while ((row = scanner.next()) != null) {
        scannedRows++;
        long[] barriers = MetaTableAccessor.getReplicationBarriers(row);
        if (barriers.length == 0) {
          continue;
        }
        byte[] regionName = row.getRow();
        TableName owner = RegionInfo.getTable(regionName);
        if (!owner.equals(currentTable)) {
          currentTable = owner;
          serialPeerIds = peerManager.getSerialPeerIdsBelongsTo(currentTable);
        }

        if (serialPeerIds.isEmpty()) {
          // No serial replication for this table. Whether the region still exists is decided by
          // the presence of the catalog family.
          boolean regionPresent = hasCatalogFamily(meta, regionName);
          Delete delete = new Delete(regionName);
          long dropped;
          if (regionPresent) {
            // region still exists: keep only the newest barrier
            Cell newest = row.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY,
              HConstants.SEQNUM_QUALIFIER);
            delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY, newest.getTimestamp() - 1);
            dropped = barriers.length - 1;
          } else {
            // region is gone: drop every barrier
            delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
            dropped = barriers.length;
          }
          meta.delete(delete);
          // counters are only bumped once the delete went through
          removedBarriers += dropped;
          trimmedRows++;
          continue;
        }

        String encodedRegionName = RegionInfo.encodeRegionName(regionName);
        long pushedSeqId = lowestPushedSeqId(encodedRegionName, serialPeerIds);
        int found = Arrays.binarySearch(barriers, pushedSeqId);
        if (found == -1) {
          // The pushed sequence id lies before the first barrier. Usually this should not happen,
          // but guard against it anyway.
          continue;
        }
        // Exact hit: step past the matching barrier. Miss: use the insertion point.
        int index = found >= 0 ? found + 1 : -found - 1;

        // Special case for merged/split regions and deleted tables: we are in the last closed
        // range and the pushed sequence id is the last barrier minus 1. If the region has already
        // been removed (no catalog family), everything recorded for it can go.
        int lastIdx = barriers.length - 1;
        if (index == lastIdx && pushedSeqId == barriers[lastIdx] - 1
          && !hasCatalogFamily(meta, regionName)) {
          ReplicationQueueStorage queueStorage = peerManager.getQueueStorage();
          for (String peerId : serialPeerIds) {
            queueStorage.removeLastSequenceIds(peerId, Arrays.asList(encodedRegionName));
            removedLastPushedSeqIds++;
          }
          meta.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
          removedRows++;
          removedBarriers += barriers.length;
          continue;
        }

        // Barriers before 'index - 1' (exclusive) can be safely removed. See the algorithm in
        // SerialReplicationChecker for more details.
        if (index > 1) {
          List<Cell> cells = row.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY,
            HConstants.SEQNUM_QUALIFIER);
          // every barrier before this cell (exclusive) can be removed
          Cell boundary = cells.get(cells.size() - index);
          meta.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
            boundary.getTimestamp() - 1));
          trimmedRows++;
          removedBarriers += index - 1;
        }
      }
    } catch (ReplicationException | IOException e) {
      LOG.warn("Failed to clean up replication barrier", e);
    }
    if (scannedRows > 0) {
      LOG.info(
        "Cleanup replication barriers: totalRows {}, " +
          "cleanedRows {}, deletedRows {}, deletedBarriers {}, deletedLastPushedSeqIds {}",
        scannedRows, trimmedRows, removedRows, removedBarriers, removedLastPushedSeqIds);
    }
  }

  /** Returns whether the given meta row has anything in the catalog family. */
  private static boolean hasCatalogFamily(Table meta, byte[] regionName) throws IOException {
    return meta.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY));
  }

  /**
   * Returns the smallest last-pushed sequence id recorded for the region across the given peers,
   * or {@link Long#MAX_VALUE} when the peer list is empty.
   */
  private long lowestPushedSeqId(String encodedRegionName, List<String> peerIds)
    throws ReplicationException {
    long lowest = Long.MAX_VALUE;
    for (String peerId : peerIds) {
      long pushed = peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId);
      lowest = Math.min(lowest, pushed);
    }
    return lowest;
  }

}