001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.MetaTableAccessor;
027import org.apache.hadoop.hbase.ScheduledChore;
028import org.apache.hadoop.hbase.Stoppable;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.Delete;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.ResultScanner;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
039import org.apache.hadoop.hbase.replication.ReplicationException;
040import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Used to clean the useless barriers in {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family in
047 * meta table.
048 */
049@InterfaceAudience.Private
050public class ReplicationBarrierCleaner extends ScheduledChore {
051
052  private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class);
053
054  private static final String REPLICATION_BARRIER_CLEANER_INTERVAL =
055    "hbase.master.cleaner.replication.barrier.interval";
056
057  // 12 hour. Usually regions will not be moved so the barrier are rarely updated. Use a large
058  // interval.
059  private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000;
060
061  private final Connection conn;
062
063  private final ReplicationPeerManager peerManager;
064
065  public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn,
066      ReplicationPeerManager peerManager) {
067    super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL,
068      DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL));
069    this.conn = conn;
070    this.peerManager = peerManager;
071  }
072
073  @Override
074  protected void chore() {
075    long totalRows = 0;
076    long cleanedRows = 0;
077    long deletedRows = 0;
078    long deletedBarriers = 0;
079    long deletedLastPushedSeqIds = 0;
080    TableName tableName = null;
081    List<String> peerIds = null;
082    try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
083        ResultScanner scanner = metaTable.getScanner(
084          new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) {
085      for (;;) {
086        Result result = scanner.next();
087        if (result == null) {
088          break;
089        }
090        totalRows++;
091        long[] barriers = MetaTableAccessor.getReplicationBarriers(result);
092        if (barriers.length == 0) {
093          continue;
094        }
095        byte[] regionName = result.getRow();
096        TableName tn = RegionInfo.getTable(regionName);
097        if (!tn.equals(tableName)) {
098          tableName = tn;
099          peerIds = peerManager.getSerialPeerIdsBelongsTo(tableName);
100        }
101        if (peerIds.isEmpty()) {
102          // no serial replication, only keep the newest barrier
103          Cell cell = result.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY,
104            HConstants.SEQNUM_QUALIFIER);
105          metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
106            cell.getTimestamp() - 1));
107          cleanedRows++;
108          deletedBarriers += barriers.length - 1;
109          continue;
110        }
111        String encodedRegionName = RegionInfo.encodeRegionName(regionName);
112        long pushedSeqId = Long.MAX_VALUE;
113        for (String peerId : peerIds) {
114          pushedSeqId = Math.min(pushedSeqId,
115            peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId));
116        }
117        int index = Arrays.binarySearch(barriers, pushedSeqId);
118        if (index == -1) {
119          // beyond the first barrier, usually this should not happen but anyway let's add a check
120          // for it.
121          continue;
122        }
123        if (index < 0) {
124          index = -index - 1;
125        } else {
126          index++;
127        }
128        // A special case for merged/split region, and also deleted tables, where we are in the last
129        // closed range and the pushedSeqId is the last barrier minus 1.
130        if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
131          // check if the region has already been removed, i.e, no catalog family
132          if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
133            ReplicationQueueStorage queueStorage = peerManager.getQueueStorage();
134            for (String peerId: peerIds) {
135              queueStorage.removeLastSequenceIds(peerId, Arrays.asList(encodedRegionName));
136              deletedLastPushedSeqIds++;
137            }
138            metaTable
139              .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
140            deletedRows++;
141            deletedBarriers += barriers.length;
142            continue;
143          }
144        }
145        // the barrier before 'index - 1'(exclusive) can be safely removed. See the algorithm in
146        // SerialReplicationChecker for more details.
147        if (index - 1 > 0) {
148          List<Cell> cells = result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY,
149            HConstants.SEQNUM_QUALIFIER);
150          // All barriers before this cell(exclusive) can be removed
151          Cell cell = cells.get(cells.size() - index);
152          metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
153            cell.getTimestamp() - 1));
154          cleanedRows++;
155          deletedBarriers += index - 1;
156        }
157      }
158    } catch (ReplicationException | IOException e) {
159      LOG.warn("Failed to clean up replication barrier", e);
160    }
161    if (totalRows > 0) {
162      LOG.info(
163        "Cleanup replication barriers: totalRows {}, " +
164          "cleanedRows {}, deletedRows {}, deletedBarriers {}, deletedLastPushedSeqIds {}",
165        totalRows, cleanedRows, deletedRows, deletedBarriers, deletedLastPushedSeqIds);
166    }
167  }
168
169}