001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.handler; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.hadoop.hbase.Server; 025import org.apache.hadoop.hbase.TableNotFoundException; 026import org.apache.hadoop.hbase.client.AsyncClusterConnection; 027import org.apache.hadoop.hbase.executor.EventHandler; 028import org.apache.hadoop.hbase.executor.EventType; 029import org.apache.hadoop.hbase.regionserver.HRegion; 030import org.apache.hadoop.hbase.util.FutureUtils; 031import org.apache.hadoop.hbase.util.RetryCounter; 032import org.apache.hadoop.hbase.util.RetryCounterFactory; 033import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 039 040/** 041 * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in 042 * secondary region replicas. This means that a secondary region replica can serve some edits from 043 * it's memstore that are still not flushed from primary. We do not want to allow secondary region's 044 * seqId to go back in time, when this secondary region is opened elsewhere after a crash or region 045 * move. We will trigger a flush cache in the primary region replica and wait for observing a 046 * complete flush cycle before marking the region readsEnabled. This handler does the flushing of 047 * the primary region replica and ensures that regular region opening is not blocked while the 048 * secondary replica is blocked on flush. 049 */ 050@InterfaceAudience.Private 051public class RegionReplicaFlushHandler extends EventHandler { 052 private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class); 053 054 private final AsyncClusterConnection connection; 055 056 private final HRegion region; 057 058 public RegionReplicaFlushHandler(Server server, HRegion region) { 059 super(server, EventType.RS_REGION_REPLICA_FLUSH); 060 this.connection = server.getAsyncClusterConnection(); 061 this.region = region; 062 } 063 064 @Override 065 public void process() throws IOException { 066 triggerFlushInPrimaryRegion(region); 067 } 068 069 @Override 070 protected void handleException(Throwable t) { 071 if (t instanceof InterruptedIOException || t instanceof InterruptedException) { 072 LOG.error("Caught throwable while processing event " + eventType, t); 073 } else if (t instanceof RuntimeException) { 074 server.abort("Server aborting", t); 075 } else { 076 // something fishy since we cannot flush the primary region until all retries (retries from 077 // rpc times 35 trigger). We cannot close the region since there is no such mechanism to 078 // close a region without master triggering it. We just abort the server for now. 079 server.abort("ServerAborting because an exception was thrown", t); 080 } 081 } 082 083 private int getRetriesCount(Configuration conf) { 084 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 085 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 086 if (numRetries > 10) { 087 int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 088 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 089 numRetries = numRetries / mult; // reset if HRS has multiplied this already 090 } 091 return numRetries; 092 } 093 094 void triggerFlushInPrimaryRegion(final HRegion region) throws IOException { 095 long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 096 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 097 098 int maxAttempts = getRetriesCount(connection.getConfiguration()); 099 RetryCounter counter = new RetryCounterFactory(maxAttempts, (int) pause).create(); 100 101 if (LOG.isDebugEnabled()) { 102 LOG.debug("RPC'ing to primary " 103 + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 104 .getRegionNameAsString() 105 + " from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH"); 106 } 107 while ( 108 !region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped() 109 ) { 110 // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we 111 // do not have to wait for the whole flush here, just initiate it. 112 FlushRegionResponse response; 113 try { 114 response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil 115 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true)); 116 } catch (IOException e) { 117 if ( 118 e instanceof TableNotFoundException || FutureUtils 119 .get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable())) 120 ) { 121 return; 122 } 123 if (!counter.shouldRetry()) { 124 throw e; 125 } 126 // The reason that why we need to retry here is that, the retry for asynchronous admin 127 // request is much simpler than the normal operation, if we failed to locate the region once 128 // then we will throw the exception out and will not try to relocate again. So here we need 129 // to add some retries by ourselves to prevent shutting down the region server too 130 // frequent... 131 LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}", 132 ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 133 .getRegionNameAsString(), 134 region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e); 135 try { 136 counter.sleepUntilNextRetry(); 137 } catch (InterruptedException e1) { 138 throw new InterruptedIOException(e1.getMessage()); 139 } 140 continue; 141 } 142 143 if (response.getFlushed()) { 144 // then we have to wait for seeing the flush entry. All reads will be rejected until we see 145 // a complete flush cycle or replay a region open event 146 if (LOG.isDebugEnabled()) { 147 LOG.debug("Triggered flush of primary region replica " 148 + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 149 .getRegionNameAsString() 150 + " for " + region.getRegionInfo().getEncodedName() 151 + "; now waiting and blocking reads until completes a full flush cycle"); 152 } 153 region.setReadsEnabled(true); 154 break; 155 } else { 156 if (response.hasWroteFlushWalMarker()) { 157 if (response.getWroteFlushWalMarker()) { 158 if (LOG.isDebugEnabled()) { 159 LOG.debug("Triggered empty flush marker (memstore empty) on primary region replica " 160 + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 161 .getRegionNameAsString() 162 + " for " + region.getRegionInfo().getEncodedName() 163 + "; now waiting and blocking reads until observing a flush marker"); 164 } 165 region.setReadsEnabled(true); 166 break; 167 } else { 168 // somehow we were not able to get the primary to write the flush request. It may be 169 // closing or already flushing. Retry flush again after some sleep. 170 if (!counter.shouldRetry()) { 171 throw new IOException("Cannot cause primary to flush or drop a wal marker after " 172 + counter.getAttemptTimes() + " retries. Failing opening of this region replica " 173 + region.getRegionInfo().getRegionNameAsString()); 174 } else { 175 LOG.warn( 176 "Cannot cause primary replica {} to flush or drop a wal marker " 177 + "for region replica {}, retry={}", 178 ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 179 .getRegionNameAsString(), 180 region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes()); 181 } 182 } 183 } else { 184 // nothing to do. Are we dealing with an old server? 185 LOG.warn("Was not able to trigger a flush from primary region due to old server version? " 186 + "Continuing to open the secondary region replica: " 187 + region.getRegionInfo().getRegionNameAsString()); 188 break; 189 } 190 } 191 try { 192 counter.sleepUntilNextRetry(); 193 } catch (InterruptedException e) { 194 throw new InterruptedIOException(e.getMessage()); 195 } 196 } 197 region.setReadsEnabled(true); 198 } 199}