001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.handler; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.hadoop.hbase.Server; 025import org.apache.hadoop.hbase.TableNotFoundException; 026import org.apache.hadoop.hbase.client.ClusterConnection; 027import org.apache.hadoop.hbase.client.FlushRegionCallable; 028import org.apache.hadoop.hbase.client.RegionReplicaUtil; 029import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 030import org.apache.hadoop.hbase.executor.EventHandler; 031import org.apache.hadoop.hbase.executor.EventType; 032import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 033import org.apache.hadoop.hbase.regionserver.HRegion; 034import org.apache.hadoop.hbase.util.RetryCounter; 035import org.apache.hadoop.hbase.util.RetryCounterFactory; 036import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 042 043/** 044 * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in 045 * secondary region replicas. This means that a secondary region replica can serve some edits from 046 * it's memstore that are still not flushed from primary. We do not want to allow secondary region's 047 * seqId to go back in time, when this secondary region is opened elsewhere after a crash or region 048 * move. We will trigger a flush cache in the primary region replica and wait for observing a 049 * complete flush cycle before marking the region readsEnabled. This handler does the flushing of 050 * the primary region replica and ensures that regular region opening is not blocked while the 051 * secondary replica is blocked on flush. 052 */ 053@InterfaceAudience.Private 054public class RegionReplicaFlushHandler extends EventHandler { 055 private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class); 056 057 private final ClusterConnection connection; 058 private final RpcRetryingCallerFactory rpcRetryingCallerFactory; 059 private final RpcControllerFactory rpcControllerFactory; 060 private final int operationTimeout; 061 private final HRegion region; 062 063 public RegionReplicaFlushHandler(Server server, ClusterConnection connection, 064 RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory, 065 int operationTimeout, HRegion region) { 066 super(server, EventType.RS_REGION_REPLICA_FLUSH); 067 this.connection = connection; 068 this.rpcRetryingCallerFactory = rpcRetryingCallerFactory; 069 this.rpcControllerFactory = rpcControllerFactory; 070 this.operationTimeout = operationTimeout; 071 this.region = region; 072 } 073 074 @Override 075 public void process() throws IOException { 076 triggerFlushInPrimaryRegion(region); 077 } 078 079 @Override 080 protected void handleException(Throwable t) { 081 if (t instanceof InterruptedIOException || t instanceof InterruptedException) { 082 LOG.error("Caught throwable while processing event " + eventType, t); 083 } else if (t instanceof RuntimeException) { 084 server.abort("Server aborting", t); 085 } else { 086 // something fishy since we cannot flush the primary region until all retries (retries from 087 // rpc times 35 trigger). We cannot close the region since there is no such mechanism to 088 // close a region without master triggering it. We just abort the server for now. 089 server.abort("ServerAborting because an exception was thrown", t); 090 } 091 } 092 093 private int getRetriesCount(Configuration conf) { 094 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 095 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 096 if (numRetries > 10) { 097 int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 098 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 099 numRetries = numRetries / mult; // reset if HRS has multiplied this already 100 } 101 return numRetries; 102 } 103 104 void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException { 105 long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 106 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 107 108 int maxAttempts = getRetriesCount(connection.getConfiguration()); 109 RetryCounter counter = new RetryCounterFactory(maxAttempts, (int) pause).create(); 110 111 if (LOG.isDebugEnabled()) { 112 LOG.debug("RPC'ing to primary " 113 + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 114 .getRegionNameAsString() 115 + " from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH"); 116 } 117 while ( 118 !region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped() 119 ) { 120 FlushRegionCallable flushCallable = new FlushRegionCallable(connection, rpcControllerFactory, 121 RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true); 122 123 // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we 124 // do not have to wait for the whole flush here, just initiate it. 125 FlushRegionResponse response = null; 126 try { 127 response = rpcRetryingCallerFactory.<FlushRegionResponse> newCaller() 128 .callWithRetries(flushCallable, this.operationTimeout); 129 } catch (IOException ex) { 130 if ( 131 ex instanceof TableNotFoundException 132 || connection.isTableDisabled(region.getRegionInfo().getTable()) 133 ) { 134 return; 135 } 136 throw ex; 137 } 138 139 if (response.getFlushed()) { 140 // then we have to wait for seeing the flush entry. All reads will be rejected until we see 141 // a complete flush cycle or replay a region open event 142 if (LOG.isDebugEnabled()) { 143 LOG.debug("Triggered flush of primary region replica " 144 + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 145 .getEncodedName() 146 + " for " + region.getRegionInfo().getEncodedName() 147 + "; now waiting and blocking reads until completes a full flush cycle"); 148 } 149 region.setReadsEnabled(true); 150 break; 151 } else { 152 if (response.hasWroteFlushWalMarker()) { 153 if (response.getWroteFlushWalMarker()) { 154 if (LOG.isDebugEnabled()) { 155 LOG.debug( 156 "Triggered empty flush marker (memstore empty) on primary " + "region replica " 157 + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 158 .getEncodedName() 159 + " for " + region.getRegionInfo().getEncodedName() + "; now waiting and " 160 + "blocking reads until observing a flush marker"); 161 } 162 region.setReadsEnabled(true); 163 break; 164 } else { 165 // somehow we were not able to get the primary to write the flush request. It may be 166 // closing or already flushing. Retry flush again after some sleep. 167 if (!counter.shouldRetry()) { 168 throw new IOException("Cannot cause primary to flush or drop a wal marker after " 169 + "retries. Failing opening of this region replica " 170 + region.getRegionInfo().getEncodedName()); 171 } 172 } 173 } else { 174 // nothing to do. Are we dealing with an old server? 175 LOG.warn("Was not able to trigger a flush from primary region due to old server version? " 176 + "Continuing to open the secondary region replica: " 177 + region.getRegionInfo().getEncodedName()); 178 region.setReadsEnabled(true); 179 break; 180 } 181 } 182 try { 183 counter.sleepUntilNextRetry(); 184 } catch (InterruptedException e) { 185 throw new InterruptedIOException(e.getMessage()); 186 } 187 } 188 } 189 190}