001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver.handler; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.Server; 027import org.apache.hadoop.hbase.TableNotFoundException; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.apache.hadoop.hbase.client.ClusterConnection; 032import org.apache.hadoop.hbase.client.FlushRegionCallable; 033import org.apache.hadoop.hbase.client.RegionReplicaUtil; 034import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 035import org.apache.hadoop.hbase.executor.EventHandler; 036import org.apache.hadoop.hbase.executor.EventType; 037import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 039import org.apache.hadoop.hbase.regionserver.HRegion; 040import org.apache.hadoop.hbase.util.RetryCounter; 041import org.apache.hadoop.hbase.util.RetryCounterFactory; 042import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 043 044/** 045 * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in 046 * secondary region replicas. This means that a secondary region replica can serve some edits from 047 * it's memstore that that is still not flushed from primary. We do not want to allow secondary 048 * region's seqId to go back in time, when this secondary region is opened elsewhere after a 049 * crash or region move. We will trigger a flush cache in the primary region replica and wait 050 * for observing a complete flush cycle before marking the region readsEnabled. This handler does 051 * the flushing of the primary region replica and ensures that regular region opening is not 052 * blocked while the secondary replica is blocked on flush. 053 */ 054@InterfaceAudience.Private 055public class RegionReplicaFlushHandler extends EventHandler { 056 057 private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class); 058 059 private final ClusterConnection connection; 060 private final RpcRetryingCallerFactory rpcRetryingCallerFactory; 061 private final RpcControllerFactory rpcControllerFactory; 062 private final int operationTimeout; 063 private final HRegion region; 064 065 public RegionReplicaFlushHandler(Server server, ClusterConnection connection, 066 RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory, 067 int operationTimeout, HRegion region) { 068 super(server, EventType.RS_REGION_REPLICA_FLUSH); 069 this.connection = connection; 070 this.rpcRetryingCallerFactory = rpcRetryingCallerFactory; 071 this.rpcControllerFactory = rpcControllerFactory; 072 this.operationTimeout = operationTimeout; 073 this.region = region; 074 } 075 076 @Override 077 public void process() throws IOException { 078 triggerFlushInPrimaryRegion(region); 079 } 080 081 @Override 082 protected void handleException(Throwable t) { 083 if (t instanceof InterruptedIOException || t instanceof InterruptedException) { 084 LOG.error("Caught throwable while processing event " + eventType, t); 085 } else if (t instanceof RuntimeException) { 086 server.abort("ServerAborting because a runtime exception was thrown", t); 087 } else { 088 // something fishy since we cannot flush the primary region until all retries (retries from 089 // rpc times 35 trigger). We cannot close the region since there is no such mechanism to 090 // close a region without master triggering it. We just abort the server for now. 091 server.abort("ServerAborting because an exception was thrown", t); 092 } 093 } 094 095 private int getRetriesCount(Configuration conf) { 096 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 097 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 098 if (numRetries > 10) { 099 int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 100 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 101 numRetries = numRetries / mult; // reset if HRS has multiplied this already 102 } 103 return numRetries; 104 } 105 106 void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException { 107 long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 108 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 109 110 int maxAttempts = getRetriesCount(connection.getConfiguration()); 111 RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create(); 112 113 if (LOG.isDebugEnabled()) { 114 LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil 115 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region " 116 + region.getRegionInfo().getEncodedName() + " to trigger a flush"); 117 } 118 while (!region.isClosing() && !region.isClosed() 119 && !server.isAborted() && !server.isStopped()) { 120 FlushRegionCallable flushCallable = new FlushRegionCallable( 121 connection, rpcControllerFactory, 122 RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true); 123 124 // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we 125 // do not have to wait for the whole flush here, just initiate it. 126 FlushRegionResponse response = null; 127 try { 128 response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller() 129 .callWithRetries(flushCallable, this.operationTimeout); 130 } catch (IOException ex) { 131 if (ex instanceof TableNotFoundException 132 || connection.isTableDisabled(region.getRegionInfo().getTable())) { 133 return; 134 } 135 throw ex; 136 } 137 138 if (response.getFlushed()) { 139 // then we have to wait for seeing the flush entry. All reads will be rejected until we see 140 // a complete flush cycle or replay a region open event 141 if (LOG.isDebugEnabled()) { 142 LOG.debug("Successfully triggered a flush of primary region replica " 143 + ServerRegionReplicaUtil 144 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() 145 + " of region " + region.getRegionInfo().getEncodedName() 146 + " Now waiting and blocking reads until observing a full flush cycle"); 147 } 148 region.setReadsEnabled(true); 149 break; 150 } else { 151 if (response.hasWroteFlushWalMarker()) { 152 if(response.getWroteFlushWalMarker()) { 153 if (LOG.isDebugEnabled()) { 154 LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " 155 + "region replica " + ServerRegionReplicaUtil 156 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() 157 + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and " 158 + "blocking reads until observing a flush marker"); 159 } 160 region.setReadsEnabled(true); 161 break; 162 } else { 163 // somehow we were not able to get the primary to write the flush request. It may be 164 // closing or already flushing. Retry flush again after some sleep. 165 if (!counter.shouldRetry()) { 166 throw new IOException("Cannot cause primary to flush or drop a wal marker after " + 167 "retries. Failing opening of this region replica " 168 + region.getRegionInfo().getEncodedName()); 169 } 170 } 171 } else { 172 // nothing to do. Are we dealing with an old server? 173 LOG.warn("Was not able to trigger a flush from primary region due to old server version? " 174 + "Continuing to open the secondary region replica: " 175 + region.getRegionInfo().getEncodedName()); 176 region.setReadsEnabled(true); 177 break; 178 } 179 } 180 try { 181 counter.sleepUntilNextRetry(); 182 } catch (InterruptedException e) { 183 throw new InterruptedIOException(e.getMessage()); 184 } 185 } 186 } 187 188}