001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.replication.regionserver;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
030import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
031import org.apache.hadoop.hbase.coprocessor.ObserverContext;
032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
033import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
034import org.apache.hadoop.hbase.coprocessor.RegionObserver;
035import org.apache.hadoop.hbase.regionserver.HRegionServer;
036import org.apache.hadoop.hbase.regionserver.RegionServerServices;
037import org.apache.hadoop.hbase.util.Pair;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * An Observer to add HFile References to replication queue.
044 */
045@CoreCoprocessor
046@InterfaceAudience.Private
047public class ReplicationObserver implements RegionCoprocessor, RegionObserver {
048  private static final Logger LOG = LoggerFactory.getLogger(ReplicationObserver.class);
049
050  @Override
051  public Optional<RegionObserver> getRegionObserver() {
052    return Optional.of(this);
053  }
054
055  @Override
056  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
057      justification="NPE should never happen; if it does it is a bigger issue")
058  public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
059      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
060    RegionCoprocessorEnvironment env = ctx.getEnvironment();
061    Configuration c = env.getConfiguration();
062    if (pairs == null || pairs.isEmpty() ||
063        !c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
064          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
065      LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded "
066          + "data replication.");
067      return;
068    }
069    // This is completely cheating AND getting a HRegionServer from a RegionServerEnvironment is
070    // just going to break. This is all private. Not allowed. Regions shouldn't assume they are
071    // hosted in a RegionServer. TODO: fix.
072    RegionServerServices rss = ((HasRegionServerServices)env).getRegionServerServices();
073    Replication rep = (Replication)((HRegionServer)rss).getReplicationSourceService();
074    rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
075  }
076}