View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.regionserver.handler;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.Server;
29  import org.apache.hadoop.hbase.TableNotFoundException;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.client.ClusterConnection;
32  import org.apache.hadoop.hbase.client.FlushRegionCallable;
33  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
34  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
35  import org.apache.hadoop.hbase.executor.EventHandler;
36  import org.apache.hadoop.hbase.executor.EventType;
37  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
38  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
39  import org.apache.hadoop.hbase.regionserver.HRegion;
40  import org.apache.hadoop.hbase.util.RetryCounter;
41  import org.apache.hadoop.hbase.util.RetryCounterFactory;
42  import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
43  
44  /**
45   * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
46   * secondary region replicas. This means that a secondary region replica can serve some edits from
47   * it's memstore that that is still not flushed from primary. We do not want to allow secondary
48   * region's seqId to go back in time, when this secondary region is opened elsewhere after a
49   * crash or region move. We will trigger a flush cache in the primary region replica and wait
50   * for observing a complete flush cycle before marking the region readsEnabled. This handler does
51   * the flushing of the primary region replica and ensures that regular region opening is not
52   * blocked while the secondary replica is blocked on flush.
53   */
54  @InterfaceAudience.Private
55  public class RegionReplicaFlushHandler extends EventHandler {
56  
57    private static final Log LOG = LogFactory.getLog(RegionReplicaFlushHandler.class);
58  
59    private final ClusterConnection connection;
60    private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
61    private final RpcControllerFactory rpcControllerFactory;
62    private final int operationTimeout;
63    private final HRegion region;
64  
65    public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
66        RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
67        int operationTimeout, HRegion region) {
68      super(server, EventType.RS_REGION_REPLICA_FLUSH);
69      this.connection = connection;
70      this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
71      this.rpcControllerFactory = rpcControllerFactory;
72      this.operationTimeout = operationTimeout;
73      this.region = region;
74    }
75  
76    @Override
77    public void process() throws IOException {
78      triggerFlushInPrimaryRegion(region);
79    }
80  
81    @Override
82    protected void handleException(Throwable t) {
83      if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
84        LOG.error("Caught throwable while processing event " + eventType, t);
85      } else if (t instanceof RuntimeException) {
86        server.abort("ServerAborting because a runtime exception was thrown", t);
87      } else {
88        // something fishy since we cannot flush the primary region until all retries (retries from
89        // rpc times 35 trigger). We cannot close the region since there is no such mechanism to
90        // close a region without master triggering it. We just abort the server for now.
91        server.abort("ServerAborting because an exception was thrown", t);
92      }
93    }
94  
95    private int getRetriesCount(Configuration conf) {
96      int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
97        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
98      if (numRetries > 10) {
99        int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10);
100       numRetries = numRetries / mult; // reset if HRS has multiplied this already
101     }
102     return numRetries;
103   }
104 
105   void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
106     long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
107       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
108 
109     int maxAttempts = getRetriesCount(connection.getConfiguration());
110     RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
111 
112     if (LOG.isDebugEnabled()) {
113       LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil
114         .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region "
115        + region.getRegionInfo().getEncodedName() + " to trigger a flush");
116     }
117     while (!region.isClosing() && !region.isClosed()
118         && !server.isAborted() && !server.isStopped()) {
119       FlushRegionCallable flushCallable = new FlushRegionCallable(
120         connection, rpcControllerFactory,
121         RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
122 
123       // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
124       // do not have to wait for the whole flush here, just initiate it.
125       FlushRegionResponse response = null;
126       try {
127          response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller()
128           .callWithRetries(flushCallable, this.operationTimeout);
129       } catch (IOException ex) {
130         if (ex instanceof TableNotFoundException
131             || connection.isTableDisabled(region.getRegionInfo().getTable())) {
132           return;
133         }
134         throw ex;
135       }
136 
137       if (response.getFlushed()) {
138         // then we have to wait for seeing the flush entry. All reads will be rejected until we see
139         // a complete flush cycle or replay a region open event
140         if (LOG.isDebugEnabled()) {
141           LOG.debug("Successfully triggered a flush of primary region replica "
142               + ServerRegionReplicaUtil
143                 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
144                 + " of region " + region.getRegionInfo().getEncodedName()
145                 + " Now waiting and blocking reads until observing a full flush cycle");
146         }
147         break;
148       } else {
149         if (response.hasWroteFlushWalMarker()) {
150           if(response.getWroteFlushWalMarker()) {
151             if (LOG.isDebugEnabled()) {
152               LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
153                   + "region replica " + ServerRegionReplicaUtil
154                     .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
155                   + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
156                   + "blocking reads until observing a flush marker");
157             }
158             break;
159           } else {
160             // somehow we were not able to get the primary to write the flush request. It may be
161             // closing or already flushing. Retry flush again after some sleep.
162             if (!counter.shouldRetry()) {
163               throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
164                   "retries. Failing opening of this region replica "
165                   + region.getRegionInfo().getEncodedName());
166             }
167           }
168         } else {
169           // nothing to do. Are we dealing with an old server?
170           LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
171               + "Continuing to open the secondary region replica: "
172               + region.getRegionInfo().getEncodedName());
173           region.setReadsEnabled(true);
174           break;
175         }
176       }
177       try {
178         counter.sleepUntilNextRetry();
179       } catch (InterruptedException e) {
180         throw new InterruptedIOException(e.getMessage());
181       }
182     }
183   }
184 
185 }