1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.net.BindException;
25 import java.net.InetSocketAddress;
26 import java.net.UnknownHostException;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.NavigableMap;
35 import java.util.Set;
36 import java.util.TreeSet;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.atomic.AtomicLong;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.hbase.Cell;
44 import org.apache.hadoop.hbase.CellScannable;
45 import org.apache.hadoop.hbase.CellScanner;
46 import org.apache.hadoop.hbase.CellUtil;
47 import org.apache.hadoop.hbase.DoNotRetryIOException;
48 import org.apache.hadoop.hbase.DroppedSnapshotException;
49 import org.apache.hadoop.hbase.HBaseIOException;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.HTableDescriptor;
53 import org.apache.hadoop.hbase.MetaTableAccessor;
54 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
55 import org.apache.hadoop.hbase.NotServingRegionException;
56 import org.apache.hadoop.hbase.ServerName;
57 import org.apache.hadoop.hbase.TableName;
58 import org.apache.hadoop.hbase.UnknownScannerException;
59 import org.apache.hadoop.hbase.classification.InterfaceAudience;
60 import org.apache.hadoop.hbase.client.Append;
61 import org.apache.hadoop.hbase.client.ConnectionUtils;
62 import org.apache.hadoop.hbase.client.Delete;
63 import org.apache.hadoop.hbase.client.Durability;
64 import org.apache.hadoop.hbase.client.Get;
65 import org.apache.hadoop.hbase.client.Increment;
66 import org.apache.hadoop.hbase.client.Mutation;
67 import org.apache.hadoop.hbase.client.Put;
68 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
69 import org.apache.hadoop.hbase.client.Result;
70 import org.apache.hadoop.hbase.client.RowMutations;
71 import org.apache.hadoop.hbase.client.Scan;
72 import org.apache.hadoop.hbase.client.VersionInfoUtil;
73 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
74 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
75 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
76 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
77 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
78 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
79 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
80 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
81 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
82 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
83 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
84 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
85 import org.apache.hadoop.hbase.ipc.PriorityFunction;
86 import org.apache.hadoop.hbase.ipc.QosPriority;
87 import org.apache.hadoop.hbase.ipc.RpcCallContext;
88 import org.apache.hadoop.hbase.ipc.RpcServer;
89 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
90 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
91 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
92 import org.apache.hadoop.hbase.ipc.ServerRpcController;
93 import org.apache.hadoop.hbase.master.MasterRpcServices;
94 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
95 import org.apache.hadoop.hbase.protobuf.RequestConverter;
96 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
97 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
98 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
99 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
131 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
152 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
153 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
156 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
157 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
158 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
159 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
160 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
162 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
163 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
164 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
165 import org.apache.hadoop.hbase.quotas.OperationQuota;
166 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
167 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
168 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
169 import org.apache.hadoop.hbase.regionserver.Region.Operation;
170 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
171 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
172 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
173 import org.apache.hadoop.hbase.wal.WAL;
174 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
175 import org.apache.hadoop.hbase.security.User;
176 import org.apache.hadoop.hbase.util.Bytes;
177 import org.apache.hadoop.hbase.util.Counter;
178 import org.apache.hadoop.hbase.util.DNS;
179 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
180 import org.apache.hadoop.hbase.util.Pair;
181 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
182 import org.apache.hadoop.hbase.util.Strings;
183 import org.apache.hadoop.hbase.wal.WALKey;
184 import org.apache.hadoop.hbase.wal.WALSplitter;
185 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
186 import org.apache.zookeeper.KeeperException;
187
188 import com.google.common.annotations.VisibleForTesting;
189 import com.google.protobuf.ByteString;
190 import com.google.protobuf.Message;
191 import com.google.protobuf.RpcController;
192 import com.google.protobuf.ServiceException;
193 import com.google.protobuf.TextFormat;
194
195
196
197
198 @InterfaceAudience.Private
199 @SuppressWarnings("deprecation")
200 public class RSRpcServices implements HBaseRPCErrorHandler,
201 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
202 ConfigurationObserver {
/** Logger for this class; protected, so subclasses can log through it as well. */
protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);

/**
 * Configuration key naming the RPC scheduler factory class. The constructor reads it and
 * instantiates the configured class reflectively.
 */
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
  "hbase.region.server.rpc.scheduler.factory.class";

/**
 * Configuration key for the minimum scan time limit delta.
 * NOTE(review): where it is read and its unit are not visible in this part of the file;
 * presumably milliseconds -- confirm.
 */
private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
  "hbase.region.server.rpc.minimum.scan.time.limit.delta";

/**
 * Presumably the fallback used when
 * {@link #REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} is not configured -- confirm.
 */
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

/** Request counter; doReplayBatchOp adds the number of replayed mutations to it. */
final Counter requestCount = new Counter();

/** RPC server; its metrics receive the exceptions recorded while processing multi actions. */
final RpcServerInterface rpcServer;
// NOTE(review): presumably the address the RPC server is bound to -- confirm.
final InetSocketAddress isa;

/** The region server this instance works against (assigned in the constructor). */
private final HRegionServer regionServer;
/**
 * Upper bound used, together with the quota's available read size, to cap the size of a
 * non-atomic multi response (see doNonAtomicRegionMutation).
 */
private final long maxScannerResultSize;

// NOTE(review): presumably the delegate used to compute request priorities -- confirm.
private final PriorityFunction priority;

private ScannerIdGenerator scannerIdGenerator;
/**
 * Open scanners keyed by scanner name. Entries are removed when a scanner lease expires
 * (ScannerListener) and are iterated by closeAllScanners.
 */
private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
  new ConcurrentHashMap<String, RegionScannerHolder>();

// NOTE(review): presumably the scanner lease timeout; unit not visible here -- confirm.
private final int scannerLeaseTimeoutPeriod;

// NOTE(review): presumably the configured RPC timeout; unit not visible here -- confirm.
private final int rpcTimeout;

