001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.replication.WALEntryFilter;
026import org.apache.hadoop.hbase.util.Bytes;
027import org.apache.hadoop.hbase.wal.WAL.Entry;
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * WAL reader for a serial replication peer.
032 */
033@InterfaceAudience.Private
034public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader {
035
036  // used to store the first cell in an entry before filtering. This is because that if serial
037  // replication is enabled, we may find out that an entry can not be pushed after filtering. And
038  // when we try the next time, the cells maybe null since the entry has already been filtered,
039  // especially for region event wal entries. And this can also used to determine whether we can
040  // skip filtering.
041  private Cell firstCellInEntryBeforeFiltering;
042
043  private final SerialReplicationChecker checker;
044
045  public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
046    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
047    ReplicationSource source, String walGroupId) {
048    super(fs, conf, logQueue, startPosition, filter, source, walGroupId);
049    checker = new SerialReplicationChecker(conf, source);
050  }
051
052  @Override
053  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
054    throws IOException, InterruptedException {
055    Path currentPath = entryStream.getCurrentPath();
056    long positionBefore = entryStream.getPosition();
057    for (;;) {
058      Entry entry = entryStream.peek();
059      boolean doFiltering = true;
060      if (firstCellInEntryBeforeFiltering == null) {
061        assert !entry.getEdit().isEmpty() : "should not write empty edits";
062        // Used to locate the region record in meta table. In WAL we only have the table name and
063        // encoded region name which can not be mapping to region name without scanning all the
064        // records for a table, so we need a start key, just like what we have done at client side
065        // when locating a region. For the markers, we will use the start key of the region as the
066        // row key for the edit. And we need to do this before filtering since all the cells may
067        // be filtered out, especially that for the markers.
068        firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
069      } else {
070        // if this is not null then we know that the entry has already been filtered.
071        doFiltering = false;
072      }
073
074      if (doFiltering) {
075        entry = filterEntry(entry);
076      }
077      if (entry != null) {
078        if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
079          if (batch.getLastWalPosition() > positionBefore) {
080            // we have something that can push, break
081            break;
082          } else {
083            checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
084          }
085        }
086        // arrive here means we can push the entry, record the last sequence id
087        batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
088          entry.getKey().getSequenceId());
089        // actually remove the entry.
090        removeEntryFromStream(entryStream, batch);
091        if (addEntryToBatch(batch, entry)) {
092          break;
093        }
094      } else {
095        // actually remove the entry.
096        removeEntryFromStream(entryStream, batch);
097      }
098      boolean hasNext = entryStream.hasNext();
099      // always return if we have switched to a new file.
100      if (switched(entryStream, currentPath)) {
101        batch.setEndOfFile(true);
102        break;
103      }
104      if (!hasNext) {
105        break;
106      }
107    }
108  }
109
110  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
111    throws IOException {
112    entryStream.next();
113    firstCellInEntryBeforeFiltering = null;
114    batch.setLastWalPosition(entryStream.getPosition());
115  }
116}