/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Used to clean useless barriers in the {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family
 * of the meta table.
 */
@InterfaceAudience.Private
public class ReplicationBarrierCleaner extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class);

  private static final String REPLICATION_BARRIER_CLEANER_INTERVAL =
    "hbase.master.cleaner.replication.barrier.interval";

  // 12 hours. Usually regions will not be moved, so the barriers are rarely updated. Use a large
  // interval.
  private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000;

  private final Connection conn;

  private final ReplicationPeerManager peerManager;

  public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn,
      ReplicationPeerManager peerManager) {
    super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL,
      DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL));
    this.conn = conn;
    this.peerManager = peerManager;
  }

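  // One pass over the rep_barrier family of meta: for every region row, decide which barrier
  // cells (and, for regions that are gone, which last pushed sequence ids) are no longer needed
  // and delete them.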
  @Override
  protected void chore() {
    long totalRows = 0;
    long cleanedRows = 0;
    long deletedRows = 0;
    long deletedBarriers = 0;
    long deletedLastPushedSeqIds = 0;
    TableName tableName = null;
    List<String> peerIds = null;
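    // Read all versions of the rep_barrier family so that every barrier cell written for a
    // region is visible in a single Result.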
    try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
        ResultScanner scanner = metaTable.getScanner(
          new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        totalRows++;
        long[] barriers = MetaTableAccessor.getReplicationBarriers(result);
        if (barriers.length == 0) {
          continue;
        }
        byte[] regionName = result.getRow();
        TableName tn = RegionInfo.getTable(regionName);
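        // Meta rows are sorted by region name, which is prefixed with the table name, so rows of
        // the same table are adjacent; cache the serial peer ids per table instead of looking
        // them up for every region.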
        if (!tn.equals(tableName)) {
          tableName = tn;
          peerIds = peerManager.getSerialPeerIdsBelongsTo(tableName);
        }
        if (peerIds.isEmpty()) {
          // no serial replication
          // check if the region has already been removed, i.e., no catalog family
          if (metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
            // exists, then only keep the newest barrier
            Cell cell = result.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY,
              HConstants.SEQNUM_QUALIFIER);
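            // Delete.addFamily(family, ts) removes every cell in the family with a timestamp
            // less than or equal to ts, so deleting up to (newest barrier's timestamp - 1)
            // keeps only the newest barrier.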
            metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
              cell.getTimestamp() - 1));
            deletedBarriers += barriers.length - 1;
          } else {
            // does not exist, delete all the barriers
            metaTable
              .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
            deletedBarriers += barriers.length;
          }
          cleanedRows++;
          continue;
        }
        String encodedRegionName = RegionInfo.encodeRegionName(regionName);
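        // A barrier can only be removed once every serial peer has pushed past it, so the
        // smallest last pushed sequence id across the peers bounds the cleanup.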
        long pushedSeqId = Long.MAX_VALUE;
        for (String peerId : peerIds) {
          pushedSeqId = Math.min(pushedSeqId,
            peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId));
        }
        int index = Arrays.binarySearch(barriers, pushedSeqId);
        if (index == -1) {
          // pushedSeqId is before the first barrier, usually this should not happen but anyway
          // let's add a check for it.
          continue;
        }
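        // Arrays.binarySearch returns (-(insertion point) - 1) when the key is absent; after
        // this adjustment 'index' points at the first barrier strictly greater than pushedSeqId.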
        if (index < 0) {
          index = -index - 1;
        } else {
          index++;
        }
        // A special case for merged/split regions, and also deleted tables, where we are in the
        // last closed range and the pushedSeqId is the last barrier minus 1, meaning everything
        // written before the final close has already been pushed.
        if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
          // check if the region has already been removed, i.e., no catalog family
          if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
            ReplicationQueueStorage queueStorage = peerManager.getQueueStorage();
            for (String peerId : peerIds) {
              queueStorage.removeLastSequenceIds(peerId, Arrays.asList(encodedRegionName));
              deletedLastPushedSeqIds++;
            }
            metaTable
              .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
            deletedRows++;
            deletedBarriers += barriers.length;
            continue;
          }
        }
        // The barriers before 'index - 1' (exclusive) can be safely removed. See the algorithm in
        // SerialReplicationChecker for more details.
        if (index - 1 > 0) {
          List<Cell> cells = result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY,
            HConstants.SEQNUM_QUALIFIER);
          // All barriers before this cell (exclusive) can be removed
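          // Result cells for a column are ordered newest first, so this cell corresponds to
          // barriers[index - 1].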
          Cell cell = cells.get(cells.size() - index);
          metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
            cell.getTimestamp() - 1));
          cleanedRows++;
          deletedBarriers += index - 1;
        }
      }
    } catch (ReplicationException | IOException e) {
      LOG.warn("Failed to clean up replication barrier", e);
    }
    if (totalRows > 0) {
      LOG.info(
        "Cleanup replication barriers: totalRows {}, " +
          "cleanedRows {}, deletedRows {}, deletedBarriers {}, deletedLastPushedSeqIds {}",
        totalRows, cleanedRows, deletedRows, deletedBarriers, deletedLastPushedSeqIds);
    }
  }

}