001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.concurrent.PriorityBlockingQueue;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.replication.WALEntryFilter;
027import org.apache.hadoop.hbase.util.Bytes;
028import org.apache.hadoop.hbase.wal.WAL.Entry;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * WAL reader for a serial replication peer.
033 */
034@InterfaceAudience.Private
035public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader {
036
037  // used to store the first cell in an entry before filtering. This is because that if serial
038  // replication is enabled, we may find out that an entry can not be pushed after filtering. And
039  // when we try the next time, the cells maybe null since the entry has already been filtered,
040  // especially for region event wal entries. And this can also used to determine whether we can
041  // skip filtering.
042  private Cell firstCellInEntryBeforeFiltering;
043
044  private final SerialReplicationChecker checker;
045
046  public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
047      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
048      ReplicationSource source) {
049    super(fs, conf, logQueue, startPosition, filter, source);
050    checker = new SerialReplicationChecker(conf, source);
051  }
052
053  @Override
054  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
055      throws IOException, InterruptedException {
056    Path currentPath = entryStream.getCurrentPath();
057    if (!entryStream.hasNext()) {
058      // check whether we have switched a file
059      if (currentPath != null && switched(entryStream, currentPath)) {
060        return WALEntryBatch.endOfFile(currentPath);
061      } else {
062        return null;
063      }
064    }
065    if (currentPath != null) {
066      if (switched(entryStream, currentPath)) {
067        return WALEntryBatch.endOfFile(currentPath);
068      }
069    } else {
070      // when reading from the entry stream first time we will enter here
071      currentPath = entryStream.getCurrentPath();
072    }
073    long positionBefore = entryStream.getPosition();
074    WALEntryBatch batch = createBatch(entryStream);
075    for (;;) {
076      Entry entry = entryStream.peek();
077      boolean doFiltering = true;
078      if (firstCellInEntryBeforeFiltering == null) {
079        assert !entry.getEdit().isEmpty() : "should not write empty edits";
080        // Used to locate the region record in meta table. In WAL we only have the table name and
081        // encoded region name which can not be mapping to region name without scanning all the
082        // records for a table, so we need a start key, just like what we have done at client side
083        // when locating a region. For the markers, we will use the start key of the region as the
084        // row key for the edit. And we need to do this before filtering since all the cells may
085        // be filtered out, especially that for the markers.
086        firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
087      } else {
088        // if this is not null then we know that the entry has already been filtered.
089        doFiltering = false;
090      }
091
092      if (doFiltering) {
093        entry = filterEntry(entry);
094      }
095      if (entry != null) {
096        if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
097          if (batch.getLastWalPosition() > positionBefore) {
098            // we have something that can push, break
099            break;
100          } else {
101            checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
102          }
103        }
104        // arrive here means we can push the entry, record the last sequence id
105        batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
106          entry.getKey().getSequenceId());
107        // actually remove the entry.
108        removeEntryFromStream(entryStream, batch);
109        if (addEntryToBatch(batch, entry)) {
110          break;
111        }
112      } else {
113        // actually remove the entry.
114        removeEntryFromStream(entryStream, batch);
115      }
116      boolean hasNext = entryStream.hasNext();
117      // always return if we have switched to a new file.
118      if (switched(entryStream, currentPath)) {
119        batch.setEndOfFile(true);
120        break;
121      }
122      if (!hasNext) {
123        break;
124      }
125    }
126    return batch;
127  }
128
129  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
130      throws IOException {
131    entryStream.next();
132    firstCellInEntryBeforeFiltering = null;
133    batch.setLastWalPosition(entryStream.getPosition());
134  }
135}