// NOTE(review): presumably the value read from
// REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA -- confirm.
private final long minimumScanTimeLimitDelta;
251
252
253
254
255 private static class RegionScannerHolder {
256 private AtomicLong nextCallSeq = new AtomicLong(0);
257 private RegionScanner s;
258 private Region r;
259
260 public RegionScannerHolder(RegionScanner s, Region r) {
261 this.s = s;
262 this.r = r;
263 }
264
265 private long getNextCallSeq() {
266 return nextCallSeq.get();
267 }
268
269 private void incNextCallSeq() {
270 nextCallSeq.incrementAndGet();
271 }
272
273 private void rollbackNextCallSeq() {
274 nextCallSeq.decrementAndGet();
275 }
276 }
277
278
279
280
281
282 private class ScannerListener implements LeaseListener {
283 private final String scannerName;
284
285 ScannerListener(final String n) {
286 this.scannerName = n;
287 }
288
289 @Override
290 public void leaseExpired() {
291 RegionScannerHolder rsh = scanners.remove(this.scannerName);
292 if (rsh != null) {
293 RegionScanner s = rsh.s;
294 LOG.info("Scanner " + this.scannerName + " lease expired on region "
295 + s.getRegionInfo().getRegionNameAsString());
296 Region region = null;
297 try {
298 region = regionServer.getRegion(s.getRegionInfo().getRegionName());
299 if (region != null && region.getCoprocessorHost() != null) {
300 region.getCoprocessorHost().preScannerClose(s);
301 }
302 } catch (IOException e) {
303 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
304 } finally {
305 try {
306 s.close();
307 if (region != null && region.getCoprocessorHost() != null) {
308 region.getCoprocessorHost().postScannerClose(s);
309 }
310 } catch (IOException e) {
311 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
312 }
313 }
314 } else {
315 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
316 " scanner found, hence no chance to close that related scanner!");
317 }
318 }
319 }
320
321 private static ResultOrException getResultOrException(
322 final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
323 return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
324 }
325
326 private static ResultOrException getResultOrException(final Exception e, final int index) {
327 return getResultOrException(ResponseConverter.buildActionResult(e), index);
328 }
329
330 private static ResultOrException getResultOrException(
331 final ResultOrException.Builder builder, final int index) {
332 return builder.setIndex(index).build();
333 }
334
335
336
337
338
339
340
341 private long startNonceOperation(final MutationProto mutation, long nonceGroup)
342 throws IOException, OperationConflictException {
343 if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
344 boolean canProceed = false;
345 try {
346 canProceed = regionServer.nonceManager.startOperation(
347 nonceGroup, mutation.getNonce(), regionServer);
348 } catch (InterruptedException ex) {
349 throw new InterruptedIOException("Nonce start operation interrupted");
350 }
351 if (!canProceed) {
352
353 String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
354 + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
355 + "] may have already completed";
356 throw new OperationConflictException(message);
357 }
358 return mutation.getNonce();
359 }
360
361
362
363
364
365
366
367 private void endNonceOperation(final MutationProto mutation,
368 long nonceGroup, boolean success) {
369 if (regionServer.nonceManager != null && mutation.hasNonce()) {
370 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
371 }
372 }
373
374
375
376
377 private boolean isClientCellBlockSupport() {
378 RpcCallContext context = RpcServer.getCurrentCall();
379 return context != null && context.isClientCellBlockSupported();
380 }
381
382 private void addResult(final MutateResponse.Builder builder,
383 final Result result, final PayloadCarryingRpcController rpcc) {
384 if (result == null) return;
385 if (isClientCellBlockSupport()) {
386 builder.setResult(ProtobufUtil.toResultNoData(result));
387 rpcc.setCellScanner(result.cellScanner());
388 } else {
389 ClientProtos.Result pbr = ProtobufUtil.toResult(result);
390 builder.setResult(pbr);
391 }
392 }
393
394 private void addResults(final ScanResponse.Builder builder, final List<Result> results,
395 final RpcController controller, boolean isDefaultRegion) {
396 builder.setStale(!isDefaultRegion);
397 if (results == null || results.isEmpty()) return;
398 if (isClientCellBlockSupport()) {
399 for (Result res : results) {
400 builder.addCellsPerResult(res.size());
401 builder.addPartialFlagPerResult(res.isPartial());
402 }
403 ((PayloadCarryingRpcController)controller).
404 setCellScanner(CellUtil.createCellScanner(results));
405 } else {
406 for (Result res: results) {
407 ClientProtos.Result pbr = ProtobufUtil.toResult(res);
408 builder.addResults(pbr);
409 }
410 }
411 }
412
413
414
415
416
417
418
419
420
421 private ClientProtos.RegionLoadStats mutateRows(final Region region,
422 final List<ClientProtos.Action> actions,
423 final CellScanner cellScanner) throws IOException {
424 int countOfCompleteMutation = 0;
425 try {
426 if (!region.getRegionInfo().isMetaTable()) {
427 regionServer.cacheFlusher.reclaimMemStoreMemory();
428 }
429 RowMutations rm = null;
430 for (ClientProtos.Action action: actions) {
431 if (action.hasGet()) {
432 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
433 action.getGet());
434 }
435 MutationType type = action.getMutation().getMutateType();
436 if (rm == null) {
437 rm = new RowMutations(action.getMutation().getRow().toByteArray());
438 }
439 switch (type) {
440 case PUT:
441 Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
442 ++countOfCompleteMutation;
443 rm.add(put);
444 break;
445 case DELETE:
446 Delete delete = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
447 ++countOfCompleteMutation;
448 rm.add(delete);
449 break;
450 default:
451 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
452 }
453 }
454 region.mutateRow(rm);
455 return ((HRegion)region).getRegionStats();
456 } finally {
457
458
459 for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
460 skipCellsForMutation(actions.get(i), cellScanner);
461 }
462 }
463 }
464
465
466
467
468
469
470
471
472
473
474
475
476
477 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
478 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
479 CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
480 int countOfCompleteMutation = 0;
481 try {
482 if (!region.getRegionInfo().isMetaTable()) {
483 regionServer.cacheFlusher.reclaimMemStoreMemory();
484 }
485 RowMutations rm = null;
486 for (ClientProtos.Action action: actions) {
487 if (action.hasGet()) {
488 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
489 action.getGet());
490 }
491 MutationType type = action.getMutation().getMutateType();
492 if (rm == null) {
493 rm = new RowMutations(action.getMutation().getRow().toByteArray());
494 }
495 switch (type) {
496 case PUT:
497 Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
498 ++countOfCompleteMutation;
499 rm.add(put);
500 break;
501 case DELETE:
502 Delete delete = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
503 ++countOfCompleteMutation;
504 rm.add(delete);
505 break;
506 default:
507 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
508 }
509 }
510 return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, true);
511 } finally {
512
513
514 for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
515 skipCellsForMutation(actions.get(i), cellScanner);
516 }
517 }
518 }
519
520
521
522
523
524
525
526
527
528
529
530 private Result append(final Region region, final OperationQuota quota, final MutationProto m,
531 final CellScanner cellScanner, long nonceGroup) throws IOException {
532 long before = EnvironmentEdgeManager.currentTime();
533 Append append = ProtobufUtil.toAppend(m, cellScanner);
534 quota.addMutation(append);
535 Result r = null;
536 if (region.getCoprocessorHost() != null) {
537 r = region.getCoprocessorHost().preAppend(append);
538 }
539 if (r == null) {
540 long nonce = startNonceOperation(m, nonceGroup);
541 boolean success = false;
542 try {
543 r = region.append(append, nonceGroup, nonce);
544 success = true;
545 } finally {
546 endNonceOperation(m, nonceGroup, success);
547 }
548 if (region.getCoprocessorHost() != null) {
549 region.getCoprocessorHost().postAppend(append, r);
550 }
551 }
552 if (regionServer.metricsRegionServer != null) {
553 regionServer.metricsRegionServer.updateAppend(
554 EnvironmentEdgeManager.currentTime() - before);
555 }
556 return r;
557 }
558
559
560
561
562
563
564
565
566
567 private Result increment(final Region region, final OperationQuota quota,
568 final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
569 long before = EnvironmentEdgeManager.currentTime();
570 Increment increment = ProtobufUtil.toIncrement(mutation, cells);
571 quota.addMutation(increment);
572 Result r = null;
573 if (region.getCoprocessorHost() != null) {
574 r = region.getCoprocessorHost().preIncrement(increment);
575 }
576 if (r == null) {
577 long nonce = startNonceOperation(mutation, nonceGroup);
578 boolean success = false;
579 try {
580 r = region.increment(increment, nonceGroup, nonce);
581 success = true;
582 } finally {
583 endNonceOperation(mutation, nonceGroup, success);
584 }
585 if (region.getCoprocessorHost() != null) {
586 r = region.getCoprocessorHost().postIncrement(increment, r);
587 }
588 }
589 if (regionServer.metricsRegionServer != null) {
590 regionServer.metricsRegionServer.updateIncrement(
591 EnvironmentEdgeManager.currentTime() - before);
592 }
593 return r;
594 }
595
596
597
598
599
600
601
602
603
604
605
606
/**
 * Runs the actions of one (non-atomic) region action in order and records one
 * result-or-exception per action in {@code builder}. Gets, coprocessor service calls,
 * appends and increments are executed one at a time; consecutive Puts and Deletes are
 * collected and executed together through {@code doBatchOp}. A failure of one action is
 * recorded against that action and does not stop the remaining ones.
 *
 * @param region the region the actions apply to
 * @param quota quota used for accounting and for the available read size
 * @param actions the region action whose action list is processed
 * @param cellScanner the scanner the mutations' cells are read from
 * @param builder receives one result-or-exception per action
 * @param cellsToReturn list to which results are added when the client supports cell
 *        blocks; may be null, in which case a list is created on first use
 * @param nonceGroup the nonce group passed on to append/increment
 * @return {@code cellsToReturn}: the list passed in, a newly created list, or null when
 *         null was passed in and no result was added
 */
