001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.ThreadLocalRandom; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.ScheduledChore; 027import org.apache.hadoop.hbase.Stoppable; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.client.RegionInfoBuilder; 030import org.apache.hadoop.hbase.regionserver.HRegion; 031import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 032import org.apache.hadoop.hbase.regionserver.RegionServerServices; 033import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.hadoop.hbase.wal.WAL; 037import org.apache.hadoop.hbase.wal.WALEdit; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * This chore is responsible to create replication marker rows with special WALEdit with family as 044 * {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY} and column qualifier as 045 * {@link WALEdit#REPLICATION_MARKER} and empty value. If config key 046 * {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, then we will create 1 marker row every 047 * {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms 048 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate 049 * the Replication Marker edit with region_server_name, wal_name and wal_offset encoded in 050 * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor} 051 * object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the 052 * REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster, 053 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the 054 * ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR table. 055 */ 056@InterfaceAudience.Private 057public class ReplicationMarkerChore extends ScheduledChore { 058 private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class); 059 private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl(); 060 public static final RegionInfo REGION_INFO = 061 RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build(); 062 private static final String DELIMITER = "_"; 063 private final Configuration conf; 064 private final RegionServerServices rsServices; 065 private WAL wal; 066 067 public static final String REPLICATION_MARKER_ENABLED_KEY = 068 "hbase.regionserver.replication.marker.enabled"; 069 public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false; 070 071 public static final String REPLICATION_MARKER_CHORE_DURATION_KEY = 072 "hbase.regionserver.replication.marker.chore.duration"; 073 public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds 074 075 public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices, 076 int period, Configuration conf) { 077 super("ReplicationTrackerChore", stopper, period); 078 this.conf = conf; 079 this.rsServices = rsServices; 080 } 081 082 @Override 083 protected void chore() { 084 if (wal == null) { 085 try { 086 // TODO: We need to add support for multi WAL implementation. 087 wal = rsServices.getWAL(null); 088 } catch (IOException ioe) { 089 LOG.warn("Unable to get WAL ", ioe); 090 // Shouldn't happen. Ignore and wait for the next chore run. 091 return; 092 } 093 } 094 String serverName = rsServices.getServerName().getServerName(); 095 long timeStamp = EnvironmentEdgeManager.currentTime(); 096 // We only have timestamp in ReplicationMarkerDescriptor and the remaining properties walname, 097 // regionserver name and wal offset at ReplicationSourceWALReaderThread. 098 byte[] rowKey = getRowKey(serverName, timeStamp); 099 if (LOG.isTraceEnabled()) { 100 LOG.trace("Creating replication marker edit."); 101 } 102 103 // This creates a new ArrayList of all the online regions for every call. 104 List<HRegion> regions = rsServices.getRegions(); 105 106 if (regions.isEmpty()) { 107 LOG.info("There are no online regions for this server, so skipping adding replication marker" 108 + " rows for this regionserver"); 109 return; 110 } 111 HRegion region = regions.get(ThreadLocalRandom.current().nextInt(regions.size())); 112 try { 113 WALUtil.writeReplicationMarkerAndSync(wal, MVCC, region.getRegionInfo(), rowKey, timeStamp); 114 } catch (IOException ioe) { 115 LOG.error("Exception while sync'ing replication tracker edit", ioe); 116 // TODO: Should we stop region server or add a metric and keep going. 117 } 118 } 119 120 /** 121 * Creates a rowkey with region server name and timestamp. 122 * @param serverName region server name 123 * @param timestamp timestamp 124 */ 125 public static byte[] getRowKey(String serverName, long timestamp) { 126 // converting to string since this will help seeing the timestamp in string format using 127 // hbase shell commands. 128 String timestampStr = String.valueOf(timestamp); 129 final String rowKeyStr = serverName + DELIMITER + timestampStr; 130 return Bytes.toBytes(rowKeyStr); 131 } 132}