1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.handler;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.Server;
29 import org.apache.hadoop.hbase.TableNotFoundException;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.client.ClusterConnection;
32 import org.apache.hadoop.hbase.client.FlushRegionCallable;
33 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
34 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
35 import org.apache.hadoop.hbase.executor.EventHandler;
36 import org.apache.hadoop.hbase.executor.EventType;
37 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
38 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
39 import org.apache.hadoop.hbase.regionserver.HRegion;
40 import org.apache.hadoop.hbase.util.RetryCounter;
41 import org.apache.hadoop.hbase.util.RetryCounterFactory;
42 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
43
44
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 public class RegionReplicaFlushHandler extends EventHandler {
56
57 private static final Log LOG = LogFactory.getLog(RegionReplicaFlushHandler.class);
58
59 private final ClusterConnection connection;
60 private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
61 private final RpcControllerFactory rpcControllerFactory;
62 private final int operationTimeout;
63 private final HRegion region;
64
65 public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
66 RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
67 int operationTimeout, HRegion region) {
68 super(server, EventType.RS_REGION_REPLICA_FLUSH);
69 this.connection = connection;
70 this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
71 this.rpcControllerFactory = rpcControllerFactory;
72 this.operationTimeout = operationTimeout;
73 this.region = region;
74 }
75
76 @Override
77 public void process() throws IOException {
78 triggerFlushInPrimaryRegion(region);
79 }
80
81 @Override
82 protected void handleException(Throwable t) {
83 if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
84 LOG.error("Caught throwable while processing event " + eventType, t);
85 } else if (t instanceof RuntimeException) {
86 server.abort("ServerAborting because a runtime exception was thrown", t);
87 } else {
88
89
90
91 server.abort("ServerAborting because an exception was thrown", t);
92 }
93 }
94
95 private int getRetriesCount(Configuration conf) {
96 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
97 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
98 if (numRetries > 10) {
99 int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10);
100 numRetries = numRetries / mult;
101 }
102 return numRetries;
103 }
104
105 void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
106 long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
107 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
108
109 int maxAttempts = getRetriesCount(connection.getConfiguration());
110 RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
111
112 if (LOG.isDebugEnabled()) {
113 LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil
114 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region "
115 + region.getRegionInfo().getEncodedName() + " to trigger a flush");
116 }
117 while (!region.isClosing() && !region.isClosed()
118 && !server.isAborted() && !server.isStopped()) {
119 FlushRegionCallable flushCallable = new FlushRegionCallable(
120 connection, rpcControllerFactory,
121 RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
122
123
124
125 FlushRegionResponse response = null;
126 try {
127 response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller()
128 .callWithRetries(flushCallable, this.operationTimeout);
129 } catch (IOException ex) {
130 if (ex instanceof TableNotFoundException
131 || connection.isTableDisabled(region.getRegionInfo().getTable())) {
132 return;
133 }
134 throw ex;
135 }
136
137 if (response.getFlushed()) {
138
139
140 if (LOG.isDebugEnabled()) {
141 LOG.debug("Successfully triggered a flush of primary region replica "
142 + ServerRegionReplicaUtil
143 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
144 + " of region " + region.getRegionInfo().getEncodedName()
145 + " Now waiting and blocking reads until observing a full flush cycle");
146 }
147 break;
148 } else {
149 if (response.hasWroteFlushWalMarker()) {
150 if(response.getWroteFlushWalMarker()) {
151 if (LOG.isDebugEnabled()) {
152 LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
153 + "region replica " + ServerRegionReplicaUtil
154 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
155 + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
156 + "blocking reads until observing a flush marker");
157 }
158 break;
159 } else {
160
161
162 if (!counter.shouldRetry()) {
163 throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
164 "retries. Failing opening of this region replica "
165 + region.getRegionInfo().getEncodedName());
166 }
167 }
168 } else {
169
170 LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
171 + "Continuing to open the secondary region replica: "
172 + region.getRegionInfo().getEncodedName());
173 region.setReadsEnabled(true);
174 break;
175 }
176 }
177 try {
178 counter.sleepUntilNextRetry();
179 } catch (InterruptedException e) {
180 throw new InterruptedIOException(e.getMessage());
181 }
182 }
183 }
184
185 }