001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.handler; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.hadoop.hbase.Server; 025import org.apache.hadoop.hbase.TableNotFoundException; 026import org.apache.hadoop.hbase.client.AsyncClusterConnection; 027import org.apache.hadoop.hbase.executor.EventHandler; 028import org.apache.hadoop.hbase.executor.EventType; 029import org.apache.hadoop.hbase.regionserver.HRegion; 030import org.apache.hadoop.hbase.util.FutureUtils; 031import org.apache.hadoop.hbase.util.RetryCounter; 032import org.apache.hadoop.hbase.util.RetryCounterFactory; 033import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 039 040/** 041 * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in 042 * secondary region replicas. This means that a secondary region replica can serve some edits from 043 * it's memstore that are still not flushed from primary. We do not want to allow secondary region's 044 * seqId to go back in time, when this secondary region is opened elsewhere after a crash or region 045 * move. We will trigger a flush cache in the primary region replica and wait for observing a 046 * complete flush cycle before marking the region readsEnabled. This handler does the flushing of 047 * the primary region replica and ensures that regular region opening is not blocked while the 048 * secondary replica is blocked on flush. 049 */ 050@InterfaceAudience.Private 051public class RegionReplicaFlushHandler extends EventHandler { 052 private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class); 053 054 private final AsyncClusterConnection connection; 055 056 private final HRegion region; 057 058 public RegionReplicaFlushHandler(Server server, HRegion region) { 059 super(server, EventType.RS_REGION_REPLICA_FLUSH); 060 this.connection = server.getAsyncClusterConnection(); 061 this.region = region; 062 } 063 064 @Override 065 public void process() throws IOException { 066 triggerFlushInPrimaryRegion(region); 067 } 068 069 @Override 070 protected void handleException(Throwable t) { 071 if (t instanceof InterruptedIOException || t instanceof InterruptedException) { 072 LOG.error("Caught throwable while processing event " + eventType, t); 073 } else if (t instanceof RuntimeException) { 074 server.abort("Server aborting", t); 075 } else { 076 // something fishy since we cannot flush the primary region until all retries (retries from 077 // rpc times 35 trigger). We cannot close the region since there is no such mechanism to 078 // close a region without master triggering it. We just abort the server for now. 079 server.abort("ServerAborting because an exception was thrown", t); 080 } 081 } 082 083 private int getRetriesCount(Configuration conf) { 084 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 085 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 086 if (numRetries > 10) { 087 int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 088 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 089 numRetries = numRetries / mult; // reset if HRS has multiplied this already 090 } 091 return numRetries; 092 } 093 094 private boolean isTableDisabledOrDropped(IOException error, HRegion region) { 095 if (error instanceof TableNotFoundException) { 096 return true; 097 } 098 try { 099 if ( 100 FutureUtils.get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable())) 101 ) { 102 return true; 103 } 104 } catch (IOException e) { 105 if (error instanceof TableNotFoundException) { 106 return true; 107 } else { 108 LOG.warn("failed tp check whether table {} is disabled", region.getRegionInfo().getTable(), 109 e); 110 } 111 } 112 return false; 113 } 114 115 void triggerFlushInPrimaryRegion(final HRegion region) throws IOException { 116 long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 117 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 118 119 int maxAttempts = getRetriesCount(connection.getConfiguration()); 120 RetryCounter counter = new RetryCounterFactory(maxAttempts, (int) pause).create(); 121 122 if (LOG.isDebugEnabled()) { 123 LOG.debug("RPC'ing to primary " 124 + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 125 .getRegionNameAsString() 126 + " from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH"); 127 } 128 while ( 129 !region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped() 130 ) { 131 // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we 132 // do not have to wait for the whole flush here, just initiate it. 133 FlushRegionResponse response; 134 try { 135 response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil 136 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true)); 137 } catch (IOException e) { 138 if (isTableDisabledOrDropped(e, region)) { 139 return; 140 } 141 if (!counter.shouldRetry()) { 142 throw e; 143 } 144 // The reason that why we need to retry here is that, the retry for asynchronous admin 145 // request is much simpler than the normal operation, if we failed to locate the region once 146 // then we will throw the exception out and will not try to relocate again. So here we need 147 // to add some retries by ourselves to prevent shutting down the region server too 148 // frequent... 149 LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}", 150 ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 151 .getRegionNameAsString(), 152 region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e); 153 try { 154 counter.sleepUntilNextRetry(); 155 } catch (InterruptedException e1) { 156 throw new InterruptedIOException(e1.getMessage()); 157 } 158 continue; 159 } 160 161 if (response.getFlushed()) { 162 // then we have to wait for seeing the flush entry. All reads will be rejected until we see 163 // a complete flush cycle or replay a region open event 164 if (LOG.isDebugEnabled()) { 165 LOG.debug("Triggered flush of primary region replica " 166 + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 167 .getRegionNameAsString() 168 + " for " + region.getRegionInfo().getEncodedName() 169 + "; now waiting and blocking reads until completes a full flush cycle"); 170 } 171 region.setReadsEnabled(true); 172 break; 173 } else { 174 if (response.hasWroteFlushWalMarker()) { 175 if (response.getWroteFlushWalMarker()) { 176 if (LOG.isDebugEnabled()) { 177 LOG.debug("Triggered empty flush marker (memstore empty) on primary region replica " 178 + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 179 .getRegionNameAsString() 180 + " for " + region.getRegionInfo().getEncodedName() 181 + "; now waiting and blocking reads until observing a flush marker"); 182 } 183 region.setReadsEnabled(true); 184 break; 185 } else { 186 // somehow we were not able to get the primary to write the flush request. It may be 187 // closing or already flushing. Retry flush again after some sleep. 188 if (!counter.shouldRetry()) { 189 throw new IOException("Cannot cause primary to flush or drop a wal marker after " 190 + counter.getAttemptTimes() + " retries. Failing opening of this region replica " 191 + region.getRegionInfo().getRegionNameAsString()); 192 } else { 193 LOG.warn( 194 "Cannot cause primary replica {} to flush or drop a wal marker " 195 + "for region replica {}, retry={}", 196 ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) 197 .getRegionNameAsString(), 198 region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes()); 199 } 200 } 201 } else { 202 // nothing to do. Are we dealing with an old server? 203 LOG.warn("Was not able to trigger a flush from primary region due to old server version? " 204 + "Continuing to open the secondary region replica: " 205 + region.getRegionInfo().getRegionNameAsString()); 206 break; 207 } 208 } 209 try { 210 counter.sleepUntilNextRetry(); 211 } catch (InterruptedException e) { 212 throw new InterruptedIOException(e.getMessage()); 213 } 214 } 215 region.setReadsEnabled(true); 216 } 217}