002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.MetaTableAccessor;
027import org.apache.hadoop.hbase.ScheduledChore;
028import org.apache.hadoop.hbase.Stoppable;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.Delete;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.ResultScanner;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
039import org.apache.hadoop.hbase.replication.ReplicationException;
040import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
046 * Used to clean the useless barriers in {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family in
047 * meta table.
048 */
050public class ReplicationBarrierCleaner extends ScheduledChore {
051  private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class);
053  private static final String REPLICATION_BARRIER_CLEANER_INTERVAL =
054    "hbase.master.cleaner.replication.barrier.interval";
  // 12 hours. Regions are usually not moved, so the barriers are rarely updated; use a large
  // interval.
058  private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000;
060  private final Connection conn;
062  private final ReplicationPeerManager peerManager;
064  public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn,
065    ReplicationPeerManager peerManager) {
066    super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL,
068    this.conn = conn;
069    this.peerManager = peerManager;
070  }
072  @Override
073  // Public so can be run out of MasterRpcServices. Synchronized so only one
074  // running instance at a time.
075  public synchronized void chore() {
076    long totalRows = 0;
077    long cleanedRows = 0;
078    long deletedRows = 0;
079    long deletedBarriers = 0;
080    long deletedLastPushedSeqIds = 0;
081    TableName tableName = null;
082    List<String> peerIds = null;
083    try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
084      ResultScanner scanner = metaTable.getScanner(
085        new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) {
086      for (;;) {
087        Result result = scanner.next();
088        if (result == null) {
089          break;
090        }
091        totalRows++;
092        long[] barriers = MetaTableAccessor.getReplicationBarriers(result);
093        if (barriers.length == 0) {
094          continue;
095        }
096        byte[] regionName = result.getRow();
097        TableName tn = RegionInfo.getTable(regionName);
098        if (!tn.equals(tableName)) {
099          tableName = tn;
100          peerIds = peerManager.getSerialPeerIdsBelongsTo(tableName);
101        }
102        if (peerIds.isEmpty()) {
103          // no serial replication
104          // check if the region has already been removed, i.e, no catalog family
105          if (metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
106            // exists, then only keep the newest barrier
107            Cell cell = result.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY,
108              HConstants.SEQNUM_QUALIFIER);
109            metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
110              cell.getTimestamp() - 1));
111            deletedBarriers += barriers.length - 1;
112          } else {
113            // not exists, delete all the barriers
114            metaTable
115              .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
116            deletedBarriers += barriers.length;
117          }
118          cleanedRows++;
119          continue;
120        }
121        String encodedRegionName = RegionInfo.encodeRegionName(regionName);
122        long pushedSeqId = Long.MAX_VALUE;
123        for (String peerId : peerIds) {
124          pushedSeqId = Math.min(pushedSeqId,
125            peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId));
126        }
127        int index = Arrays.binarySearch(barriers, pushedSeqId);
128        if (index == -1) {
129          // beyond the first barrier, usually this should not happen but anyway let's add a check
130          // for it.
131          continue;
132        }
133        if (index < 0) {
134          index = -index - 1;
135        } else {
136          index++;
137        }
138        // A special case for merged/split region, and also deleted tables, where we are in the last
139        // closed range and the pushedSeqId is the last barrier minus 1.
140        if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
141          // check if the region has already been removed, i.e, no catalog family
142          if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
143            ReplicationQueueStorage queueStorage = peerManager.getQueueStorage();
144            for (String peerId : peerIds) {
145              queueStorage.removeLastSequenceIds(peerId, Arrays.asList(encodedRegionName));
146              deletedLastPushedSeqIds++;
147            }
148            metaTable
149              .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
150            deletedRows++;
151            deletedBarriers += barriers.length;
152            continue;
153          }
154        }
155        // the barrier before 'index - 1'(exclusive) can be safely removed. See the algorithm in
156        // SerialReplicationChecker for more details.
157        if (index - 1 > 0) {
158          List<Cell> cells = result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY,
159            HConstants.SEQNUM_QUALIFIER);
160          // All barriers before this cell(exclusive) can be removed
161          Cell cell = cells.get(cells.size() - index);
162          metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
163            cell.getTimestamp() - 1));
164          cleanedRows++;
165          deletedBarriers += index - 1;
166        }
167      }
168    } catch (ReplicationException | IOException e) {
169      LOG.warn("Failed to clean up replication barrier", e);
170    }
171    if (totalRows > 0) {
172      LOG.info(
173        "TotalRows={}, cleanedRows={}, deletedRows={}, deletedBarriers={}, "
174          + "deletedLastPushedSeqIds={}",
175        totalRows, cleanedRows, deletedRows, deletedBarriers, deletedLastPushedSeqIds);
176    }
177  }