001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.ThreadLocalRandom;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ScheduledChore;
027import org.apache.hadoop.hbase.Stoppable;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.client.RegionInfoBuilder;
030import org.apache.hadoop.hbase.regionserver.HRegion;
031import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
032import org.apache.hadoop.hbase.regionserver.RegionServerServices;
033import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.hbase.wal.WAL;
037import org.apache.hadoop.hbase.wal.WALEdit;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * This chore is responsible to create replication marker rows with special WALEdit with family as
044 * {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY} and column qualifier as
045 * {@link WALEdit#REPLICATION_MARKER} and empty value. If config key
046 * {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, then we will create 1 marker row every
047 * {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms
048 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate
049 * the Replication Marker edit with region_server_name, wal_name and wal_offset encoded in
050 * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor}
051 * object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the
052 * REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster,
053 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the
054 * ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR table.
055 */
056@InterfaceAudience.Private
057public class ReplicationMarkerChore extends ScheduledChore {
058  private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class);
059  private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();
060  public static final RegionInfo REGION_INFO =
061    RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build();
062  private static final String DELIMITER = "_";
063  private final Configuration conf;
064  private final RegionServerServices rsServices;
065  private WAL wal;
066
067  public static final String REPLICATION_MARKER_ENABLED_KEY =
068    "hbase.regionserver.replication.marker.enabled";
069  public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false;
070
071  public static final String REPLICATION_MARKER_CHORE_DURATION_KEY =
072    "hbase.regionserver.replication.marker.chore.duration";
073  public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds
074
075  public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices,
076    int period, Configuration conf) {
077    super("ReplicationTrackerChore", stopper, period);
078    this.conf = conf;
079    this.rsServices = rsServices;
080  }
081
082  @Override
083  protected void chore() {
084    if (wal == null) {
085      try {
086        // TODO: We need to add support for multi WAL implementation.
087        wal = rsServices.getWAL(null);
088      } catch (IOException ioe) {
089        LOG.warn("Unable to get WAL ", ioe);
090        // Shouldn't happen. Ignore and wait for the next chore run.
091        return;
092      }
093    }
094    String serverName = rsServices.getServerName().getServerName();
095    long timeStamp = EnvironmentEdgeManager.currentTime();
096    // We only have timestamp in ReplicationMarkerDescriptor and the remaining properties walname,
097    // regionserver name and wal offset at ReplicationSourceWALReaderThread.
098    byte[] rowKey = getRowKey(serverName, timeStamp);
099    if (LOG.isTraceEnabled()) {
100      LOG.trace("Creating replication marker edit.");
101    }
102
103    // This creates a new ArrayList of all the online regions for every call.
104    List<HRegion> regions = rsServices.getRegions();
105
106    if (regions.isEmpty()) {
107      LOG.info("There are no online regions for this server, so skipping adding replication marker"
108        + " rows for this regionserver");
109      return;
110    }
111    HRegion region = regions.get(ThreadLocalRandom.current().nextInt(regions.size()));
112    try {
113      WALUtil.writeReplicationMarkerAndSync(wal, MVCC, region.getRegionInfo(), rowKey, timeStamp);
114    } catch (IOException ioe) {
115      LOG.error("Exception while sync'ing replication tracker edit", ioe);
116      // TODO: Should we stop region server or add a metric and keep going.
117    }
118  }
119
120  /**
121   * Creates a rowkey with region server name and timestamp.
122   * @param serverName region server name
123   * @param timestamp  timestamp
124   */
125  public static byte[] getRowKey(String serverName, long timestamp) {
126    // converting to string since this will help seeing the timestamp in string format using
127    // hbase shell commands.
128    String timestampStr = String.valueOf(timestamp);
129    final String rowKeyStr = serverName + DELIMITER + timestampStr;
130    return Bytes.toBytes(rowKeyStr);
131  }
132}