private List<CellScannable> doNonAtomicRegionMutation(final Region region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
  // Pending Puts/Deletes waiting to be executed together; created lazily.
  List<ClientProtos.Action> mutations = null;
  // Response size cap: the smaller of the server-side maximum and the quota's available read.
  long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
  RpcCallContext context = RpcServer.getCurrentCall();
  // Created once, on the first action that hits the size cap, then reused for later actions.
  IOException sizeIOE = null;
  // Threaded through addSize from one result to the next.
  // NOTE(review): presumably lets addSize avoid counting the same block twice -- confirm.
  Object lastBlock = null;
  for (ClientProtos.Action action : actions.getActionList()) {
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
    try {
      Result r = null;

      // Once the accumulated response (by cell size or block size) exceeds the cap, and the
      // call reports that immediate retry is supported, the action is not executed; it is
      // answered with a MultiActionResultTooLarge exception instead.
      if (context != null
          && context.isRetryImmediatelySupported()
          && (context.getResponseCellSize() > maxQuotaResultSize
            || context.getResponseBlockSize() > maxQuotaResultSize)) {

        if (sizeIOE == null ) {
          sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
            + " CellSize: " + context.getResponseCellSize()
            + " BlockSize: " + context.getResponseBlockSize());

          // Reported to the metrics only once, when the exception is first created.
          rpcServer.getMetrics().exception(sizeIOE);
        }

        resultOrExceptionBuilder = ResultOrException.newBuilder().
          setException(ResponseConverter.buildException(sizeIOE));
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
        // The action is skipped, so its cells are handed to skipCellsForMutation
        // (presumably to move the shared scanner past them -- TODO confirm).
        skipCellsForMutation(action, cellScanner);
        continue;
      }
      if (action.hasGet()) {
        long before = EnvironmentEdgeManager.currentTime();
        try {
          Get get = ProtobufUtil.toGet(action.getGet());
          r = region.get(get);
        } finally {
          // Get latency is recorded whether or not the get succeeded.
          if (regionServer.metricsRegionServer != null) {
            regionServer.metricsRegionServer.updateGet(
              EnvironmentEdgeManager.currentTime() - before);
          }
        }
      } else if (action.hasServiceCall()) {
        // The builder is created up front so that either the service result or the
        // exception from the inner catch ends up in the response for this action.
        resultOrExceptionBuilder = ResultOrException.newBuilder();
        try {
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(
            serviceResultBuilder.setValue(
              serviceResultBuilder.getValueBuilder()
                .setName(result.getClass().getName())
                .setValue(result.toByteString())));
        } catch (IOException ioe) {
          rpcServer.getMetrics().exception(ioe);
          resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
        }
      } else if (action.hasMutation()) {
        MutationType type = action.getMutation().getMutateType();
        // A mutation that is neither PUT nor DELETE first flushes the pending Put/Delete
        // batch, so that earlier actions are executed before this one.
        if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
            !mutations.isEmpty()) {

          doBatchOp(builder, region, quota, mutations, cellScanner);
          mutations.clear();
        }
        switch (type) {
        case APPEND:
          r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
          break;
        case INCREMENT:
          r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
          break;
        case PUT:
        case DELETE:
          // Puts and Deletes are only collected here; doBatchOp executes them later and
          // adds their results to the builder itself.
          if (mutations == null) {
            mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
          }
          mutations.add(action);
          break;
        default:
          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
      } else {
        throw new HBaseIOException("Unexpected Action type");
      }
      if (r != null) {
        ClientProtos.Result pbResult = null;
        if (isClientCellBlockSupport()) {
          // Cell-block capable client: the protobuf carries no data and the result itself
          // is added to cellsToReturn.
          pbResult = ProtobufUtil.toResultNoData(r);

          if (cellsToReturn == null) {
            cellsToReturn = new ArrayList<CellScannable>();
          }
          cellsToReturn.add(r);
        } else {
          pbResult = ProtobufUtil.toResult(r);
        }
        lastBlock = addSize(context, r, lastBlock);
        resultOrExceptionBuilder =
          ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
      }

    } catch (IOException ie) {
      // A failing action is reported to the metrics and recorded for this action only;
      // the loop carries on with the next action.
      rpcServer.getMetrics().exception(ie);
      resultOrExceptionBuilder = ResultOrException.newBuilder().
        setException(ResponseConverter.buildException(ie));
    }
    // Null for collected Puts/Deletes, whose results are added by doBatchOp instead.
    if (resultOrExceptionBuilder != null) {

      resultOrExceptionBuilder.setIndex(action.getIndex());
      builder.addResultOrException(resultOrExceptionBuilder.build());
    }
  }

  // Execute whatever Puts/Deletes are still pending after the last action.
  if (mutations != null && !mutations.isEmpty()) {
    doBatchOp(builder, region, quota, mutations, cellScanner);
  }
  return cellsToReturn;
}
748
749
750
751
752
753
754
755
756 private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
757 final OperationQuota quota,
758 final List<ClientProtos.Action> mutations, final CellScanner cells) {
759 Mutation[] mArray = new Mutation[mutations.size()];
760 long before = EnvironmentEdgeManager.currentTime();
761 boolean batchContainsPuts = false, batchContainsDelete = false;
762 try {
763 int i = 0;
764 for (ClientProtos.Action action: mutations) {
765 MutationProto m = action.getMutation();
766 Mutation mutation;
767 if (m.getMutateType() == MutationType.PUT) {
768 mutation = ProtobufUtil.toPut(m, cells);
769 batchContainsPuts = true;
770 } else {
771 mutation = ProtobufUtil.toDelete(m, cells);
772 batchContainsDelete = true;
773 }
774 mArray[i++] = mutation;
775 quota.addMutation(mutation);
776 }
777
778 if (!region.getRegionInfo().isMetaTable()) {
779 regionServer.cacheFlusher.reclaimMemStoreMemory();
780 }
781
782 OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
783 HConstants.NO_NONCE);
784 for (i = 0; i < codes.length; i++) {
785 int index = mutations.get(i).getIndex();
786 Exception e = null;
787 switch (codes[i].getOperationStatusCode()) {
788 case BAD_FAMILY:
789 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
790 builder.addResultOrException(getResultOrException(e, index));
791 break;
792
793 case SANITY_CHECK_FAILURE:
794 e = new FailedSanityCheckException(codes[i].getExceptionMsg());
795 builder.addResultOrException(getResultOrException(e, index));
796 break;
797
798 default:
799 e = new DoNotRetryIOException(codes[i].getExceptionMsg());
800 builder.addResultOrException(getResultOrException(e, index));
801 break;
802
803 case SUCCESS:
804 builder.addResultOrException(getResultOrException(
805 ClientProtos.Result.getDefaultInstance(), index,
806 ((HRegion) region).getRegionStats()));
807 break;
808 }
809 }
810 } catch (IOException ie) {
811 int processedMutationIndex = 0;
812 for (Action mutation : mutations) {
813
814 if (mArray[processedMutationIndex++] == null) {
815 skipCellsForMutation(mutation, cells);
816 }
817 builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
818 }
819 }
820 if (regionServer.metricsRegionServer != null) {
821 long after = EnvironmentEdgeManager.currentTime();
822 if (batchContainsPuts) {
823 regionServer.metricsRegionServer.updatePut(after - before);
824 }
825 if (batchContainsDelete) {
826 regionServer.metricsRegionServer.updateDelete(after - before);
827 }
828 }
829 }
830
831
832
833
834
835
836
837
838
839
840
841 private OperationStatus [] doReplayBatchOp(final Region region,
842 final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
843 long before = EnvironmentEdgeManager.currentTime();
844 boolean batchContainsPuts = false, batchContainsDelete = false;
845 try {
846 for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
847 WALSplitter.MutationReplay m = it.next();
848
849 if (m.type == MutationType.PUT) {
850 batchContainsPuts = true;
851 } else {
852 batchContainsDelete = true;
853 }
854
855 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
856 List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
857 if (metaCells != null && !metaCells.isEmpty()) {
858 for (Cell metaCell : metaCells) {
859 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
860 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
861 HRegion hRegion = (HRegion)region;
862 if (compactionDesc != null) {
863
864
865 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
866 replaySeqId);
867 continue;
868 }
869 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
870 if (flushDesc != null && !isDefaultReplica) {
871 hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
872 continue;
873 }
874 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
875 if (regionEvent != null && !isDefaultReplica) {
876 hRegion.replayWALRegionEventMarker(regionEvent);
877 continue;
878 }
879 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
880 if (bulkLoadEvent != null) {
881 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
882 continue;
883 }
884 }
885 it.remove();
886 }
887 }
888 requestCount.add(mutations.size());
889 if (!region.getRegionInfo().isMetaTable()) {
890 regionServer.cacheFlusher.reclaimMemStoreMemory();
891 }
892 return region.batchReplay(mutations.toArray(
893 new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
894 } finally {
895 if (regionServer.metricsRegionServer != null) {
896 long after = EnvironmentEdgeManager.currentTime();
897 if (batchContainsPuts) {
898 regionServer.metricsRegionServer.updatePut(after - before);
899 }
900 if (batchContainsDelete) {
901 regionServer.metricsRegionServer.updateDelete(after - before);
902 }
903 }
904 }
905 }
906
907 private void closeAllScanners() {
908
909
910 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
911 try {
912 e.getValue().s.close();
913 } catch (IOException ioe) {
914 LOG.warn("Closing scanner " + e.getKey(), ioe);
915 }
916 }
917 }
918
919 public RSRpcServices(HRegionServer rs) throws IOException {
920 regionServer = rs;
921
922 RpcSchedulerFactory rpcSchedulerFactory;
923 try {
924 Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
925 REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
926 SimpleRpcSchedulerFactory.class);
927 rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
928 } catch (InstantiationException e) {
929 throw new IllegalArgumentException(e);
930 } catch (IllegalAccessException e) {
931 throw new IllegalArgumentException(e);
932 }
933
934 InetSocketAddress initialIsa;
935 InetSocketAddress bindAddress;
936 if(this instanceof MasterRpcServices) {
937 String hostname = getHostname(rs.conf, true);
938 int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
939
940 initialIsa = new InetSocketAddress(hostname, port);
941 bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
942 } else {
943 String hostname = getHostname(rs.conf, false);
944 int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
945 HConstants.DEFAULT_REGIONSERVER_PORT);
946
947 initialIsa = new InetSocketAddress(hostname, port);
948 bindAddress = new InetSocketAddress(
949 rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
950 }
951 if (initialIsa.getAddress() == null) {
952 throw new IllegalArgumentException("Failed resolve of " + initialIsa);
953 }
954 priority = createPriority();
955 String name = rs.getProcessName() + "/" + initialIsa.toString();
956
957 ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
958 try {
959 rpcServer = new RpcServer(rs, name, getServices(),
960 bindAddress,
961 rs.conf,
962 rpcSchedulerFactory.create(rs.conf, this, rs));
963 rpcServer.setRsRpcServices(this);
964 } catch (BindException be) {
965 String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
966 HConstants.REGIONSERVER_PORT;
967 throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
968 "' configuration property.", be.getCause() != null ? be.getCause() : be);
969 }
970
971 scannerLeaseTimeoutPeriod = rs.conf.getInt(
972 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
973 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
974 maxScannerResultSize = rs.conf.getLong(
975 HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
976 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
977 rpcTimeout = rs.conf.getInt(
978 HConstants.HBASE_RPC_TIMEOUT_KEY,
979 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
980 minimumScanTimeLimitDelta = rs.conf.getLong(
981 REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
982 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
983
984 InetSocketAddress address = rpcServer.getListenerAddress();
985 if (address == null) {
986 throw new IOException("Listener channel is closed");
987 }
988
989 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
990 rpcServer.setErrorHandler(this);
991 rs.setName(name);
992 }
993
994 @Override
995 public void onConfigurationChange(Configuration newConf) {
996 if (rpcServer instanceof ConfigurationObserver) {
997 ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
998 }
999 }
1000
1001 protected PriorityFunction createPriority() {
1002 return new AnnotationReadingPriorityFunction(this);
1003 }
1004
1005 public static String getHostname(Configuration conf, boolean isMaster)
1006 throws UnknownHostException {
1007 String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
1008 HRegionServer.RS_HOSTNAME_KEY);
1009 if (hostname == null || hostname.isEmpty()) {
1010 String masterOrRS = isMaster ? "master" : "regionserver";
1011 return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
1012 conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
1013 conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
1014 } else {
1015 LOG.info("hostname is configured to be " + hostname);
1016 return hostname;
1017 }
1018 }
1019
1020 RegionScanner getScanner(long scannerId) {
1021 String scannerIdString = Long.toString(scannerId);
1022 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1023 if (scannerHolder != null) {
1024 return scannerHolder.s;
1025 }
1026 return null;
1027 }
1028
1029 public String getScanDetailsWithId(long scannerId) {
1030 RegionScanner scanner = getScanner(scannerId);
1031 if (scanner == null) {
1032 return null;
1033 }
1034 StringBuilder builder = new StringBuilder();
1035 builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1036 builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1037 return builder.toString();
1038 }
1039
1040
1041
1042
1043
1044 long getScannerVirtualTime(long scannerId) {
1045 String scannerIdString = Long.toString(scannerId);
1046 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1047 if (scannerHolder != null) {
1048 return scannerHolder.getNextCallSeq();
1049 }
1050 return 0L;
1051 }
1052
1053 long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
1054 long scannerId = this.scannerIdGenerator.generateNewScannerId();
1055 String scannerName = String.valueOf(scannerId);
1056
1057 RegionScannerHolder existing =
1058 scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
1059 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1060
1061 regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1062 new ScannerListener(scannerName));
1063 return scannerId;
1064 }
1065
1066
1067
1068
1069
1070 Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1071 if (context != null && !r.isEmpty()) {
1072 for (Cell c : r.rawCells()) {
1073 context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
1074
1075
1076
1077
1078
1079
1080 byte[] valueArray = c.getValueArray();
1081 if (valueArray != lastBlock) {
1082 context.incrementResponseBlockSize(valueArray.length);
1083 lastBlock = valueArray;
1084 }
1085 }
1086 }
1087 return lastBlock;
1088 }
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099 Region getRegion(
1100 final RegionSpecifier regionSpecifier) throws IOException {
1101 return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
1102 ProtobufUtil.getRegionEncodedName(regionSpecifier));
1103 }
1104
1105 @VisibleForTesting
1106 public PriorityFunction getPriority() {
1107 return priority;
1108 }
1109
1110 @VisibleForTesting
1111 public Configuration getConfiguration() {
1112 return regionServer.getConfiguration();
1113 }
1114
1115 private RegionServerQuotaManager getQuotaManager() {
1116 return regionServer.getRegionServerQuotaManager();
1117 }
1118
1119 void start() {
1120 this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
1121 rpcServer.start();
1122 }
1123
1124 void stop() {
1125 closeAllScanners();
1126 rpcServer.stop();
1127 }
1128
1129
1130
1131
1132
1133
1134 protected void checkOpen() throws IOException {
1135 if (regionServer.isAborted()) {
1136 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1137 }
1138 if (regionServer.isStopped()) {
1139 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1140 }
1141 if (!regionServer.fsOk) {
1142 throw new RegionServerStoppedException("File system not available");
1143 }
1144 if (!regionServer.isOnline()) {
1145 throw new ServerNotRunningYetException("Server is not running yet");
1146 }
1147 }
1148
1149
1150
1151
1152 protected List<BlockingServiceAndInterface> getServices() {
1153 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1154 bssi.add(new BlockingServiceAndInterface(
1155 ClientService.newReflectiveBlockingService(this),
1156 ClientService.BlockingInterface.class));
1157 bssi.add(new BlockingServiceAndInterface(
1158 AdminService.newReflectiveBlockingService(this),
1159 AdminService.BlockingInterface.class));
1160 return bssi;
1161 }
1162
1163 public InetSocketAddress getSocketAddress() {
1164 return isa;
1165 }
1166
1167 @Override
1168 public int getPriority(RequestHeader header, Message param, User user) {
1169 return priority.getPriority(header, param, user);
1170 }
1171
1172 @Override
1173 public long getDeadline(RequestHeader header, Message param) {
1174 return priority.getDeadline(header, param);
1175 }
1176
1177
1178
1179
1180
1181
1182
1183
1184 @Override
1185 public boolean checkOOME(final Throwable e) {
1186 return exitIfOOME(e);
1187 }
1188
1189 public static boolean exitIfOOME(final Throwable e ){
1190 boolean stop = false;
1191 try {
1192 if (e instanceof OutOfMemoryError
1193 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1194 || (e.getMessage() != null && e.getMessage().contains(
1195 "java.lang.OutOfMemoryError"))) {
1196 stop = true;
1197 LOG.fatal("Run out of memory; " + RSRpcServices.class.getSimpleName()
1198 + " will abort itself immediately", e);
1199 }
1200 } finally {
1201 if (stop) {
1202 Runtime.getRuntime().halt(1);
1203 }
1204 }
1205 return stop;
1206 }
1207
1208
1209
1210
1211
1212
1213
1214
1215 @Override
1216 @QosPriority(priority=HConstants.ADMIN_QOS)
1217 public CloseRegionResponse closeRegion(final RpcController controller,
1218 final CloseRegionRequest request) throws ServiceException {
1219 final ServerName sn = (request.hasDestinationServer() ?
1220 ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1221
1222 try {
1223 checkOpen();
1224 if (request.hasServerStartCode()) {
1225
1226 long serverStartCode = request.getServerStartCode();
1227 if (regionServer.serverName.getStartcode() != serverStartCode) {
1228 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1229 "different server with startCode: " + serverStartCode + ", this server is: "
1230 + regionServer.serverName));
1231 }
1232 }
1233 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1234
1235 requestCount.increment();
1236 LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1237 CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1238 .getCloseRegionCoordination().parseFromProtoRequest(request);
1239
1240 boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1241 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1242 return builder.build();
1243 } catch (IOException ie) {
1244 throw new ServiceException(ie);
1245 }
1246 }
1247
1248
1249
1250
1251
1252
1253
1254
1255 @Override
1256 @QosPriority(priority=HConstants.ADMIN_QOS)
1257 public CompactRegionResponse compactRegion(final RpcController controller,
1258 final CompactRegionRequest request) throws ServiceException {
1259 try {
1260 checkOpen();
1261 requestCount.increment();
1262 Region region = getRegion(request.getRegion());
1263 region.startRegionOperation(Operation.COMPACT_REGION);
1264 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1265 boolean major = false;
1266 byte [] family = null;
1267 Store store = null;
1268 if (request.hasFamily()) {
1269 family = request.getFamily().toByteArray();
1270 store = region.getStore(family);
1271 if (store == null) {
1272 throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1273 + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1274 }
1275 }
1276 if (request.hasMajor()) {
1277 major = request.getMajor();
1278 }
1279 if (major) {
1280 if (family != null) {
1281 store.triggerMajorCompaction();
1282 } else {
1283 region.triggerMajorCompaction();
1284 }
1285 }
1286
1287 String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1288 if (LOG.isTraceEnabled()) {
1289 LOG.trace("User-triggered compaction requested for region "
1290 + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1291 }
1292 String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1293 if(family != null) {
1294 regionServer.compactSplitThread.requestCompaction(region, store, log,
1295 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1296 } else {
1297 regionServer.compactSplitThread.requestCompaction(region, log,
1298 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1299 }
1300 return CompactRegionResponse.newBuilder().build();
1301 } catch (IOException ie) {
1302 throw new ServiceException(ie);
1303 }
1304 }
1305
1306
1307
1308
1309
1310
1311
1312
1313 @Override
1314 @QosPriority(priority=HConstants.ADMIN_QOS)
1315 public FlushRegionResponse flushRegion(final RpcController controller,
1316 final FlushRegionRequest request) throws ServiceException {
1317 try {
1318 checkOpen();
1319 requestCount.increment();
1320 Region region = getRegion(request.getRegion());
1321 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1322 boolean shouldFlush = true;
1323 if (request.hasIfOlderThanTs()) {
1324 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1325 }
1326 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1327 if (shouldFlush) {
1328 boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
1329 request.getWriteFlushWalMarker() : false;
1330 long startTime = EnvironmentEdgeManager.currentTime();
1331
1332 HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1333 ((HRegion)region).flushcache(true, writeFlushWalMarker);
1334 if (flushResult.isFlushSucceeded()) {
1335 long endTime = EnvironmentEdgeManager.currentTime();
1336 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1337 }
1338 boolean compactionNeeded = flushResult.isCompactionNeeded();
1339 if (compactionNeeded) {
1340 regionServer.compactSplitThread.requestSystemCompaction(region,
1341 "Compaction through user triggered flush");
1342 }
1343 builder.setFlushed(flushResult.isFlushSucceeded());
1344 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1345 }
1346 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1347 return builder.build();
1348 } catch (DroppedSnapshotException ex) {
1349
1350
1351
1352
1353 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1354 throw new ServiceException(ex);
1355 } catch (IOException ie) {
1356 throw new ServiceException(ie);
1357 }
1358 }
1359
1360 @Override
1361 @QosPriority(priority=HConstants.ADMIN_QOS)
1362 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1363 final GetOnlineRegionRequest request) throws ServiceException {
1364 try {
1365 checkOpen();
1366 requestCount.increment();
1367 Map<String, Region> onlineRegions = regionServer.onlineRegions;
1368 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1369 for (Region region: onlineRegions.values()) {
1370 list.add(region.getRegionInfo());
1371 }
1372 Collections.sort(list);
1373 return ResponseConverter.buildGetOnlineRegionResponse(list);
1374 } catch (IOException ie) {
1375 throw new ServiceException(ie);
1376 }
1377 }
1378
1379 @Override
1380 @QosPriority(priority=HConstants.ADMIN_QOS)
1381 public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1382 final GetRegionInfoRequest request) throws ServiceException {
1383 try {
1384 checkOpen();
1385 requestCount.increment();
1386 Region region = getRegion(request.getRegion());
1387 HRegionInfo info = region.getRegionInfo();
1388 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1389 builder.setRegionInfo(HRegionInfo.convert(info));
1390 if (request.hasCompactionState() && request.getCompactionState()) {
1391 builder.setCompactionState(region.getCompactionState());
1392 }
1393 builder.setIsRecovering(region.isRecovering());
1394 return builder.build();
1395 } catch (IOException ie) {
1396 throw new ServiceException(ie);
1397 }
1398 }
1399
1400
1401
1402
1403
1404
1405
1406
1407 @Override
1408 @QosPriority(priority=HConstants.ADMIN_QOS)
1409 public GetServerInfoResponse getServerInfo(final RpcController controller,
1410 final GetServerInfoRequest request) throws ServiceException {
1411 try {
1412 checkOpen();
1413 } catch (IOException ie) {
1414 throw new ServiceException(ie);
1415 }
1416 requestCount.increment();
1417 int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1418 return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1419 }
1420
1421 @Override
1422 @QosPriority(priority=HConstants.ADMIN_QOS)
1423 public GetStoreFileResponse getStoreFile(final RpcController controller,
1424 final GetStoreFileRequest request) throws ServiceException {
1425 try {
1426 checkOpen();
1427 Region region = getRegion(request.getRegion());
1428 requestCount.increment();
1429 Set<byte[]> columnFamilies;
1430 if (request.getFamilyCount() == 0) {
1431 columnFamilies = region.getTableDesc().getFamiliesKeys();
1432 } else {
1433 columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1434 for (ByteString cf: request.getFamilyList()) {
1435 columnFamilies.add(cf.toByteArray());
1436 }
1437 }
1438 int nCF = columnFamilies.size();
1439 List<String> fileList = region.getStoreFileList(
1440 columnFamilies.toArray(new byte[nCF][]));
1441 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1442 builder.addAllStoreFile(fileList);
1443 return builder.build();
1444 } catch (IOException ie) {
1445 throw new ServiceException(ie);
1446 }
1447 }
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457 @Override
1458 @QosPriority(priority = HConstants.ADMIN_QOS)
1459 public MergeRegionsResponse mergeRegions(final RpcController controller,
1460 final MergeRegionsRequest request) throws ServiceException {
1461 try {
1462 checkOpen();
1463 requestCount.increment();
1464 Region regionA = getRegion(request.getRegionA());
1465 Region regionB = getRegion(request.getRegionB());
1466 boolean forcible = request.getForcible();
1467 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1468 regionA.startRegionOperation(Operation.MERGE_REGION);
1469 regionB.startRegionOperation(Operation.MERGE_REGION);
1470 if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1471 regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1472 throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1473 }
1474 LOG.info("Receiving merging request for " + regionA + ", " + regionB
1475 + ",forcible=" + forcible);
1476 long startTime = EnvironmentEdgeManager.currentTime();
1477 FlushResult flushResult = regionA.flush(true);
1478 if (flushResult.isFlushSucceeded()) {
1479 long endTime = EnvironmentEdgeManager.currentTime();
1480 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1481 }
1482 startTime = EnvironmentEdgeManager.currentTime();
1483 flushResult = regionB.flush(true);
1484 if (flushResult.isFlushSucceeded()) {
1485 long endTime = EnvironmentEdgeManager.currentTime();
1486 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1487 }
1488 regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1489 masterSystemTime, RpcServer.getRequestUser());
1490 return MergeRegionsResponse.newBuilder().build();
1491 } catch (DroppedSnapshotException ex) {
1492 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1493 throw new ServiceException(ex);
1494 } catch (IOException ie) {
1495 throw new ServiceException(ie);
1496 }
1497 }
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
/**
 * Opens one or more regions on this server.
 * <p>
 * For each region in the request an opening state is appended to the response:
 * {@code ALREADY_OPENED} when the region is online here and hbase:meta agrees,
 * {@code OPENED} when an open handler was submitted (or an open was already in progress),
 * and {@code FAILED_OPENING} when an {@link IOException} occurs during a bulk assign
 * (more than one region in the request). For a single-region request the
 * {@link IOException} is rethrown instead.
 *
 * @param controller the RPC controller
 * @param request the open request, carrying one open-info entry per region
 * @return the per-region opening states, in request order
 * @throws ServiceException on start code mismatch, when the server is not open, on
 *           ZooKeeper errors, or wrapping an {@link IOException} for a single-region request
 */
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
  requestCount.increment();
  if (request.hasServerStartCode()) {
    // check that we are the same server that this RPC is intended for.
    long serverStartCode = request.getServerStartCode();
    if (regionServer.serverName.getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
          "different server with startCode: " + serverStartCode + ", this server is: "
          + regionServer.serverName));
    }
  }

  OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
  final int regionCount = request.getOpenInfoCount();
  // Per-request cache of table descriptors so each table is looked up at most once.
  final Map<TableName, HTableDescriptor> htds =
      new HashMap<TableName, HTableDescriptor>(regionCount);
  final boolean isBulkAssign = regionCount > 1;
  try {
    checkOpen();
  } catch (IOException ie) {
    // The server is not ready. Only a single-region request for the meta table is allowed
    // to wait for the server to come online; everything else fails immediately.
    TableName tableName = null;
    if (regionCount == 1) {
      RegionInfo ri = request.getOpenInfo(0).getRegion();
      if (ri != null) {
        tableName = ProtobufUtil.toTableName(ri.getTableName());
      }
    }
    if (!TableName.META_TABLE_NAME.equals(tableName)) {
      throw new ServiceException(ie);
    }
    // Wait for at most a quarter of the configured RPC timeout (>> 2) for the server to
    // come online, then re-check.
    int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
    long endTime = System.currentTimeMillis() + timeout;
    synchronized (regionServer.online) {
      try {
        while (System.currentTimeMillis() <= endTime
            && !regionServer.isStopped() && !regionServer.isOnline()) {
          regionServer.online.wait(regionServer.msgInterval);
        }
        checkOpen();
      } catch (InterruptedException t) {
        // Restore the interrupt flag before reporting the failure.
        Thread.currentThread().interrupt();
        throw new ServiceException(t);
      } catch (IOException e) {
        throw new ServiceException(e);
      }
    }
  }

  long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

  for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
    final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
    OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
        getOpenRegionCoordination();
    OpenRegionCoordination.OpenRegionDetails ord =
        coordination.parseFromProtoRequest(regionOpenInfo);

    HTableDescriptor htd;
    try {
      final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
      if (onlineRegion != null) {
        // The region is already in this server's online set: run the coprocessor pre-open
        // hook, if a coprocessor host is present.
        if (onlineRegion.getCoprocessorHost() != null) {
          onlineRegion.getCoprocessorHost().preOpen();
        }
        // Ask hbase:meta which server it believes hosts the region.
        // NOTE(review): p is dereferenced without a null check; confirm that
        // MetaTableAccessor.getRegion cannot return null here.
        Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
            regionServer.getConnection(), region.getRegionName());
        if (regionServer.serverName.equals(p.getSecond())) {
          Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
          // A FALSE entry in regionsInTransitionInRS marks a close in progress (see the
          // CLOSE message below). Report ALREADY_OPENED only if no close is in progress and
          // the region is still in the online set on this second look.
          if (!Boolean.FALSE.equals(closing)
              && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
            LOG.warn("Attempted open of " + region.getEncodedName()
                + " but already online on this server");
            builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
            continue;
          }
        } else {
          LOG.warn("The region " + region.getEncodedName() + " is online on this server"
              + " but hbase:meta does not have this server - continue opening.");
          regionServer.removeFromOnlineRegions(onlineRegion, null);
        }
      }
      LOG.info("Open " + region.getRegionNameAsString());
      htd = htds.get(region.getTable());
      if (htd == null) {
        htd = regionServer.tableDescriptors.get(region.getTable());
        if (htd == null) {
          throw new IOException("missing table descriptor for " + region.getEncodedName());
        }
        htds.put(region.getTable(), htd);
      }

      // Mark the region as opening (TRUE) unless an entry already exists.
      final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
          region.getEncodedNameAsBytes(), Boolean.TRUE);

      if (Boolean.FALSE.equals(previous)) {
        // A close is in progress for this region: move the coordination state to failed-open
        // and reject the request.
        coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);

        throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
            + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
      }

      if (Boolean.TRUE.equals(previous)) {
        // An open is already in progress; no new handler is submitted below.
        LOG.info("Receiving OPEN for the region:" +
            region.getRegionNameAsString() + " , which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
      }

      // Runs for both the fresh-open and the already-opening case.
      regionServer.removeFromMovedRegions(region.getEncodedName());

      if (previous == null) {
        // No transition was registered before: this call owns the open.
        if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
            region.getEncodedName())) {
          // The region is marked recovering in ZooKeeper. Track it as recovering unless the
          // request explicitly says it is not opened for distributed log replay, in which
          // case the recovering znode is deleted.
          if (!regionOpenInfo.hasOpenForDistributedLogReplay()
              || regionOpenInfo.getOpenForDistributedLogReplay()) {
            regionServer.recoveringRegions.put(region.getEncodedName(), null);
          } else {
            List<String> tmpRegions = new ArrayList<String>();
            tmpRegions.add(region.getEncodedName());
            ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
                tmpRegions);
          }
        }
        // Submit the open handler; the meta region uses its own handler type, other regions
        // first get their favored-nodes mapping updated.
        if (region.isMetaRegion()) {
          regionServer.service.submit(new OpenMetaHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
        } else {
          regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
              regionOpenInfo.getFavoredNodesList());
          regionServer.service.submit(new OpenRegionHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
        }
      }

      // OPENED is reported both when a handler was submitted and when an open was already
      // in progress.
      builder.addOpeningState(RegionOpeningState.OPENED);

    } catch (KeeperException zooKeeperEx) {
      LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
      throw new ServiceException(zooKeeperEx);
    } catch (IOException ie) {
      LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
      // In a bulk assign one failure must not fail the whole request.
      if (isBulkAssign) {
        builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
      } else {
        throw new ServiceException(ie);
      }
    }
  }
  return builder.build();
}
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708 @Override
1709 public WarmupRegionResponse warmupRegion(final RpcController controller,
1710 final WarmupRegionRequest request) throws ServiceException {
1711
1712 RegionInfo regionInfo = request.getRegionInfo();
1713 final HRegionInfo region = HRegionInfo.convert(regionInfo);
1714 HTableDescriptor htd;
1715 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1716
1717 try {
1718 checkOpen();
1719 String encodedName = region.getEncodedName();
1720 byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1721 final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1722
1723 if (onlineRegion != null) {
1724 LOG.info("Region already online. Skipping warming up " + region);
1725 return response;
1726 }
1727
1728 if (LOG.isDebugEnabled()) {
1729 LOG.debug("Warming up Region " + region.getRegionNameAsString());
1730 }
1731
1732 htd = regionServer.tableDescriptors.get(region.getTable());
1733
1734 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1735 LOG.info("Region is in transition. Skipping warmup " + region);
1736 return response;
1737 }
1738
1739 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1740 regionServer.getConfiguration(), regionServer, null);
1741
1742 } catch (IOException ie) {
1743 LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1744 throw new ServiceException(ie);
1745 }
1746
1747 return response;
1748 }
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758 @Override
1759 @QosPriority(priority = HConstants.REPLAY_QOS)
1760 public ReplicateWALEntryResponse replay(final RpcController controller,
1761 final ReplicateWALEntryRequest request) throws ServiceException {
1762 long before = EnvironmentEdgeManager.currentTime();
1763 CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1764 try {
1765 checkOpen();
1766 List<WALEntry> entries = request.getEntryList();
1767 if (entries == null || entries.isEmpty()) {
1768
1769 return ReplicateWALEntryResponse.newBuilder().build();
1770 }
1771 ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1772 Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1773 RegionCoprocessorHost coprocessorHost =
1774 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1775 ? region.getCoprocessorHost()
1776 : null;
1777 List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1778
1779
1780 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1781 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1782
1783 for (WALEntry entry : entries) {
1784 if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1785 throw new NotServingRegionException("Replay request contains entries from multiple " +
1786 "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1787 + entry.getKey().getEncodedRegionName());
1788 }
1789 if (regionServer.nonceManager != null && isPrimary) {
1790 long nonceGroup = entry.getKey().hasNonceGroup()
1791 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1792 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1793 regionServer.nonceManager.reportOperationFromWal(
1794 nonceGroup,
1795 nonce,
1796 entry.getKey().getWriteTime());
1797 }
1798 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1799 new Pair<WALKey, WALEdit>();
1800 List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1801 cells, walEntry, durability);
1802 if (coprocessorHost != null) {
1803
1804
1805 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1806 walEntry.getSecond())) {
1807
1808 continue;
1809 }
1810 walEntries.add(walEntry);
1811 }
1812 if(edits!=null && !edits.isEmpty()) {
1813 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1814 entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1815 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1816
1817 for (int i = 0; result != null && i < result.length; i++) {
1818 if (result[i] != OperationStatus.SUCCESS) {
1819 throw new IOException(result[i].getExceptionMsg());
1820 }
1821 }
1822 }
1823 }
1824
1825
1826 WAL wal = getWAL(region);
1827 if (wal != null) {
1828 wal.sync();
1829 }
1830
1831 if (coprocessorHost != null) {
1832 for (Pair<WALKey, WALEdit> entry : walEntries) {
1833 coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1834 entry.getSecond());
1835 }
1836 }
1837 return ReplicateWALEntryResponse.newBuilder().build();
1838 } catch (IOException ie) {
1839 throw new ServiceException(ie);
1840 } finally {
1841 if (regionServer.metricsRegionServer != null) {
1842 regionServer.metricsRegionServer.updateReplay(
1843 EnvironmentEdgeManager.currentTime() - before);
1844 }
1845 }
1846 }
1847
1848 WAL getWAL(Region region) {
1849 return ((HRegion)region).getWAL();
1850 }
1851
1852
1853
1854
1855
1856
1857
1858
1859 @Override
1860 @QosPriority(priority=HConstants.REPLICATION_QOS)
1861 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1862 final ReplicateWALEntryRequest request) throws ServiceException {
1863 try {
1864 checkOpen();
1865 if (regionServer.replicationSinkHandler != null) {
1866 requestCount.increment();
1867 List<WALEntry> entries = request.getEntryList();
1868 CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1869 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1870 regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
1871 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1872 return ReplicateWALEntryResponse.newBuilder().build();
1873 } else {
1874 throw new ServiceException("Replication services are not initialized yet");
1875 }
1876 } catch (IOException ie) {
1877 throw new ServiceException(ie);
1878 }
1879 }
1880
1881
1882
1883
1884
1885
1886
1887 @Override
1888 public RollWALWriterResponse rollWALWriter(final RpcController controller,
1889 final RollWALWriterRequest request) throws ServiceException {
1890 try {
1891 checkOpen();
1892 requestCount.increment();
1893 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1894 regionServer.walRoller.requestRollAll();
1895 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1896 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1897 return builder.build();
1898 } catch (IOException ie) {
1899 throw new ServiceException(ie);
1900 }
1901 }
1902
1903
1904
1905
1906
1907
1908
1909
1910 @Override
1911 @QosPriority(priority=HConstants.ADMIN_QOS)
1912 public SplitRegionResponse splitRegion(final RpcController controller,
1913 final SplitRegionRequest request) throws ServiceException {
1914 try {
1915 checkOpen();
1916 requestCount.increment();
1917 Region region = getRegion(request.getRegion());
1918 region.startRegionOperation(Operation.SPLIT_REGION);
1919 if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1920 throw new IOException("Can't split replicas directly. "
1921 + "Replicas are auto-split when their primary is split.");
1922 }
1923 LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1924 long startTime = EnvironmentEdgeManager.currentTime();
1925 FlushResult flushResult = region.flush(true);
1926 if (flushResult.isFlushSucceeded()) {
1927 long endTime = EnvironmentEdgeManager.currentTime();
1928 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1929 }
1930 byte[] splitPoint = null;
1931 if (request.hasSplitPoint()) {
1932 splitPoint = request.getSplitPoint().toByteArray();
1933 }
1934 ((HRegion)region).forceSplit(splitPoint);
1935 regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1936 RpcServer.getRequestUser());
1937 return SplitRegionResponse.newBuilder().build();
1938 } catch (DroppedSnapshotException ex) {
1939 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1940 throw new ServiceException(ex);
1941 } catch (IOException ie) {
1942 throw new ServiceException(ie);
1943 }
1944 }
1945
1946
1947
1948
1949
1950
1951
1952
1953 @Override
1954 @QosPriority(priority=HConstants.ADMIN_QOS)
1955 public StopServerResponse stopServer(final RpcController controller,
1956 final StopServerRequest request) throws ServiceException {
1957 requestCount.increment();
1958 String reason = request.getReason();
1959 regionServer.stop(reason);
1960 return StopServerResponse.newBuilder().build();
1961 }
1962
1963 @Override
1964 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1965 UpdateFavoredNodesRequest request) throws ServiceException {
1966 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1967 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1968 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1969 HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1970 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1971 regionUpdateInfo.getFavoredNodesList());
1972 }
1973 respBuilder.setResponse(openInfoList.size());
1974 return respBuilder.build();
1975 }
1976
1977
1978
1979
1980
1981
1982 @Override
1983 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1984 final BulkLoadHFileRequest request) throws ServiceException {
1985 try {
1986 checkOpen();
1987 requestCount.increment();
1988 Region region = getRegion(request.getRegion());
1989 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1990 for (FamilyPath familyPath: request.getFamilyPathList()) {
1991 familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1992 familyPath.getPath()));
1993 }
1994 boolean bypass = false;
1995 if (region.getCoprocessorHost() != null) {
1996 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1997 }
1998 boolean loaded = false;
1999 if (!bypass) {
2000 loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
2001 }
2002 if (region.getCoprocessorHost() != null) {
2003 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
2004 }
2005 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2006 builder.setLoaded(loaded);
2007 return builder.build();
2008 } catch (IOException ie) {
2009 throw new ServiceException(ie);
2010 }
2011 }
2012
2013 @Override
2014 public CoprocessorServiceResponse execService(final RpcController controller,
2015 final CoprocessorServiceRequest request) throws ServiceException {
2016 try {
2017 checkOpen();
2018 requestCount.increment();
2019 Region region = getRegion(request.getRegion());
2020 Message result = execServiceOnRegion(region, request.getCall());
2021 CoprocessorServiceResponse.Builder builder =
2022 CoprocessorServiceResponse.newBuilder();
2023 builder.setRegion(RequestConverter.buildRegionSpecifier(
2024 RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
2025 builder.setValue(
2026 builder.getValueBuilder().setName(result.getClass().getName())
2027 .setValue(result.toByteString()));
2028 return builder.build();
2029 } catch (IOException ie) {
2030 throw new ServiceException(ie);
2031 }
2032 }
2033
2034 private Message execServiceOnRegion(Region region,
2035 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2036
2037 ServerRpcController execController = new ServerRpcController();
2038 return region.execService(execController, serviceCall);
2039 }
2040
2041
2042
2043
2044
2045
2046
2047
2048 @Override
2049 public GetResponse get(final RpcController controller,
2050 final GetRequest request) throws ServiceException {
2051 long before = EnvironmentEdgeManager.currentTime();
2052 OperationQuota quota = null;
2053 try {
2054 checkOpen();
2055 requestCount.increment();
2056 Region region = getRegion(request.getRegion());
2057
2058 GetResponse.Builder builder = GetResponse.newBuilder();
2059 ClientProtos.Get get = request.getGet();
2060 Boolean existence = null;
2061 Result r = null;
2062 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2063
2064 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2065 if (get.getColumnCount() != 1) {
2066 throw new DoNotRetryIOException(
2067 "get ClosestRowBefore supports one and only one family now, not "
2068 + get.getColumnCount() + " families");
2069 }
2070 byte[] row = get.getRow().toByteArray();
2071 byte[] family = get.getColumn(0).getFamily().toByteArray();
2072 r = region.getClosestRowBefore(row, family);
2073 } else {
2074 Get clientGet = ProtobufUtil.toGet(get);
2075 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2076 existence = region.getCoprocessorHost().preExists(clientGet);
2077 }
2078 if (existence == null) {
2079 r = region.get(clientGet);
2080 if (get.getExistenceOnly()) {
2081 boolean exists = r.getExists();
2082 if (region.getCoprocessorHost() != null) {
2083 exists = region.getCoprocessorHost().postExists(clientGet, exists);
2084 }
2085 existence = exists;
2086 }
2087 }
2088 }
2089 if (existence != null){
2090 ClientProtos.Result pbr =
2091 ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2092 builder.setResult(pbr);
2093 } else if (r != null) {
2094 ClientProtos.Result pbr = ProtobufUtil.toResult(r);
2095 builder.setResult(pbr);
2096 }
2097
2098 if (r != null && r.rawCells() != null) {
2099 quota.addGetResult(r);
2100 }
2101 return builder.build();
2102 } catch (IOException ie) {
2103 throw new ServiceException(ie);
2104 } finally {
2105 if (regionServer.metricsRegionServer != null) {
2106 regionServer.metricsRegionServer.updateGet(
2107 EnvironmentEdgeManager.currentTime() - before);
2108 }
2109 if (quota != null) {
2110 quota.close();
2111 }
2112 }
2113 }
2114
2115
2116
2117
2118
2119
2120
2121
2122 @Override
2123 public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2124 throws ServiceException {
2125 try {
2126 checkOpen();
2127 } catch (IOException ie) {
2128 throw new ServiceException(ie);
2129 }
2130
2131
2132
2133 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2134 CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2135 if (controller != null) {
2136 controller.setCellScanner(null);
2137 }
2138
2139 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2140
2141
2142 List<CellScannable> cellsToReturn = null;
2143 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2144 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2145 Boolean processed = null;
2146
2147 for (RegionAction regionAction : request.getRegionActionList()) {
2148 this.requestCount.add(regionAction.getActionCount());
2149 OperationQuota quota;
2150 Region region;
2151 regionActionResultBuilder.clear();
2152 try {
2153 region = getRegion(regionAction.getRegion());
2154 quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
2155 } catch (IOException e) {
2156 rpcServer.getMetrics().exception(e);
2157 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2158 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2159
2160
2161
2162 skipCellsForMutations(regionAction.getActionList(), cellScanner);
2163 continue;
2164 }
2165
2166 if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2167
2168
2169 try {
2170 if (request.hasCondition()) {
2171 Condition condition = request.getCondition();
2172 byte[] row = condition.getRow().toByteArray();
2173 byte[] family = condition.getFamily().toByteArray();
2174 byte[] qualifier = condition.getQualifier().toByteArray();
2175 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2176 ByteArrayComparable comparator =
2177 ProtobufUtil.toComparator(condition.getComparator());
2178 processed = checkAndRowMutate(region, regionAction.getActionList(),
2179 cellScanner, row, family, qualifier, compareOp, comparator);
2180 } else {
2181 ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2182 cellScanner);
2183
2184 if(stats != null) {
2185 responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2186 .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2187 }
2188 processed = Boolean.TRUE;
2189 }
2190 } catch (IOException e) {
2191 rpcServer.getMetrics().exception(e);
2192
2193 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2194 }
2195 } else {
2196
2197 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2198 regionActionResultBuilder, cellsToReturn, nonceGroup);
2199 }
2200 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2201 quota.close();
2202 }
2203
2204 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2205 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2206 }
2207 if (processed != null) {
2208 responseBuilder.setProcessed(processed);
2209 }
2210 return responseBuilder.build();
2211 }
2212
2213 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2214 if (cellScanner == null) {
2215 return;
2216 }
2217 for (Action action : actions) {
2218 skipCellsForMutation(action, cellScanner);
2219 }
2220 }
2221
2222 private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2223 if (cellScanner == null) {
2224 return;
2225 }
2226 try {
2227 if (action.hasMutation()) {
2228 MutationProto m = action.getMutation();
2229 if (m.hasAssociatedCellCount()) {
2230 for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2231 cellScanner.advance();
2232 }
2233 }
2234 }
2235 } catch (IOException e) {
2236
2237
2238
2239 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2240 }
2241 }
2242
2243
2244
2245
2246
2247
2248
2249
2250 @Override
2251 public MutateResponse mutate(final RpcController rpcc,
2252 final MutateRequest request) throws ServiceException {
2253
2254
2255 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2256 CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2257 OperationQuota quota = null;
2258
2259 if (controller != null) {
2260 controller.setCellScanner(null);
2261 }
2262 try {
2263 checkOpen();
2264 requestCount.increment();
2265 Region region = getRegion(request.getRegion());
2266 MutateResponse.Builder builder = MutateResponse.newBuilder();
2267 MutationProto mutation = request.getMutation();
2268 if (!region.getRegionInfo().isMetaTable()) {
2269 regionServer.cacheFlusher.reclaimMemStoreMemory();
2270 }
2271 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2272 Result r = null;
2273 Boolean processed = null;
2274 MutationType type = mutation.getMutateType();
2275 long mutationSize = 0;
2276 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2277 switch (type) {
2278 case APPEND:
2279
2280 r = append(region, quota, mutation, cellScanner, nonceGroup);
2281 break;
2282 case INCREMENT:
2283
2284 r = increment(region, quota, mutation, cellScanner, nonceGroup);
2285 break;
2286 case PUT:
2287 Put put = ProtobufUtil.toPut(mutation, cellScanner);
2288 quota.addMutation(put);
2289 if (request.hasCondition()) {
2290 Condition condition = request.getCondition();
2291 byte[] row = condition.getRow().toByteArray();
2292 byte[] family = condition.getFamily().toByteArray();
2293 byte[] qualifier = condition.getQualifier().toByteArray();
2294 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2295 ByteArrayComparable comparator =
2296 ProtobufUtil.toComparator(condition.getComparator());
2297 if (region.getCoprocessorHost() != null) {
2298 processed = region.getCoprocessorHost().preCheckAndPut(
2299 row, family, qualifier, compareOp, comparator, put);
2300 }
2301 if (processed == null) {
2302 boolean result = region.checkAndMutate(row, family,
2303 qualifier, compareOp, comparator, put, true);
2304 if (region.getCoprocessorHost() != null) {
2305 result = region.getCoprocessorHost().postCheckAndPut(row, family,
2306 qualifier, compareOp, comparator, put, result);
2307 }
2308 processed = result;
2309 }
2310 } else {
2311 region.put(put);
2312 processed = Boolean.TRUE;
2313 }
2314 break;
2315 case DELETE:
2316 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2317 quota.addMutation(delete);
2318 if (request.hasCondition()) {
2319 Condition condition = request.getCondition();
2320 byte[] row = condition.getRow().toByteArray();
2321 byte[] family = condition.getFamily().toByteArray();
2322 byte[] qualifier = condition.getQualifier().toByteArray();
2323 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2324 ByteArrayComparable comparator =
2325 ProtobufUtil.toComparator(condition.getComparator());
2326 if (region.getCoprocessorHost() != null) {
2327 processed = region.getCoprocessorHost().preCheckAndDelete(
2328 row, family, qualifier, compareOp, comparator, delete);
2329 }
2330 if (processed == null) {
2331 boolean result = region.checkAndMutate(row, family,
2332 qualifier, compareOp, comparator, delete, true);
2333 if (region.getCoprocessorHost() != null) {
2334 result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2335 qualifier, compareOp, comparator, delete, result);
2336 }
2337 processed = result;
2338 }
2339 } else {
2340 region.delete(delete);
2341 processed = Boolean.TRUE;
2342 }
2343 break;
2344 default:
2345 throw new DoNotRetryIOException(
2346 "Unsupported mutate type: " + type.name());
2347 }
2348 if (processed != null) builder.setProcessed(processed.booleanValue());
2349 addResult(builder, r, controller);
2350 return builder.build();
2351 } catch (IOException ie) {
2352 regionServer.checkFileSystem();
2353 throw new ServiceException(ie);
2354 } finally {
2355 if (quota != null) {
2356 quota.close();
2357 }
2358 }
2359 }
2360
2361
2362
2363
2364
2365
2366
2367
2368 @Override
2369 public ScanResponse scan(final RpcController controller, final ScanRequest request)
2370 throws ServiceException {
2371 OperationQuota quota = null;
2372 Leases.Lease lease = null;
2373 String scannerName = null;
2374 try {
2375 if (!request.hasScannerId() && !request.hasScan()) {
2376 throw new DoNotRetryIOException(
2377 "Missing required input: scannerId or scan");
2378 }
2379 long scannerId = -1;
2380 if (request.hasScannerId()) {
2381 scannerId = request.getScannerId();
2382 scannerName = String.valueOf(scannerId);
2383 }
2384 try {
2385 checkOpen();
2386 } catch (IOException e) {
2387
2388
2389 if (scannerName != null) {
2390 LOG.debug("Server shutting down and client tried to access missing scanner "
2391 + scannerName);
2392 if (regionServer.leases != null) {
2393 try {
2394 regionServer.leases.cancelLease(scannerName);
2395 } catch (LeaseException le) {
2396
2397 if (LOG.isTraceEnabled()) {
2398 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2399 }
2400 }
2401 }
2402 }
2403 throw e;
2404 }
2405 requestCount.increment();
2406
2407 int ttl = 0;
2408 Region region = null;
2409 RegionScanner scanner = null;
2410 RegionScannerHolder rsh = null;
2411 boolean moreResults = true;
2412 boolean closeScanner = false;
2413 boolean isSmallScan = false;
2414 RpcCallContext context = RpcServer.getCurrentCall();
2415 Object lastBlock = null;
2416
2417 ScanResponse.Builder builder = ScanResponse.newBuilder();
2418 if (request.hasCloseScanner()) {
2419 closeScanner = request.getCloseScanner();
2420 }
2421 int rows = closeScanner ? 0 : 1;
2422 if (request.hasNumberOfRows()) {
2423 rows = request.getNumberOfRows();
2424 }
2425 if (request.hasScannerId()) {
2426 rsh = scanners.get(scannerName);
2427 if (rsh == null) {
2428 LOG.warn("Client tried to access missing scanner " + scannerName);
2429 throw new UnknownScannerException(
2430 "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
2431 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
2432 + "long wait between consecutive client checkins, c) Server may be closing down, "
2433 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
2434 + "possible fix would be increasing the value of"
2435 + "'hbase.client.scanner.timeout.period' configuration.");
2436 }
2437 scanner = rsh.s;
2438 HRegionInfo hri = scanner.getRegionInfo();
2439 region = regionServer.getRegion(hri.getRegionName());
2440 if (region != rsh.r) {
2441 throw new NotServingRegionException("Region was re-opened after the scanner"
2442 + scannerName + " was created: " + hri.getRegionNameAsString());
2443 }
2444 } else {
2445 region = getRegion(request.getRegion());
2446 ClientProtos.Scan protoScan = request.getScan();
2447 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2448 Scan scan = ProtobufUtil.toScan(protoScan);
2449
2450 if (!isLoadingCfsOnDemandSet) {
2451 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2452 }
2453
2454 isSmallScan = scan.isSmall();
2455 if (!scan.hasFamilies()) {
2456
2457 for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2458 scan.addFamily(family);
2459 }
2460 }
2461
2462 if (region.getCoprocessorHost() != null) {
2463 scanner = region.getCoprocessorHost().preScannerOpen(scan);
2464 }
2465 if (scanner == null) {
2466 scanner = region.getScanner(scan);
2467 }
2468 if (region.getCoprocessorHost() != null) {
2469 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2470 }
2471 scannerId = addScanner(scanner, region);
2472 scannerName = String.valueOf(scannerId);
2473 ttl = this.scannerLeaseTimeoutPeriod;
2474 }
2475 if (request.hasRenew() && request.getRenew()) {
2476 rsh = scanners.get(scannerName);
2477 lease = regionServer.leases.removeLease(scannerName);
2478 if (lease != null && rsh != null) {
2479 regionServer.leases.addLease(lease);
2480
2481 rsh.incNextCallSeq();
2482 }
2483 return builder.build();
2484 }
2485
2486 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2487 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2488 if (rows > 0) {
2489
2490
2491
2492 if (request.hasNextCallSeq()) {
2493 if (rsh == null) {
2494 rsh = scanners.get(scannerName);
2495 }
2496 if (rsh != null) {
2497 if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2498 throw new OutOfOrderScannerNextException(
2499 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2500 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2501 "; request=" + TextFormat.shortDebugString(request));
2502 }
2503
2504 rsh.incNextCallSeq();
2505 }
2506 }
2507 try {
2508
2509
2510 lease = regionServer.leases.removeLease(scannerName);
2511 List<Result> results = new ArrayList<Result>();
2512
2513 boolean done = false;
2514
2515 if (region != null && region.getCoprocessorHost() != null) {
2516 Boolean bypass = region.getCoprocessorHost().preScannerNext(
2517 scanner, results, rows);
2518 if (!results.isEmpty()) {
2519 for (Result r : results) {
2520 lastBlock = addSize(context, r, lastBlock);
2521 }
2522 }
2523 if (bypass != null && bypass.booleanValue()) {
2524 done = true;
2525 }
2526 }
2527
2528 if (!done) {
2529 long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2530 if (maxResultSize <= 0) {
2531 maxResultSize = maxQuotaResultSize;
2532 }
2533
2534
2535
2536 List<Cell> values = new ArrayList<Cell>(32);
2537 region.startRegionOperation(Operation.SCAN);
2538 try {
2539 int i = 0;
2540 synchronized(scanner) {
2541 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2542 boolean clientHandlesPartials =
2543 request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2544 boolean clientHandlesHeartbeats =
2545 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2546
2547
2548
2549
2550
2551
2552 boolean serverGuaranteesOrderOfPartials = results.isEmpty();
2553 boolean allowPartialResults =
2554 clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2555 boolean moreRows = false;
2556
2557
2558
2559
2560
2561
2562
2563
2564 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2565
2566
2567
2568 long timeLimit = -1;
2569
2570
2571
2572
2573 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2574 long timeLimitDelta;
2575 if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2576 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2577 } else {
2578 timeLimitDelta =
2579 scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2580 }
2581
2582
2583
2584 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2585 timeLimit = System.currentTimeMillis() + timeLimitDelta;
2586 }
2587
2588 final LimitScope sizeScope =
2589 allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2590 final LimitScope timeScope =
2591 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2592
2593 boolean trackMetrics =
2594 request.hasTrackScanMetrics() && request.getTrackScanMetrics();
2595
2596
2597
2598 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2599 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2600 contextBuilder.setBatchLimit(scanner.getBatch());
2601 contextBuilder.setTimeLimit(timeScope, timeLimit);
2602 contextBuilder.setTrackMetrics(trackMetrics);
2603 ScannerContext scannerContext = contextBuilder.build();
2604
2605 boolean limitReached = false;
2606 while (i < rows) {
2607
2608
2609
2610
2611
2612 scannerContext.setBatchProgress(0);
2613
2614
2615 moreRows = scanner.nextRaw(values, scannerContext);
2616
2617 if (!values.isEmpty()) {
2618 final boolean partial = scannerContext.partialResultFormed();
2619 Result r = Result.create(values, null, stale, partial);
2620 lastBlock = addSize(context, r, lastBlock);
2621 results.add(r);
2622 i++;
2623 }
2624
2625 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2626 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2627 boolean rowLimitReached = i >= rows;
2628 limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2629
2630 if (limitReached || !moreRows) {
2631 if (LOG.isTraceEnabled()) {
2632 LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2633 + moreRows + " scannerContext: " + scannerContext);
2634 }
2635
2636
2637
2638
2639 if (moreRows) {
2640
2641 builder.setHeartbeatMessage(timeLimitReached);
2642 }
2643 break;
2644 }
2645 values.clear();
2646 }
2647
2648 if (limitReached || moreRows) {
2649
2650 builder.setMoreResultsInRegion(true);
2651 } else {
2652
2653 builder.setMoreResultsInRegion(false);
2654 }
2655
2656
2657
2658 if (trackMetrics) {
2659 Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
2660 ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
2661 NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
2662
2663 for (Entry<String, Long> entry : metrics.entrySet()) {
2664 pairBuilder.setName(entry.getKey());
2665 pairBuilder.setValue(entry.getValue());
2666 metricBuilder.addMetrics(pairBuilder.build());
2667 }
2668
2669 builder.setScanMetrics(metricBuilder.build());
2670 }
2671 }
2672 region.updateReadRequestsCount(i);
2673 long responseCellSize = context != null ? context.getResponseCellSize() : 0;
2674 region.getMetrics().updateScanNext(responseCellSize);
2675 if (regionServer.metricsRegionServer != null) {
2676 regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
2677 }
2678 } finally {
2679 region.closeRegionOperation();
2680 }
2681
2682
2683 if (region != null && region.getCoprocessorHost() != null) {
2684 region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2685 }
2686 }
2687
2688 quota.addScanResult(results);
2689
2690
2691
2692
2693 if (scanner.isFilterDone() && results.isEmpty()) {
2694 moreResults = false;
2695 results = null;
2696 } else {
2697 addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2698 }
2699 } catch (IOException e) {
2700
2701
2702
2703 closeScanner(region, scanner, scannerName);
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716 if (e instanceof DoNotRetryIOException) {
2717 throw e;
2718 }
2719
2720
2721
2722 if (e instanceof FileNotFoundException) {
2723 throw new DoNotRetryIOException(e);
2724 }
2725
2726
2727
2728
2729 if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
2730
2731 throw new ScannerResetException("Scanner is closed on the server-side", e);
2732 } else {
2733
2734 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
2735 + " scanner state for clients older than 1.3.", e);
2736 }
2737 } finally {
2738
2739
2740 if (scanners.containsKey(scannerName)) {
2741 if (lease != null) regionServer.leases.addLease(lease);
2742 ttl = this.scannerLeaseTimeoutPeriod;
2743 }
2744 }
2745 }
2746
2747 if (!moreResults || closeScanner) {
2748 ttl = 0;
2749 moreResults = false;
2750 closeScanner(region, scanner, scannerName);
2751 }
2752
2753 if (ttl > 0) {
2754 builder.setTtl(ttl);
2755 }
2756 builder.setScannerId(scannerId);
2757 builder.setMoreResults(moreResults);
2758 return builder.build();
2759 } catch (IOException ie) {
2760 if (scannerName != null && ie instanceof NotServingRegionException) {
2761 RegionScannerHolder rsh = scanners.remove(scannerName);
2762 if (rsh != null) {
2763 try {
2764 RegionScanner scanner = rsh.s;
2765 LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2766 scanner.close();
2767 regionServer.leases.cancelLease(scannerName);
2768 } catch (IOException e) {
2769 LOG.warn("Getting exception closing " + scannerName, e);
2770 }
2771 }
2772 }
2773 throw new ServiceException(ie);
2774 } finally {
2775 if (quota != null) {
2776 quota.close();
2777 }
2778 }
2779 }
2780
2781 private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
2782 throws IOException {
2783 if (region != null && region.getCoprocessorHost() != null) {
2784 if (region.getCoprocessorHost().preScannerClose(scanner)) {
2785 return true;
2786 }
2787 }
2788 RegionScannerHolder rsh = scanners.remove(scannerName);
2789 if (rsh != null) {
2790 scanner = rsh.s;
2791 scanner.close();
2792 try {
2793 regionServer.leases.cancelLease(scannerName);
2794 } catch (LeaseException le) {
2795
2796 if (LOG.isTraceEnabled()) {
2797 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2798 }
2799 }
2800 if (region != null && region.getCoprocessorHost() != null) {
2801 region.getCoprocessorHost().postScannerClose(scanner);
2802 }
2803 }
2804 return false;
2805 }
2806
2807 @Override
2808 public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2809 CoprocessorServiceRequest request) throws ServiceException {
2810 return regionServer.execRegionServerService(controller, request);
2811 }
2812
2813 @Override
2814 public UpdateConfigurationResponse updateConfiguration(
2815 RpcController controller, UpdateConfigurationRequest request)
2816 throws ServiceException {
2817 try {
2818 this.regionServer.updateConfiguration();
2819 } catch (Exception e) {
2820 throw new ServiceException(e);
2821 }
2822 return UpdateConfigurationResponse.getDefaultInstance();
2823 }
2824 }