001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.util.concurrent.PriorityBlockingQueue;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030import org.apache.hadoop.hbase.replication.WALEntryFilter;
031
032/**
033 * Used by a {@link RecoveredReplicationSourceShipper}.
034 */
035@InterfaceAudience.Private
036@InterfaceStability.Evolving
037public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
038  private static final Logger LOG =
039      LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
040
041  public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
042      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
043      ReplicationSource source) {
044    super(fs, conf, logQueue, startPosition, filter, source);
045  }
046
047  @Override
048  protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
049      throws InterruptedException {
050    LOG.trace("Didn't read any new entries from WAL");
051    // we're done with queue recovery, shut ourself down
052    setReaderRunning(false);
053    // shuts down shipper thread immediately
054    entryBatchQueue.put(batch != null ? batch
055        : new WALEntryBatch(replicationBatchCountCapacity, currentPath));
056  }
057}