001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.replication.WALEntryFilter;
026import org.apache.hadoop.hbase.util.Bytes;
027import org.apache.hadoop.hbase.wal.WAL.Entry;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * WAL reader for a serial replication peer.
034 */
035@InterfaceAudience.Private
036public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader {
037
038  private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationSourceWALReader.class);
039
040  // used to store the first cell in an entry before filtering. This is because that if serial
041  // replication is enabled, we may find out that an entry can not be pushed after filtering. And
042  // when we try the next time, the cells maybe null since the entry has already been filtered,
043  // especially for region event wal entries. And this can also used to determine whether we can
044  // skip filtering.
045  private Cell firstCellInEntryBeforeFiltering;
046
047  private final SerialReplicationChecker checker;
048
049  public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
050    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
051    ReplicationSource source, String walGroupId) {
052    super(fs, conf, logQueue, startPosition, filter, source, walGroupId);
053    checker = new SerialReplicationChecker(conf, source);
054  }
055
056  @Override
057  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
058    throws InterruptedException {
059    Path currentPath = entryStream.getCurrentPath();
060    long positionBefore = entryStream.getPosition();
061    for (;;) {
062      Entry entry = entryStream.peek();
063      boolean doFiltering = true;
064      if (firstCellInEntryBeforeFiltering == null) {
065        assert !entry.getEdit().isEmpty() : "should not write empty edits";
066        // Used to locate the region record in meta table. In WAL we only have the table name and
067        // encoded region name which can not be mapping to region name without scanning all the
068        // records for a table, so we need a start key, just like what we have done at client side
069        // when locating a region. For the markers, we will use the start key of the region as the
070        // row key for the edit. And we need to do this before filtering since all the cells may
071        // be filtered out, especially that for the markers.
072        firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
073      } else {
074        // if this is not null then we know that the entry has already been filtered.
075        doFiltering = false;
076      }
077
078      if (doFiltering) {
079        entry = filterEntry(entry);
080      }
081      if (entry != null) {
082        int sleepMultiplier = 1;
083        try {
084          if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
085            if (batch.getLastWalPosition() > positionBefore) {
086              // we have something that can push, break
087              break;
088            } else {
089              checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
090            }
091          }
092        } catch (IOException e) {
093          LOG.warn("failed to check whether we can push the WAL entries", e);
094          if (batch.getLastWalPosition() > positionBefore) {
095            // we have something that can push, break
096            break;
097          }
098          sleepMultiplier = sleep(sleepMultiplier);
099        }
100        // arrive here means we can push the entry, record the last sequence id
101        batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
102          entry.getKey().getSequenceId());
103        // actually remove the entry.
104        removeEntryFromStream(entryStream, batch);
105        if (addEntryToBatch(batch, entry)) {
106          break;
107        }
108      } else {
109        // actually remove the entry.
110        removeEntryFromStream(entryStream, batch);
111      }
112      WALEntryStream.HasNext hasNext = entryStream.hasNext();
113      // always return if we have switched to a new file.
114      if (switched(entryStream, currentPath)) {
115        batch.setEndOfFile(true);
116        break;
117      }
118      if (hasNext != WALEntryStream.HasNext.YES) {
119        // For hasNext other than YES, it is OK to just retry.
120        // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
121        // return NO again when you call the method next time, so it is OK to just return here and
122        // let the loop in the upper layer to call hasNext again.
123        break;
124      }
125    }
126  }
127
128  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) {
129    entryStream.next();
130    firstCellInEntryBeforeFiltering = null;
131    batch.setLastWalPosition(entryStream.getPosition());
132  }
133}