001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.concurrent.TimeUnit;
023import org.apache.commons.lang3.mutable.MutableLong;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.MetaTableAccessor;
029import org.apache.hadoop.hbase.MetaTableAccessor.ReplicationBarrierResult;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.master.RegionState;
033import org.apache.hadoop.hbase.replication.ReplicationException;
034import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.wal.WAL.Entry;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
042import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
043import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
044import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
045
046/**
047 * <p>
048 * Helper class to determine whether we can push a given WAL entry without breaking the replication
 * order. The class is designed to be used per {@link ReplicationSourceWALReader}, so it is not
 * thread safe.
050 * </p>
051 * <p>
 * We record all the open sequence numbers for a region in a special family in meta, which is
 * called 'rep_barrier', so there will be a sequence of open sequence numbers (b1, b2, b3, ...).
 * We call [bn, bn+1) a range, and it is obvious that a region will always be on the same RS
 * within a range.
 * </p>
 * <p>
 * When a split or merge happens, we will also record the parent for the generated region(s) in
 * the special family in meta. And also, we will write an extra 'open sequence number' for the
 * parent region(s), which is the max sequence id of the region plus one.
 * </p>
062 * <p>
063 * For each peer, we record the last pushed sequence id for each region. It is managed by the
064 * replication storage.
065 * </p>
066 * <p>
067 * The algorithm works like this:
068 * <ol>
069 * <li>Locate the sequence id we want to push in the barriers</li>
 * <li>If it is before the first barrier, we are safe to push. This is usually because we enabled
 * serial replication for this table after we created the table and wrote data into it.</li>
072 * <li>In general, if the previous range is finished, then we are safe to push. The way to determine
 * whether a range is finished is straightforward: check whether the last pushed sequence id is
 * at least the end barrier of the range minus 1. There are several exceptions:
075 * <ul>
076 * <li>If it is in the first range, we need to check whether there are parent regions. If so, we
077 * need to make sure that the data for parent regions have all been pushed.</li>
 * <li>If it is in the last range, we need to check the region state. If the state is OPENING,
 * then we are not safe to push. This is because, before we call reportRIT to master, which
 * updates the open sequence number in the meta table, we will write an open region event marker
 * to the WAL first, and its sequence id is greater than the newest open sequence number (which
 * has not been written to the meta table yet, so we do not know it). For this scenario, the WAL
 * entry for this open region event marker actually belongs to the range after the 'last' range,
 * so we are not safe to push it. Otherwise the last pushed sequence id will be updated to this
 * value and then we will think the previous range has already been finished, which is not
 * true.</li>
 * <li>Notice that the above two exceptions do not conflict, since the first range can also be the
 * last range if we only have one range.</li>
088 * </ul>
089 * </li>
090 * </ol>
091 * </p>
092 * <p>
093 * And for performance reason, we do not want to check meta for every WAL entry, so we introduce two
094 * in memory maps. The idea is simple:
095 * <ul>
096 * <li>If a range can be pushed, then put its end barrier into the {@code canPushUnder} map.</li>
 * <li>Before accessing meta, first check the sequence id stored in the {@code canPushUnder} map. If
 * the sequence id of the WAL entry is less than the one stored in the {@code canPushUnder} map,
 * then we are safe to push.</li>
100 * </ul>
101 * And for the last range, we do not have an end barrier, so we use the continuity of sequence id to
102 * determine whether we can push. The rule is:
103 * <ul>
 * <li>When an entry can be pushed, put its sequence id into the {@code pushed} map.</li>
 * <li>Check if the sequence id of the WAL entry equals the one stored in the {@code pushed} map
 * plus one. If so, we are safe to push, and we also update the {@code pushed} map with the
 * sequence id of the WAL entry.</li>
108 * </ul>
109 * </p>
110 */
111@InterfaceAudience.Private
112class SerialReplicationChecker {
113
114  private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationChecker.class);
115
116  public static final String REPLICATION_SERIALLY_WAITING_KEY =
117    "hbase.serial.replication.waiting.ms";
118  public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
119
120  private final String peerId;
121
122  private final ReplicationQueueStorage storage;
123
124  private final Connection conn;
125
126  private final long waitTimeMs;
127
128  private final LoadingCache<String, MutableLong> pushed = CacheBuilder.newBuilder()
129    .expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader<String, MutableLong>() {
130
131      @Override
132      public MutableLong load(String key) throws Exception {
133        return new MutableLong(HConstants.NO_SEQNUM);
134      }
135    });
136
137  // Use guava cache to set ttl for each key
138  private final Cache<String, Long> canPushUnder =
139    CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
140
141  public SerialReplicationChecker(Configuration conf, ReplicationSource source) {
142    this.peerId = source.getPeerId();
143    this.storage = source.getQueueStorage();
144    this.conn = source.getServer().getConnection();
145    this.waitTimeMs =
146      conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT);
147  }
148
149  private boolean isRangeFinished(long endBarrier, String encodedRegionName) throws IOException {
150    long pushedSeqId;
151    try {
152      pushedSeqId = storage.getLastSequenceId(encodedRegionName, peerId);
153    } catch (ReplicationException e) {
154      throw new IOException(
155        "Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e);
156    }
157    // endBarrier is the open sequence number. When opening a region, the open sequence number will
158    // be set to the old max sequence id plus one, so here we need to minus one.
159    return pushedSeqId >= endBarrier - 1;
160  }
161
162  private boolean isParentFinished(byte[] regionName) throws IOException {
163    long[] barriers = MetaTableAccessor.getReplicationBarrier(conn, regionName);
164    if (barriers.length == 0) {
165      return true;
166    }
167    return isRangeFinished(barriers[barriers.length - 1], RegionInfo.encodeRegionName(regionName));
168  }
169
170  // We may write a open region marker to WAL before we write the open sequence number to meta, so
171  // if a region is in OPENING state and we are in the last range, it is not safe to say we can push
172  // even if the previous range is finished.
173  private boolean isLastRangeAndOpening(ReplicationBarrierResult barrierResult, int index) {
174    return index == barrierResult.getBarriers().length &&
175      barrierResult.getState() == RegionState.State.OPENING;
176  }
177
178  private void recordCanPush(String encodedNameAsString, long seqId, long[] barriers, int index) {
179    if (barriers.length > index) {
180      canPushUnder.put(encodedNameAsString, barriers[index]);
181    }
182    pushed.getUnchecked(encodedNameAsString).setValue(seqId);
183  }
184
185  private boolean canPush(Entry entry, byte[] row) throws IOException {
186    String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
187    long seqId = entry.getKey().getSequenceId();
188    ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
189      entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
190    LOG.debug("Replication barrier for {}: {}", entry, barrierResult);
191    long[] barriers = barrierResult.getBarriers();
192    int index = Arrays.binarySearch(barriers, seqId);
193    if (index == -1) {
194      LOG.debug("{} is before the first barrier, pass", entry);
195      // This means we are in the range before the first record openSeqNum, this usually because the
196      // wal is written before we enable serial replication for this table, just return true since
197      // we can not guarantee the order.
198      pushed.getUnchecked(encodedNameAsString).setValue(seqId);
199      return true;
200    }
201    // The sequence id range is left closed and right open, so either we decrease the missed insert
202    // point to make the index start from 0, or increase the hit insert point to make the index
203    // start from 1. Here we choose the latter one.
204    if (index < 0) {
205      index = -index - 1;
206    } else {
207      index++;
208    }
209    if (index == 1) {
210      // we are in the first range, check whether we have parents
211      for (byte[] regionName : barrierResult.getParentRegionNames()) {
212        if (!isParentFinished(regionName)) {
213          LOG.debug("Parent {} has not been finished yet for entry {}, give up",
214            Bytes.toStringBinary(regionName), entry);
215          return false;
216        }
217      }
218      if (isLastRangeAndOpening(barrierResult, index)) {
219        LOG.debug("{} is in the last range and the region is opening, give up", entry);
220        return false;
221      }
222      LOG.debug("{} is in the first range, pass", entry);
223      recordCanPush(encodedNameAsString, seqId, barriers, 1);
224      return true;
225    }
226    // check whether the previous range is finished
227    if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
228      LOG.debug("Previous range for {} has not been finished yet, give up", entry);
229      return false;
230    }
231    if (isLastRangeAndOpening(barrierResult, index)) {
232      LOG.debug("{} is in the last range and the region is opening, give up", entry);
233      return false;
234    }
235    LOG.debug("The previous range for {} has been finished, pass", entry);
236    recordCanPush(encodedNameAsString, seqId, barriers, index);
237    return true;
238  }
239
240  public boolean canPush(Entry entry, Cell firstCellInEdit) throws IOException {
241    String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
242    long seqId = entry.getKey().getSequenceId();
243    Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString);
244    if (canReplicateUnderSeqId != null) {
245      if (seqId < canReplicateUnderSeqId.longValue()) {
246        LOG.trace("{} is before the end barrier {}, pass", entry, canReplicateUnderSeqId);
247        return true;
248      }
249      LOG.debug("{} is beyond the previous end barrier {}, remove from cache", entry,
250        canReplicateUnderSeqId);
251      // we are already beyond the last safe point, remove
252      canPushUnder.invalidate(encodedNameAsString);
253    }
254    // This is for the case where the region is currently opened on us, if the sequence id is
255    // continuous then we are safe to replicate. If there is a breakpoint, then maybe the region
256    // has been moved to another RS and then back, so we need to check the barrier.
257    MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
258    if (seqId == previousPushedSeqId.longValue() + 1) {
259      LOG.trace("The sequence id for {} is continuous, pass", entry);
260      previousPushedSeqId.increment();
261      return true;
262    }
263    return canPush(entry, CellUtil.cloneRow(firstCellInEdit));
264  }
265
266  public void waitUntilCanPush(Entry entry, Cell firstCellInEdit)
267      throws IOException, InterruptedException {
268    byte[] row = CellUtil.cloneRow(firstCellInEdit);
269    while (!canPush(entry, row)) {
270      LOG.debug("Can not push {}, wait", entry);
271      Thread.sleep(waitTimeMs);
272    }
273  }
274}