/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Used to clean the useless barriers in {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family in
 * meta table.
 * <p>
 * Each run scans every row of the meta table that has cells in the replication barrier family and
 * removes the barriers that are no longer needed, based on whether the owning table still has
 * serial replication peers and on the last sequence id pushed to those peers.
 */
@InterfaceAudience.Private
public class ReplicationBarrierCleaner extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class);

  /** Configuration key for the period of this chore, read as an int via {@code conf.getInt}. */
  private static final String REPLICATION_BARRIER_CLEANER_INTERVAL =
    "hbase.master.cleaner.replication.barrier.interval";

  // 12 hour. Usually regions will not be moved so the barrier are rarely updated. Use a large
  // interval.
  private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000;

  /** Connection used to open the meta table on every run. */
  private final Connection conn;

  /** Source of the serial peer ids of a table and of the replication queue storage. */
  private final ReplicationPeerManager peerManager;

  /**
   * Creates the chore.
   * @param conf        configuration the chore period is read from, falling back to
   *                    {@link #DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL} when unset
   * @param stopper     passed through to {@link ScheduledChore}
   * @param conn        connection used to access the meta table
   * @param peerManager used to look up serial peers and the last pushed sequence ids
   */
  public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn,
    ReplicationPeerManager peerManager) {
    super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL,
      DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL));
    this.conn = conn;
    this.peerManager = peerManager;
  }

  /**
   * Scans the replication barrier family of the meta table and deletes obsolete barriers.
   * <p>
   * For every row that has at least one barrier:
   * <ul>
   * <li>If the table has no serial peers: when the row still has the catalog family only the
   * newest barrier is kept, otherwise all barriers of the row are deleted.</li>
   * <li>Otherwise the minimum last pushed sequence id over all serial peers is located within the
   * barriers. If the row has no catalog family any more and everything up to the last barrier has
   * been pushed, the barriers and the stored last pushed sequence ids are removed. In all other
   * cases only the barriers that precede the range containing the pushed sequence id are
   * removed.</li>
   * </ul>
   * A {@link ReplicationException} or {@link IOException} ends the run early and is logged at WARN
   * level; it is not propagated. The counters gathered so far are still logged at INFO level when
   * at least one row was scanned.
   */
  @Override
  // Public so can be run out of MasterRpcServices. Synchronized so only one
  // running instance at a time.
  public synchronized void chore() {
    long totalRows = 0;
    long cleanedRows = 0;
    long deletedRows = 0;
    long deletedBarriers = 0;
    long deletedLastPushedSeqIds = 0;
    // Cache of the serial peers for the table of the previously processed row; it is only
    // refreshed when the table of the current row differs from the cached one.
    TableName tableName = null;
    List<String> peerIds = null;
    try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner scanner = metaTable.getScanner(
        new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        totalRows++;
        long[] barriers = ReplicationBarrierFamilyFormat.getReplicationBarriers(result);
        if (barriers.length == 0) {
          continue;
        }
        byte[] regionName = result.getRow();
        TableName tn = RegionInfo.getTable(regionName);
        if (!tn.equals(tableName)) {
          tableName = tn;
          peerIds = peerManager.getSerialPeerIdsBelongsTo(tableName);
        }
        if (peerIds.isEmpty()) {
          // no serial replication
          // check if the region has already been removed, i.e, no catalog family
          if (metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
            // exists, then only keep the newest barrier
            Cell cell = result.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY,
              HConstants.SEQNUM_QUALIFIER);
            // Delete everything strictly older than the latest cell (see Delete#addFamily(byte[],
            // long) for the timestamp semantics).
            metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
              cell.getTimestamp() - 1));
            deletedBarriers += barriers.length - 1;
          } else {
            // not exists, delete all the barriers
            metaTable
              .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
            deletedBarriers += barriers.length;
          }
          cleanedRows++;
          continue;
        }
        String encodedRegionName = RegionInfo.encodeRegionName(regionName);
        // The slowest peer decides what can be cleaned, so take the minimum over all serial peers.
        long pushedSeqId = Long.MAX_VALUE;
        for (String peerId : peerIds) {
          pushedSeqId = Math.min(pushedSeqId,
            peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId));
        }
        int index = Arrays.binarySearch(barriers, pushedSeqId);
        if (index == -1) {
          // beyond the first barrier, usually this should not happen but anyway let's add a check
          // for it.
          continue;
        }
        // Normalise the binarySearch result: a miss is reported as -(insertionPoint) - 1 and is
        // mapped to the insertion point, an exact hit at i is mapped to i + 1. Either way 'index'
        // ends up as the position of the first barrier greater than pushedSeqId.
        if (index < 0) {
          index = -index - 1;
        } else {
          index++;
        }
        // A special case for merged/split region, and also deleted tables, where we are in the last
        // closed range and the pushedSeqId is the last barrier minus 1.
        if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
          // check if the region has already been removed, i.e, no catalog family
          if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
            ReplicationQueueStorage queueStorage = peerManager.getQueueStorage();
            for (String peerId : peerIds) {
              queueStorage.removeLastSequenceIds(peerId, Arrays.asList(encodedRegionName));
              deletedLastPushedSeqIds++;
            }
            metaTable
              .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
            deletedRows++;
            deletedBarriers += barriers.length;
            continue;
          }
        }
        // the barrier before 'index - 1'(exclusive) can be safely removed. See the algorithm in
        // SerialReplicationChecker for more details.
        if (index - 1 > 0) {
          List<Cell> cells = result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY,
            HConstants.SEQNUM_QUALIFIER);
          // All barriers before this cell(exclusive) can be removed
          // NOTE(review): this indexing presumably relies on 'cells' being ordered newest first
          // and having one cell per entry of 'barriers' - confirm against Result#getColumnCells
          // and ReplicationBarrierFamilyFormat#getReplicationBarriers.
          Cell cell = cells.get(cells.size() - index);
          metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
            cell.getTimestamp() - 1));
          cleanedRows++;
          deletedBarriers += index - 1;
        }
      }
    } catch (ReplicationException | IOException e) {
      LOG.warn("Failed to clean up replication barrier", e);
    }
    if (totalRows > 0) {
      LOG.info(
        "TotalRows={}, cleanedRows={}, deletedRows={}, deletedBarriers={}, "
          + "deletedLastPushedSeqIds={}",
        totalRows, cleanedRows, deletedRows, deletedBarriers, deletedLastPushedSeqIds);
    }
  }
}