1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.InetSocketAddress;
24 import java.net.UnknownHostException;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.NavigableMap;
32 import java.util.Set;
33 import java.util.TreeSet;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.atomic.AtomicLong;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.CellScannable;
42 import org.apache.hadoop.hbase.CellScanner;
43 import org.apache.hadoop.hbase.CellUtil;
44 import org.apache.hadoop.hbase.DoNotRetryIOException;
45 import org.apache.hadoop.hbase.DroppedSnapshotException;
46 import org.apache.hadoop.hbase.HBaseIOException;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.HTableDescriptor;
50 import org.apache.hadoop.hbase.MetaTableAccessor;
51 import org.apache.hadoop.hbase.NotServingRegionException;
52 import org.apache.hadoop.hbase.ServerName;
53 import org.apache.hadoop.hbase.TableName;
54 import org.apache.hadoop.hbase.UnknownScannerException;
55 import org.apache.hadoop.hbase.classification.InterfaceAudience;
56 import org.apache.hadoop.hbase.client.Append;
57 import org.apache.hadoop.hbase.client.ConnectionUtils;
58 import org.apache.hadoop.hbase.client.Delete;
59 import org.apache.hadoop.hbase.client.Durability;
60 import org.apache.hadoop.hbase.client.Get;
61 import org.apache.hadoop.hbase.client.Increment;
62 import org.apache.hadoop.hbase.client.Mutation;
63 import org.apache.hadoop.hbase.client.Put;
64 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
65 import org.apache.hadoop.hbase.client.Result;
66 import org.apache.hadoop.hbase.client.RowMutations;
67 import org.apache.hadoop.hbase.client.Scan;
68 import org.apache.hadoop.hbase.client.VersionInfoUtil;
69 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
70 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
72 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
73 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
74 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
75 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
76 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
77 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
78 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
79 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
80 import org.apache.hadoop.hbase.ipc.PriorityFunction;
81 import org.apache.hadoop.hbase.ipc.QosPriority;
82 import org.apache.hadoop.hbase.ipc.RpcCallContext;
83 import org.apache.hadoop.hbase.ipc.RpcServer;
84 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
85 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
86 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
87 import org.apache.hadoop.hbase.ipc.ServerRpcController;
88 import org.apache.hadoop.hbase.master.MasterRpcServices;
89 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
90 import org.apache.hadoop.hbase.protobuf.RequestConverter;
91 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
92 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
93 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
94 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
95 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
96 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
97 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
98 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
99 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
129 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
130 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
132 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
150 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
151 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
152 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
153 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
154 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
155 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
156 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
157 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
158 import org.apache.hadoop.hbase.quotas.OperationQuota;
159 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
160 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
161 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
162 import org.apache.hadoop.hbase.regionserver.Region.Operation;
163 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
164 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
165 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
166 import org.apache.hadoop.hbase.wal.WAL;
167 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
168 import org.apache.hadoop.hbase.util.Bytes;
169 import org.apache.hadoop.hbase.util.Counter;
170 import org.apache.hadoop.hbase.util.DNS;
171 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
172 import org.apache.hadoop.hbase.util.Pair;
173 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
174 import org.apache.hadoop.hbase.util.Strings;
175 import org.apache.hadoop.hbase.wal.WALKey;
176 import org.apache.hadoop.hbase.wal.WALSplitter;
177 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
178 import org.apache.zookeeper.KeeperException;
179
180 import com.google.common.annotations.VisibleForTesting;
181 import com.google.protobuf.ByteString;
182 import com.google.protobuf.Message;
183 import com.google.protobuf.RpcController;
184 import com.google.protobuf.ServiceException;
185 import com.google.protobuf.TextFormat;
186
187
188
189
190 @InterfaceAudience.Private
191 @SuppressWarnings("deprecation")
192 public class RSRpcServices implements HBaseRPCErrorHandler,
193 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction {
194 protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);
195
196
197 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
198 "hbase.region.server.rpc.scheduler.factory.class";
199
200
201
202
203
204
205 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
206 "hbase.region.server.rpc.minimum.scan.time.limit.delta";
207
208
209
210 private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
211
212
213 final Counter requestCount = new Counter();
214
215 final RpcServerInterface rpcServer;
216 final InetSocketAddress isa;
217
218 private final HRegionServer regionServer;
219 private final long maxScannerResultSize;
220
221
222 private final PriorityFunction priority;
223
224 private final AtomicLong scannerIdGen = new AtomicLong(0L);
225 private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
226 new ConcurrentHashMap<String, RegionScannerHolder>();
227
228
229
230
231 private final int scannerLeaseTimeoutPeriod;
232
233
234
235
236 private final int rpcTimeout;
237
238
239
240
241 private final long minimumScanTimeLimitDelta;
242
243
244
245
246 private static class RegionScannerHolder {
247 private AtomicLong nextCallSeq = new AtomicLong(0);
248 private RegionScanner s;
249 private Region r;
250
251 public RegionScannerHolder(RegionScanner s, Region r) {
252 this.s = s;
253 this.r = r;
254 }
255
256 private long getNextCallSeq() {
257 return nextCallSeq.get();
258 }
259
260 private void incNextCallSeq() {
261 nextCallSeq.incrementAndGet();
262 }
263
264 private void rollbackNextCallSeq() {
265 nextCallSeq.decrementAndGet();
266 }
267 }
268
269
270
271
272
273 private class ScannerListener implements LeaseListener {
274 private final String scannerName;
275
276 ScannerListener(final String n) {
277 this.scannerName = n;
278 }
279
280 @Override
281 public void leaseExpired() {
282 RegionScannerHolder rsh = scanners.remove(this.scannerName);
283 if (rsh != null) {
284 RegionScanner s = rsh.s;
285 LOG.info("Scanner " + this.scannerName + " lease expired on region "
286 + s.getRegionInfo().getRegionNameAsString());
287 Region region = null;
288 try {
289 region = regionServer.getRegion(s.getRegionInfo().getRegionName());
290 if (region != null && region.getCoprocessorHost() != null) {
291 region.getCoprocessorHost().preScannerClose(s);
292 }
293 } catch (IOException e) {
294 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
295 } finally {
296 try {
297 s.close();
298 if (region != null && region.getCoprocessorHost() != null) {
299 region.getCoprocessorHost().postScannerClose(s);
300 }
301 } catch (IOException e) {
302 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
303 }
304 }
305 } else {
306 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
307 " scanner found, hence no chance to close that related scanner!");
308 }
309 }
310 }
311
312 private static ResultOrException getResultOrException(
313 final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
314 return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
315 }
316
317 private static ResultOrException getResultOrException(final Exception e, final int index) {
318 return getResultOrException(ResponseConverter.buildActionResult(e), index);
319 }
320
321 private static ResultOrException getResultOrException(
322 final ResultOrException.Builder builder, final int index) {
323 return builder.setIndex(index).build();
324 }
325
326
327
328
329
330
331
332 private long startNonceOperation(final MutationProto mutation, long nonceGroup)
333 throws IOException, OperationConflictException {
334 if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
335 boolean canProceed = false;
336 try {
337 canProceed = regionServer.nonceManager.startOperation(
338 nonceGroup, mutation.getNonce(), regionServer);
339 } catch (InterruptedException ex) {
340 throw new InterruptedIOException("Nonce start operation interrupted");
341 }
342 if (!canProceed) {
343
344 String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
345 + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
346 + "] may have already completed";
347 throw new OperationConflictException(message);
348 }
349 return mutation.getNonce();
350 }
351
352
353
354
355
356
357
358 private void endNonceOperation(final MutationProto mutation,
359 long nonceGroup, boolean success) {
360 if (regionServer.nonceManager != null && mutation.hasNonce()) {
361 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
362 }
363 }
364
365
366
367
368 private boolean isClientCellBlockSupport() {
369 RpcCallContext context = RpcServer.getCurrentCall();
370 return context != null && context.isClientCellBlockSupport();
371 }
372
373 private void addResult(final MutateResponse.Builder builder,
374 final Result result, final PayloadCarryingRpcController rpcc) {
375 if (result == null) return;
376 if (isClientCellBlockSupport()) {
377 builder.setResult(ProtobufUtil.toResultNoData(result));
378 rpcc.setCellScanner(result.cellScanner());
379 } else {
380 ClientProtos.Result pbr = ProtobufUtil.toResult(result);
381 builder.setResult(pbr);
382 }
383 }
384
385 private void addResults(final ScanResponse.Builder builder, final List<Result> results,
386 final RpcController controller, boolean isDefaultRegion) {
387 builder.setStale(!isDefaultRegion);
388 if (results == null || results.isEmpty()) return;
389 if (isClientCellBlockSupport()) {
390 for (Result res : results) {
391 builder.addCellsPerResult(res.size());
392 builder.addPartialFlagPerResult(res.isPartial());
393 }
394 ((PayloadCarryingRpcController)controller).
395 setCellScanner(CellUtil.createCellScanner(results));
396 } else {
397 for (Result res: results) {
398 ClientProtos.Result pbr = ProtobufUtil.toResult(res);
399 builder.addResults(pbr);
400 }
401 }
402 }
403
404
405
406
407
408
409
410
411
412 private ClientProtos.RegionLoadStats mutateRows(final Region region,
413 final List<ClientProtos.Action> actions,
414 final CellScanner cellScanner) throws IOException {
415 if (!region.getRegionInfo().isMetaTable()) {
416 regionServer.cacheFlusher.reclaimMemStoreMemory();
417 }
418 RowMutations rm = null;
419 for (ClientProtos.Action action: actions) {
420 if (action.hasGet()) {
421 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
422 action.getGet());
423 }
424 MutationType type = action.getMutation().getMutateType();
425 if (rm == null) {
426 rm = new RowMutations(action.getMutation().getRow().toByteArray());
427 }
428 switch (type) {
429 case PUT:
430 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
431 break;
432 case DELETE:
433 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
434 break;
435 default:
436 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
437 }
438 }
439 region.mutateRow(rm);
440 return ((HRegion)region).getRegionStats();
441 }
442
443
444
445
446
447
448
449
450
451
452
453
454
455 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
456 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
457 CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
458 if (!region.getRegionInfo().isMetaTable()) {
459 regionServer.cacheFlusher.reclaimMemStoreMemory();
460 }
461 RowMutations rm = null;
462 for (ClientProtos.Action action: actions) {
463 if (action.hasGet()) {
464 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
465 action.getGet());
466 }
467 MutationType type = action.getMutation().getMutateType();
468 if (rm == null) {
469 rm = new RowMutations(action.getMutation().getRow().toByteArray());
470 }
471 switch (type) {
472 case PUT:
473 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
474 break;
475 case DELETE:
476 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
477 break;
478 default:
479 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
480 }
481 }
482 return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
483 }
484
485
486
487
488
489
490
491
492
493
494
495 private Result append(final Region region, final OperationQuota quota, final MutationProto m,
496 final CellScanner cellScanner, long nonceGroup) throws IOException {
497 long before = EnvironmentEdgeManager.currentTime();
498 Append append = ProtobufUtil.toAppend(m, cellScanner);
499 quota.addMutation(append);
500 Result r = null;
501 if (region.getCoprocessorHost() != null) {
502 r = region.getCoprocessorHost().preAppend(append);
503 }
504 if (r == null) {
505 long nonce = startNonceOperation(m, nonceGroup);
506 boolean success = false;
507 try {
508 r = region.append(append, nonceGroup, nonce);
509 success = true;
510 } finally {
511 endNonceOperation(m, nonceGroup, success);
512 }
513 if (region.getCoprocessorHost() != null) {
514 region.getCoprocessorHost().postAppend(append, r);
515 }
516 }
517 if (regionServer.metricsRegionServer != null) {
518 regionServer.metricsRegionServer.updateAppend(
519 EnvironmentEdgeManager.currentTime() - before);
520 }
521 return r;
522 }
523
524
525
526
527
528
529
530
531
532 private Result increment(final Region region, final OperationQuota quota,
533 final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
534 long before = EnvironmentEdgeManager.currentTime();
535 Increment increment = ProtobufUtil.toIncrement(mutation, cells);
536 quota.addMutation(increment);
537 Result r = null;
538 if (region.getCoprocessorHost() != null) {
539 r = region.getCoprocessorHost().preIncrement(increment);
540 }
541 if (r == null) {
542 long nonce = startNonceOperation(mutation, nonceGroup);
543 boolean success = false;
544 try {
545 r = region.increment(increment, nonceGroup, nonce);
546 success = true;
547 } finally {
548 endNonceOperation(mutation, nonceGroup, success);
549 }
550 if (region.getCoprocessorHost() != null) {
551 r = region.getCoprocessorHost().postIncrement(increment, r);
552 }
553 }
554 if (regionServer.metricsRegionServer != null) {
555 regionServer.metricsRegionServer.updateIncrement(
556 EnvironmentEdgeManager.currentTime() - before);
557 }
558 return r;
559 }
560
561
562
563
564
565
566
567
568
569
570
571
572 private List<CellScannable> doNonAtomicRegionMutation(final Region region,
573 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
574 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
575
576
577
578
579 List<ClientProtos.Action> mutations = null;
580 for (ClientProtos.Action action: actions.getActionList()) {
581 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
582 try {
583 Result r = null;
584 if (action.hasGet()) {
585 long before = EnvironmentEdgeManager.currentTime();
586 try {
587 Get get = ProtobufUtil.toGet(action.getGet());
588 r = region.get(get);
589 } finally {
590 if (regionServer.metricsRegionServer != null) {
591 regionServer.metricsRegionServer.updateGet(
592 EnvironmentEdgeManager.currentTime() - before);
593 }
594 }
595 } else if (action.hasServiceCall()) {
596 resultOrExceptionBuilder = ResultOrException.newBuilder();
597 try {
598 Message result = execServiceOnRegion(region, action.getServiceCall());
599 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
600 ClientProtos.CoprocessorServiceResult.newBuilder();
601 resultOrExceptionBuilder.setServiceResult(
602 serviceResultBuilder.setValue(
603 serviceResultBuilder.getValueBuilder()
604 .setName(result.getClass().getName())
605 .setValue(result.toByteString())));
606 } catch (IOException ioe) {
607 rpcServer.getMetrics().exception(ioe);
608 resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
609 }
610 } else if (action.hasMutation()) {
611 MutationType type = action.getMutation().getMutateType();
612 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
613 !mutations.isEmpty()) {
614
615 doBatchOp(builder, region, quota, mutations, cellScanner);
616 mutations.clear();
617 }
618 switch (type) {
619 case APPEND:
620 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
621 break;
622 case INCREMENT:
623 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
624 break;
625 case PUT:
626 case DELETE:
627
628 if (mutations == null) {
629 mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
630 }
631 mutations.add(action);
632 break;
633 default:
634 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
635 }
636 } else {
637 throw new HBaseIOException("Unexpected Action type");
638 }
639 if (r != null) {
640 ClientProtos.Result pbResult = null;
641 if (isClientCellBlockSupport()) {
642 pbResult = ProtobufUtil.toResultNoData(r);
643
644 if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
645 cellsToReturn.add(r);
646 } else {
647 pbResult = ProtobufUtil.toResult(r);
648 }
649 resultOrExceptionBuilder =
650 ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
651 }
652
653
654
655
656 } catch (IOException ie) {
657 rpcServer.getMetrics().exception(ie);
658 resultOrExceptionBuilder = ResultOrException.newBuilder().
659 setException(ResponseConverter.buildException(ie));
660 }
661 if (resultOrExceptionBuilder != null) {
662
663 resultOrExceptionBuilder.setIndex(action.getIndex());
664 builder.addResultOrException(resultOrExceptionBuilder.build());
665 }
666 }
667
668 if (mutations != null && !mutations.isEmpty()) {
669 doBatchOp(builder, region, quota, mutations, cellScanner);
670 }
671 return cellsToReturn;
672 }
673
674
675
676
677
678
679
680
681 private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
682 final OperationQuota quota,
683 final List<ClientProtos.Action> mutations, final CellScanner cells) {
684 Mutation[] mArray = new Mutation[mutations.size()];
685 long before = EnvironmentEdgeManager.currentTime();
686 boolean batchContainsPuts = false, batchContainsDelete = false;
687 try {
688 int i = 0;
689 for (ClientProtos.Action action: mutations) {
690 MutationProto m = action.getMutation();
691 Mutation mutation;
692 if (m.getMutateType() == MutationType.PUT) {
693 mutation = ProtobufUtil.toPut(m, cells);
694 batchContainsPuts = true;
695 } else {
696 mutation = ProtobufUtil.toDelete(m, cells);
697 batchContainsDelete = true;
698 }
699 mArray[i++] = mutation;
700 quota.addMutation(mutation);
701 }
702
703 if (!region.getRegionInfo().isMetaTable()) {
704 regionServer.cacheFlusher.reclaimMemStoreMemory();
705 }
706
707 OperationStatus codes[] = region.batchMutate(mArray, HConstants.NO_NONCE,
708 HConstants.NO_NONCE);
709 for (i = 0; i < codes.length; i++) {
710 int index = mutations.get(i).getIndex();
711 Exception e = null;
712 switch (codes[i].getOperationStatusCode()) {
713 case BAD_FAMILY:
714 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
715 builder.addResultOrException(getResultOrException(e, index));
716 break;
717
718 case SANITY_CHECK_FAILURE:
719 e = new FailedSanityCheckException(codes[i].getExceptionMsg());
720 builder.addResultOrException(getResultOrException(e, index));
721 break;
722
723 default:
724 e = new DoNotRetryIOException(codes[i].getExceptionMsg());
725 builder.addResultOrException(getResultOrException(e, index));
726 break;
727
728 case SUCCESS:
729 builder.addResultOrException(getResultOrException(
730 ClientProtos.Result.getDefaultInstance(), index,
731 ((HRegion)region).getRegionStats()));
732 break;
733 }
734 }
735 } catch (IOException ie) {
736 for (int i = 0; i < mutations.size(); i++) {
737 builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
738 }
739 }
740 if (regionServer.metricsRegionServer != null) {
741 long after = EnvironmentEdgeManager.currentTime();
742 if (batchContainsPuts) {
743 regionServer.metricsRegionServer.updatePut(after - before);
744 }
745 if (batchContainsDelete) {
746 regionServer.metricsRegionServer.updateDelete(after - before);
747 }
748 }
749 }
750
751
752
753
754
755
756
757
758
759
760
761 private OperationStatus [] doReplayBatchOp(final Region region,
762 final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
763 long before = EnvironmentEdgeManager.currentTime();
764 boolean batchContainsPuts = false, batchContainsDelete = false;
765 try {
766 for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
767 WALSplitter.MutationReplay m = it.next();
768
769 if (m.type == MutationType.PUT) {
770 batchContainsPuts = true;
771 } else {
772 batchContainsDelete = true;
773 }
774
775 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
776 List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
777 if (metaCells != null && !metaCells.isEmpty()) {
778 for (Cell metaCell : metaCells) {
779 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
780 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
781 HRegion hRegion = (HRegion)region;
782 if (compactionDesc != null) {
783
784
785 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
786 replaySeqId);
787 continue;
788 }
789 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
790 if (flushDesc != null && !isDefaultReplica) {
791 hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
792 continue;
793 }
794 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
795 if (regionEvent != null && !isDefaultReplica) {
796 hRegion.replayWALRegionEventMarker(regionEvent);
797 continue;
798 }
799 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
800 if (bulkLoadEvent != null) {
801 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
802 continue;
803 }
804 }
805 it.remove();
806 }
807 }
808 requestCount.add(mutations.size());
809 if (!region.getRegionInfo().isMetaTable()) {
810 regionServer.cacheFlusher.reclaimMemStoreMemory();
811 }
812 return region.batchReplay(mutations.toArray(
813 new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
814 } finally {
815 if (regionServer.metricsRegionServer != null) {
816 long after = EnvironmentEdgeManager.currentTime();
817 if (batchContainsPuts) {
818 regionServer.metricsRegionServer.updatePut(after - before);
819 }
820 if (batchContainsDelete) {
821 regionServer.metricsRegionServer.updateDelete(after - before);
822 }
823 }
824 }
825 }
826
827 private void closeAllScanners() {
828
829
830 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
831 try {
832 e.getValue().s.close();
833 } catch (IOException ioe) {
834 LOG.warn("Closing scanner " + e.getKey(), ioe);
835 }
836 }
837 }
838
839 public RSRpcServices(HRegionServer rs) throws IOException {
840 regionServer = rs;
841
842 RpcSchedulerFactory rpcSchedulerFactory;
843 try {
844 Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
845 REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
846 SimpleRpcSchedulerFactory.class);
847 rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
848 } catch (InstantiationException e) {
849 throw new IllegalArgumentException(e);
850 } catch (IllegalAccessException e) {
851 throw new IllegalArgumentException(e);
852 }
853
854 InetSocketAddress initialIsa;
855 InetSocketAddress bindAddress;
856 if(this instanceof MasterRpcServices) {
857 String hostname = getHostname(rs.conf, true);
858 int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
859
860 initialIsa = new InetSocketAddress(hostname, port);
861 bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
862 } else {
863 String hostname = getHostname(rs.conf, false);
864 int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
865 HConstants.DEFAULT_REGIONSERVER_PORT);
866
867 initialIsa = new InetSocketAddress(hostname, port);
868 bindAddress = new InetSocketAddress(
869 rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
870 }
871 if (initialIsa.getAddress() == null) {
872 throw new IllegalArgumentException("Failed resolve of " + initialIsa);
873 }
874 priority = new AnnotationReadingPriorityFunction(this);
875 String name = rs.getProcessName() + "/" + initialIsa.toString();
876
877 ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
878 rpcServer = new RpcServer(rs, name, getServices(),
879 bindAddress,
880 rs.conf,
881 rpcSchedulerFactory.create(rs.conf, this, rs));
882
883 scannerLeaseTimeoutPeriod = rs.conf.getInt(
884 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
885 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
886 maxScannerResultSize = rs.conf.getLong(
887 HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
888 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
889 rpcTimeout = rs.conf.getInt(
890 HConstants.HBASE_RPC_TIMEOUT_KEY,
891 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
892 minimumScanTimeLimitDelta = rs.conf.getLong(
893 REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
894 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
895
896 InetSocketAddress address = rpcServer.getListenerAddress();
897 if (address == null) {
898 throw new IOException("Listener channel is closed");
899 }
900
901 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
902 rpcServer.setErrorHandler(this);
903 rs.setName(name);
904 }
905
906 public static String getHostname(Configuration conf, boolean isMaster)
907 throws UnknownHostException {
908 String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
909 HRegionServer.RS_HOSTNAME_KEY);
910 if (hostname == null || hostname.isEmpty()) {
911 String masterOrRS = isMaster ? "master" : "regionserver";
912 return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
913 conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
914 conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
915 } else {
916 LOG.info("hostname is configured to be " + hostname);
917 return hostname;
918 }
919 }
920
921 RegionScanner getScanner(long scannerId) {
922 String scannerIdString = Long.toString(scannerId);
923 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
924 if (scannerHolder != null) {
925 return scannerHolder.s;
926 }
927 return null;
928 }
929
930
931
932
933
934 long getScannerVirtualTime(long scannerId) {
935 String scannerIdString = Long.toString(scannerId);
936 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
937 if (scannerHolder != null) {
938 return scannerHolder.getNextCallSeq();
939 }
940 return 0L;
941 }
942
943 long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
944 long scannerId = this.scannerIdGen.incrementAndGet();
945 String scannerName = String.valueOf(scannerId);
946
947 RegionScannerHolder existing =
948 scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
949 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
950
951 regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
952 new ScannerListener(scannerName));
953 return scannerId;
954 }
955
956
957
958
959
960
961
962
963
964 Region getRegion(
965 final RegionSpecifier regionSpecifier) throws IOException {
966 return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
967 ProtobufUtil.getRegionEncodedName(regionSpecifier));
968 }
969
970 @VisibleForTesting
971 public PriorityFunction getPriority() {
972 return priority;
973 }
974
975 Configuration getConfiguration() {
976 return regionServer.getConfiguration();
977 }
978
979 private RegionServerQuotaManager getQuotaManager() {
980 return regionServer.getRegionServerQuotaManager();
981 }
982
983 void start() {
984 rpcServer.start();
985 }
986
987 void stop() {
988 closeAllScanners();
989 rpcServer.stop();
990 }
991
992
993
994
995
996
997 protected void checkOpen() throws IOException {
998 if (regionServer.isAborted()) {
999 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1000 }
1001 if (regionServer.isStopped()) {
1002 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1003 }
1004 if (!regionServer.fsOk) {
1005 throw new RegionServerStoppedException("File system not available");
1006 }
1007 if (!regionServer.isOnline()) {
1008 throw new ServerNotRunningYetException("Server is not running yet");
1009 }
1010 }
1011
1012
1013
1014
1015 protected List<BlockingServiceAndInterface> getServices() {
1016 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1017 bssi.add(new BlockingServiceAndInterface(
1018 ClientService.newReflectiveBlockingService(this),
1019 ClientService.BlockingInterface.class));
1020 bssi.add(new BlockingServiceAndInterface(
1021 AdminService.newReflectiveBlockingService(this),
1022 AdminService.BlockingInterface.class));
1023 return bssi;
1024 }
1025
1026 public InetSocketAddress getSocketAddress() {
1027 return isa;
1028 }
1029
1030 @Override
1031 public int getPriority(RequestHeader header, Message param) {
1032 return priority.getPriority(header, param);
1033 }
1034
1035 @Override
1036 public long getDeadline(RequestHeader header, Message param) {
1037 return priority.getDeadline(header, param);
1038 }
1039
1040
1041
1042
1043
1044
1045
1046
1047 @Override
1048 public boolean checkOOME(final Throwable e) {
1049 boolean stop = false;
1050 try {
1051 if (e instanceof OutOfMemoryError
1052 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1053 || (e.getMessage() != null && e.getMessage().contains(
1054 "java.lang.OutOfMemoryError"))) {
1055 stop = true;
1056 LOG.fatal("Run out of memory; " + getClass().getSimpleName()
1057 + " will abort itself immediately", e);
1058 }
1059 } finally {
1060 if (stop) {
1061 Runtime.getRuntime().halt(1);
1062 }
1063 }
1064 return stop;
1065 }
1066
1067
1068
1069
1070
1071
1072
1073
1074 @Override
1075 @QosPriority(priority=HConstants.ADMIN_QOS)
1076 public CloseRegionResponse closeRegion(final RpcController controller,
1077 final CloseRegionRequest request) throws ServiceException {
1078 final ServerName sn = (request.hasDestinationServer() ?
1079 ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1080
1081 try {
1082 checkOpen();
1083 if (request.hasServerStartCode()) {
1084
1085 long serverStartCode = request.getServerStartCode();
1086 if (regionServer.serverName.getStartcode() != serverStartCode) {
1087 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1088 "different server with startCode: " + serverStartCode + ", this server is: "
1089 + regionServer.serverName));
1090 }
1091 }
1092 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1093
1094 requestCount.increment();
1095 LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1096 CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1097 .getCloseRegionCoordination().parseFromProtoRequest(request);
1098
1099 boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1100 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1101 return builder.build();
1102 } catch (IOException ie) {
1103 throw new ServiceException(ie);
1104 }
1105 }
1106
1107
1108
1109
1110
1111
1112
1113
1114 @Override
1115 @QosPriority(priority=HConstants.ADMIN_QOS)
1116 public CompactRegionResponse compactRegion(final RpcController controller,
1117 final CompactRegionRequest request) throws ServiceException {
1118 try {
1119 checkOpen();
1120 requestCount.increment();
1121 Region region = getRegion(request.getRegion());
1122 region.startRegionOperation(Operation.COMPACT_REGION);
1123 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1124 boolean major = false;
1125 byte [] family = null;
1126 Store store = null;
1127 if (request.hasFamily()) {
1128 family = request.getFamily().toByteArray();
1129 store = region.getStore(family);
1130 if (store == null) {
1131 throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1132 + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1133 }
1134 }
1135 if (request.hasMajor()) {
1136 major = request.getMajor();
1137 }
1138 if (major) {
1139 if (family != null) {
1140 store.triggerMajorCompaction();
1141 } else {
1142 region.triggerMajorCompaction();
1143 }
1144 }
1145
1146 String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1147 if (LOG.isTraceEnabled()) {
1148 LOG.trace("User-triggered compaction requested for region "
1149 + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1150 }
1151 String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1152 if(family != null) {
1153 regionServer.compactSplitThread.requestCompaction(region, store, log,
1154 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1155 } else {
1156 regionServer.compactSplitThread.requestCompaction(region, log,
1157 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1158 }
1159 return CompactRegionResponse.newBuilder().build();
1160 } catch (IOException ie) {
1161 throw new ServiceException(ie);
1162 }
1163 }
1164
1165
1166
1167
1168
1169
1170
1171
1172 @Override
1173 @QosPriority(priority=HConstants.ADMIN_QOS)
1174 public FlushRegionResponse flushRegion(final RpcController controller,
1175 final FlushRegionRequest request) throws ServiceException {
1176 try {
1177 checkOpen();
1178 requestCount.increment();
1179 Region region = getRegion(request.getRegion());
1180 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1181 boolean shouldFlush = true;
1182 if (request.hasIfOlderThanTs()) {
1183 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1184 }
1185 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1186 if (shouldFlush) {
1187 boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
1188 request.getWriteFlushWalMarker() : false;
1189 long startTime = EnvironmentEdgeManager.currentTime();
1190
1191 HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1192 ((HRegion)region).flushcache(true, writeFlushWalMarker);
1193 if (flushResult.isFlushSucceeded()) {
1194 long endTime = EnvironmentEdgeManager.currentTime();
1195 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1196 }
1197 boolean compactionNeeded = flushResult.isCompactionNeeded();
1198 if (compactionNeeded) {
1199 regionServer.compactSplitThread.requestSystemCompaction(region,
1200 "Compaction through user triggered flush");
1201 }
1202 builder.setFlushed(flushResult.isFlushSucceeded());
1203 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1204 }
1205 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1206 return builder.build();
1207 } catch (DroppedSnapshotException ex) {
1208
1209
1210
1211
1212 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1213 throw new ServiceException(ex);
1214 } catch (IOException ie) {
1215 throw new ServiceException(ie);
1216 }
1217 }
1218
1219 @Override
1220 @QosPriority(priority=HConstants.ADMIN_QOS)
1221 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1222 final GetOnlineRegionRequest request) throws ServiceException {
1223 try {
1224 checkOpen();
1225 requestCount.increment();
1226 Map<String, Region> onlineRegions = regionServer.onlineRegions;
1227 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1228 for (Region region: onlineRegions.values()) {
1229 list.add(region.getRegionInfo());
1230 }
1231 Collections.sort(list);
1232 return ResponseConverter.buildGetOnlineRegionResponse(list);
1233 } catch (IOException ie) {
1234 throw new ServiceException(ie);
1235 }
1236 }
1237
1238 @Override
1239 @QosPriority(priority=HConstants.ADMIN_QOS)
1240 public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1241 final GetRegionInfoRequest request) throws ServiceException {
1242 try {
1243 checkOpen();
1244 requestCount.increment();
1245 Region region = getRegion(request.getRegion());
1246 HRegionInfo info = region.getRegionInfo();
1247 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1248 builder.setRegionInfo(HRegionInfo.convert(info));
1249 if (request.hasCompactionState() && request.getCompactionState()) {
1250 builder.setCompactionState(region.getCompactionState());
1251 }
1252 builder.setIsRecovering(region.isRecovering());
1253 return builder.build();
1254 } catch (IOException ie) {
1255 throw new ServiceException(ie);
1256 }
1257 }
1258
1259
1260
1261
1262
1263
1264
1265
1266 @Override
1267 @QosPriority(priority=HConstants.ADMIN_QOS)
1268 public GetServerInfoResponse getServerInfo(final RpcController controller,
1269 final GetServerInfoRequest request) throws ServiceException {
1270 try {
1271 checkOpen();
1272 } catch (IOException ie) {
1273 throw new ServiceException(ie);
1274 }
1275 requestCount.increment();
1276 int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1277 return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1278 }
1279
1280 @Override
1281 @QosPriority(priority=HConstants.ADMIN_QOS)
1282 public GetStoreFileResponse getStoreFile(final RpcController controller,
1283 final GetStoreFileRequest request) throws ServiceException {
1284 try {
1285 checkOpen();
1286 Region region = getRegion(request.getRegion());
1287 requestCount.increment();
1288 Set<byte[]> columnFamilies;
1289 if (request.getFamilyCount() == 0) {
1290 columnFamilies = region.getTableDesc().getFamiliesKeys();
1291 } else {
1292 columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1293 for (ByteString cf: request.getFamilyList()) {
1294 columnFamilies.add(cf.toByteArray());
1295 }
1296 }
1297 int nCF = columnFamilies.size();
1298 List<String> fileList = region.getStoreFileList(
1299 columnFamilies.toArray(new byte[nCF][]));
1300 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1301 builder.addAllStoreFile(fileList);
1302 return builder.build();
1303 } catch (IOException ie) {
1304 throw new ServiceException(ie);
1305 }
1306 }
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316 @Override
1317 @QosPriority(priority = HConstants.ADMIN_QOS)
1318 public MergeRegionsResponse mergeRegions(final RpcController controller,
1319 final MergeRegionsRequest request) throws ServiceException {
1320 try {
1321 checkOpen();
1322 requestCount.increment();
1323 Region regionA = getRegion(request.getRegionA());
1324 Region regionB = getRegion(request.getRegionB());
1325 boolean forcible = request.getForcible();
1326 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1327 regionA.startRegionOperation(Operation.MERGE_REGION);
1328 regionB.startRegionOperation(Operation.MERGE_REGION);
1329 if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1330 regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1331 throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1332 }
1333 LOG.info("Receiving merging request for " + regionA + ", " + regionB
1334 + ",forcible=" + forcible);
1335 long startTime = EnvironmentEdgeManager.currentTime();
1336 FlushResult flushResult = regionA.flush(true);
1337 if (flushResult.isFlushSucceeded()) {
1338 long endTime = EnvironmentEdgeManager.currentTime();
1339 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1340 }
1341 startTime = EnvironmentEdgeManager.currentTime();
1342 flushResult = regionB.flush(true);
1343 if (flushResult.isFlushSucceeded()) {
1344 long endTime = EnvironmentEdgeManager.currentTime();
1345 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1346 }
1347 regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1348 masterSystemTime, RpcServer.getRequestUser());
1349 return MergeRegionsResponse.newBuilder().build();
1350 } catch (DroppedSnapshotException ex) {
1351 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1352 throw new ServiceException(ex);
1353 } catch (IOException ie) {
1354 throw new ServiceException(ie);
1355 }
1356 }
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381 @Override
1382 @QosPriority(priority=HConstants.ADMIN_QOS)
1383 public OpenRegionResponse openRegion(final RpcController controller,
1384 final OpenRegionRequest request) throws ServiceException {
1385 requestCount.increment();
1386 if (request.hasServerStartCode()) {
1387
1388 long serverStartCode = request.getServerStartCode();
1389 if (regionServer.serverName.getStartcode() != serverStartCode) {
1390 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1391 "different server with startCode: " + serverStartCode + ", this server is: "
1392 + regionServer.serverName));
1393 }
1394 }
1395
1396 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1397 final int regionCount = request.getOpenInfoCount();
1398 final Map<TableName, HTableDescriptor> htds =
1399 new HashMap<TableName, HTableDescriptor>(regionCount);
1400 final boolean isBulkAssign = regionCount > 1;
1401 try {
1402 checkOpen();
1403 } catch (IOException ie) {
1404 TableName tableName = null;
1405 if (regionCount == 1) {
1406 RegionInfo ri = request.getOpenInfo(0).getRegion();
1407 if (ri != null) {
1408 tableName = ProtobufUtil.toTableName(ri.getTableName());
1409 }
1410 }
1411 if (!TableName.META_TABLE_NAME.equals(tableName)) {
1412 throw new ServiceException(ie);
1413 }
1414
1415 int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1416 HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1417 long endTime = System.currentTimeMillis() + timeout;
1418 synchronized (regionServer.online) {
1419 try {
1420 while (System.currentTimeMillis() <= endTime
1421 && !regionServer.isStopped() && !regionServer.isOnline()) {
1422 regionServer.online.wait(regionServer.msgInterval);
1423 }
1424 checkOpen();
1425 } catch (InterruptedException t) {
1426 Thread.currentThread().interrupt();
1427 throw new ServiceException(t);
1428 } catch (IOException e) {
1429 throw new ServiceException(e);
1430 }
1431 }
1432 }
1433
1434 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1435
1436 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1437 final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
1438 OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
1439 getOpenRegionCoordination();
1440 OpenRegionCoordination.OpenRegionDetails ord =
1441 coordination.parseFromProtoRequest(regionOpenInfo);
1442
1443 HTableDescriptor htd;
1444 try {
1445 final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
1446 if (onlineRegion != null) {
1447
1448 if (onlineRegion.getCoprocessorHost() != null) {
1449 onlineRegion.getCoprocessorHost().preOpen();
1450 }
1451
1452
1453 Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1454 regionServer.getConnection(), region.getRegionName());
1455 if (regionServer.serverName.equals(p.getSecond())) {
1456 Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
1457
1458
1459
1460
1461
1462 if (!Boolean.FALSE.equals(closing)
1463 && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
1464 LOG.warn("Attempted open of " + region.getEncodedName()
1465 + " but already online on this server");
1466 builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
1467 continue;
1468 }
1469 } else {
1470 LOG.warn("The region " + region.getEncodedName() + " is online on this server"
1471 + " but hbase:meta does not have this server - continue opening.");
1472 regionServer.removeFromOnlineRegions(onlineRegion, null);
1473 }
1474 }
1475 LOG.info("Open " + region.getRegionNameAsString());
1476 htd = htds.get(region.getTable());
1477 if (htd == null) {
1478 htd = regionServer.tableDescriptors.get(region.getTable());
1479 htds.put(region.getTable(), htd);
1480 }
1481
1482 final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
1483 region.getEncodedNameAsBytes(), Boolean.TRUE);
1484
1485 if (Boolean.FALSE.equals(previous)) {
1486
1487
1488 coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
1489
1490 throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
1491 + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
1492 }
1493
1494 if (Boolean.TRUE.equals(previous)) {
1495
1496 LOG.info("Receiving OPEN for the region:" +
1497 region.getRegionNameAsString() + " , which we are already trying to OPEN"
1498 + " - ignoring this new request for this region.");
1499 }
1500
1501
1502
1503 regionServer.removeFromMovedRegions(region.getEncodedName());
1504
1505 if (previous == null) {
1506
1507 if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
1508 region.getEncodedName())) {
1509
1510
1511 if (!regionOpenInfo.hasOpenForDistributedLogReplay()
1512 || regionOpenInfo.getOpenForDistributedLogReplay()) {
1513 regionServer.recoveringRegions.put(region.getEncodedName(), null);
1514 } else {
1515
1516
1517 List<String> tmpRegions = new ArrayList<String>();
1518 tmpRegions.add(region.getEncodedName());
1519 ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
1520 tmpRegions);
1521 }
1522 }
1523
1524
1525 if (region.isMetaRegion()) {
1526 regionServer.service.submit(new OpenMetaHandler(
1527 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1528 } else {
1529 regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
1530 regionOpenInfo.getFavoredNodesList());
1531 regionServer.service.submit(new OpenRegionHandler(
1532 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1533 }
1534 }
1535
1536 builder.addOpeningState(RegionOpeningState.OPENED);
1537
1538 } catch (KeeperException zooKeeperEx) {
1539 LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
1540 throw new ServiceException(zooKeeperEx);
1541 } catch (IOException ie) {
1542 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1543 if (isBulkAssign) {
1544 builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1545 } else {
1546 throw new ServiceException(ie);
1547 }
1548 }
1549 }
1550 return builder.build();
1551 }
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564 @Override
1565 public WarmupRegionResponse warmupRegion(final RpcController controller,
1566 final WarmupRegionRequest request) throws ServiceException {
1567
1568 RegionInfo regionInfo = request.getRegionInfo();
1569 final HRegionInfo region = HRegionInfo.convert(regionInfo);
1570 HTableDescriptor htd;
1571 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1572
1573 try {
1574 checkOpen();
1575 String encodedName = region.getEncodedName();
1576 byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1577 final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1578
1579 if (onlineRegion != null) {
1580 LOG.info("Region already online. Skipping warming up " + region);
1581 return response;
1582 }
1583
1584 if (LOG.isDebugEnabled()) {
1585 LOG.debug("Warming up Region " + region.getRegionNameAsString());
1586 }
1587
1588 htd = regionServer.tableDescriptors.get(region.getTable());
1589
1590 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1591 LOG.info("Region is in transition. Skipping warmup " + region);
1592 return response;
1593 }
1594
1595 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1596 regionServer.getConfiguration(), regionServer, null);
1597
1598 } catch (IOException ie) {
1599 LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1600 throw new ServiceException(ie);
1601 }
1602
1603 return response;
1604 }
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614 @Override
1615 @QosPriority(priority = HConstants.REPLAY_QOS)
1616 public ReplicateWALEntryResponse replay(final RpcController controller,
1617 final ReplicateWALEntryRequest request) throws ServiceException {
1618 long before = EnvironmentEdgeManager.currentTime();
1619 CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1620 try {
1621 checkOpen();
1622 List<WALEntry> entries = request.getEntryList();
1623 if (entries == null || entries.isEmpty()) {
1624
1625 return ReplicateWALEntryResponse.newBuilder().build();
1626 }
1627 ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1628 Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1629 RegionCoprocessorHost coprocessorHost =
1630 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1631 ? region.getCoprocessorHost()
1632 : null;
1633 List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1634
1635
1636 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1637 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1638
1639 for (WALEntry entry : entries) {
1640 if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1641 throw new NotServingRegionException("Replay request contains entries from multiple " +
1642 "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1643 + entry.getKey().getEncodedRegionName());
1644 }
1645 if (regionServer.nonceManager != null && isPrimary) {
1646 long nonceGroup = entry.getKey().hasNonceGroup()
1647 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1648 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1649 regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
1650 }
1651 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1652 new Pair<WALKey, WALEdit>();
1653 List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1654 cells, walEntry, durability);
1655 if (coprocessorHost != null) {
1656
1657
1658 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1659 walEntry.getSecond())) {
1660
1661 continue;
1662 }
1663 walEntries.add(walEntry);
1664 }
1665 if(edits!=null && !edits.isEmpty()) {
1666 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1667 entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1668 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1669
1670 for (int i = 0; result != null && i < result.length; i++) {
1671 if (result[i] != OperationStatus.SUCCESS) {
1672 throw new IOException(result[i].getExceptionMsg());
1673 }
1674 }
1675 }
1676 }
1677
1678
1679 WAL wal = getWAL(region);
1680 if (wal != null) {
1681 wal.sync();
1682 }
1683
1684 if (coprocessorHost != null) {
1685 for (Pair<WALKey, WALEdit> entry : walEntries) {
1686 coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1687 entry.getSecond());
1688 }
1689 }
1690 return ReplicateWALEntryResponse.newBuilder().build();
1691 } catch (IOException ie) {
1692 throw new ServiceException(ie);
1693 } finally {
1694 if (regionServer.metricsRegionServer != null) {
1695 regionServer.metricsRegionServer.updateReplay(
1696 EnvironmentEdgeManager.currentTime() - before);
1697 }
1698 }
1699 }
1700
1701 WAL getWAL(Region region) {
1702 return ((HRegion)region).getWAL();
1703 }
1704
1705
1706
1707
1708
1709
1710
1711
1712 @Override
1713 @QosPriority(priority=HConstants.REPLICATION_QOS)
1714 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1715 final ReplicateWALEntryRequest request) throws ServiceException {
1716 try {
1717 checkOpen();
1718 if (regionServer.replicationSinkHandler != null) {
1719 requestCount.increment();
1720 List<WALEntry> entries = request.getEntryList();
1721 CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1722 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1723 regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
1724 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1725 return ReplicateWALEntryResponse.newBuilder().build();
1726 } else {
1727 throw new ServiceException("Replication services are not initialized yet");
1728 }
1729 } catch (IOException ie) {
1730 throw new ServiceException(ie);
1731 }
1732 }
1733
1734
1735
1736
1737
1738
1739
1740 @Override
1741 public RollWALWriterResponse rollWALWriter(final RpcController controller,
1742 final RollWALWriterRequest request) throws ServiceException {
1743 try {
1744 checkOpen();
1745 requestCount.increment();
1746 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1747 regionServer.walRoller.requestRollAll();
1748 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1749 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1750 return builder.build();
1751 } catch (IOException ie) {
1752 throw new ServiceException(ie);
1753 }
1754 }
1755
1756
1757
1758
1759
1760
1761
1762
1763 @Override
1764 @QosPriority(priority=HConstants.ADMIN_QOS)
1765 public SplitRegionResponse splitRegion(final RpcController controller,
1766 final SplitRegionRequest request) throws ServiceException {
1767 try {
1768 checkOpen();
1769 requestCount.increment();
1770 Region region = getRegion(request.getRegion());
1771 region.startRegionOperation(Operation.SPLIT_REGION);
1772 if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1773 throw new IOException("Can't split replicas directly. "
1774 + "Replicas are auto-split when their primary is split.");
1775 }
1776 LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1777 long startTime = EnvironmentEdgeManager.currentTime();
1778 FlushResult flushResult = region.flush(true);
1779 if (flushResult.isFlushSucceeded()) {
1780 long endTime = EnvironmentEdgeManager.currentTime();
1781 regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1782 }
1783 byte[] splitPoint = null;
1784 if (request.hasSplitPoint()) {
1785 splitPoint = request.getSplitPoint().toByteArray();
1786 }
1787 ((HRegion)region).forceSplit(splitPoint);
1788 regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1789 RpcServer.getRequestUser());
1790 return SplitRegionResponse.newBuilder().build();
1791 } catch (DroppedSnapshotException ex) {
1792 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1793 throw new ServiceException(ex);
1794 } catch (IOException ie) {
1795 throw new ServiceException(ie);
1796 }
1797 }
1798
1799
1800
1801
1802
1803
1804
1805
1806 @Override
1807 @QosPriority(priority=HConstants.ADMIN_QOS)
1808 public StopServerResponse stopServer(final RpcController controller,
1809 final StopServerRequest request) throws ServiceException {
1810 requestCount.increment();
1811 String reason = request.getReason();
1812 regionServer.stop(reason);
1813 return StopServerResponse.newBuilder().build();
1814 }
1815
1816 @Override
1817 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1818 UpdateFavoredNodesRequest request) throws ServiceException {
1819 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1820 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1821 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1822 HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1823 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1824 regionUpdateInfo.getFavoredNodesList());
1825 }
1826 respBuilder.setResponse(openInfoList.size());
1827 return respBuilder.build();
1828 }
1829
1830
1831
1832
1833
1834
1835 @Override
1836 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1837 final BulkLoadHFileRequest request) throws ServiceException {
1838 try {
1839 checkOpen();
1840 requestCount.increment();
1841 Region region = getRegion(request.getRegion());
1842 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1843 for (FamilyPath familyPath: request.getFamilyPathList()) {
1844 familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1845 familyPath.getPath()));
1846 }
1847 boolean bypass = false;
1848 if (region.getCoprocessorHost() != null) {
1849 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1850 }
1851 boolean loaded = false;
1852 if (!bypass) {
1853 loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
1854 }
1855 if (region.getCoprocessorHost() != null) {
1856 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
1857 }
1858 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
1859 builder.setLoaded(loaded);
1860 return builder.build();
1861 } catch (IOException ie) {
1862 throw new ServiceException(ie);
1863 }
1864 }
1865
1866 @Override
1867 public CoprocessorServiceResponse execService(final RpcController controller,
1868 final CoprocessorServiceRequest request) throws ServiceException {
1869 try {
1870 checkOpen();
1871 requestCount.increment();
1872 Region region = getRegion(request.getRegion());
1873 Message result = execServiceOnRegion(region, request.getCall());
1874 CoprocessorServiceResponse.Builder builder =
1875 CoprocessorServiceResponse.newBuilder();
1876 builder.setRegion(RequestConverter.buildRegionSpecifier(
1877 RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
1878 builder.setValue(
1879 builder.getValueBuilder().setName(result.getClass().getName())
1880 .setValue(result.toByteString()));
1881 return builder.build();
1882 } catch (IOException ie) {
1883 throw new ServiceException(ie);
1884 }
1885 }
1886
1887 private Message execServiceOnRegion(Region region,
1888 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
1889
1890 ServerRpcController execController = new ServerRpcController();
1891 Message result = region.execService(execController, serviceCall);
1892 if (execController.getFailedOn() != null) {
1893 throw execController.getFailedOn();
1894 }
1895 return result;
1896 }
1897
1898
1899
1900
1901
1902
1903
1904
1905 @Override
1906 public GetResponse get(final RpcController controller,
1907 final GetRequest request) throws ServiceException {
1908 long before = EnvironmentEdgeManager.currentTime();
1909 OperationQuota quota = null;
1910 try {
1911 checkOpen();
1912 requestCount.increment();
1913 Region region = getRegion(request.getRegion());
1914
1915 GetResponse.Builder builder = GetResponse.newBuilder();
1916 ClientProtos.Get get = request.getGet();
1917 Boolean existence = null;
1918 Result r = null;
1919 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
1920
1921 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
1922 if (get.getColumnCount() != 1) {
1923 throw new DoNotRetryIOException(
1924 "get ClosestRowBefore supports one and only one family now, not "
1925 + get.getColumnCount() + " families");
1926 }
1927 byte[] row = get.getRow().toByteArray();
1928 byte[] family = get.getColumn(0).getFamily().toByteArray();
1929 r = region.getClosestRowBefore(row, family);
1930 } else {
1931 Get clientGet = ProtobufUtil.toGet(get);
1932 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
1933 existence = region.getCoprocessorHost().preExists(clientGet);
1934 }
1935 if (existence == null) {
1936 r = region.get(clientGet);
1937 if (get.getExistenceOnly()) {
1938 boolean exists = r.getExists();
1939 if (region.getCoprocessorHost() != null) {
1940 exists = region.getCoprocessorHost().postExists(clientGet, exists);
1941 }
1942 existence = exists;
1943 }
1944 }
1945 }
1946 if (existence != null){
1947 ClientProtos.Result pbr =
1948 ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
1949 builder.setResult(pbr);
1950 } else if (r != null) {
1951 ClientProtos.Result pbr = ProtobufUtil.toResult(r);
1952 builder.setResult(pbr);
1953 }
1954 if (r != null) {
1955 quota.addGetResult(r);
1956 }
1957 return builder.build();
1958 } catch (IOException ie) {
1959 throw new ServiceException(ie);
1960 } finally {
1961 if (regionServer.metricsRegionServer != null) {
1962 regionServer.metricsRegionServer.updateGet(
1963 EnvironmentEdgeManager.currentTime() - before);
1964 }
1965 if (quota != null) {
1966 quota.close();
1967 }
1968 }
1969 }
1970
1971
1972
1973
1974
1975
1976
1977
1978 @Override
1979 public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
1980 throws ServiceException {
1981 try {
1982 checkOpen();
1983 } catch (IOException ie) {
1984 throw new ServiceException(ie);
1985 }
1986
1987
1988
1989 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
1990 CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
1991 if (controller != null) controller.setCellScanner(null);
1992
1993 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
1994
1995
1996 List<CellScannable> cellsToReturn = null;
1997 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
1998 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
1999 Boolean processed = null;
2000
2001 for (RegionAction regionAction : request.getRegionActionList()) {
2002 this.requestCount.add(regionAction.getActionCount());
2003 OperationQuota quota;
2004 Region region;
2005 regionActionResultBuilder.clear();
2006 try {
2007 region = getRegion(regionAction.getRegion());
2008 quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
2009 } catch (IOException e) {
2010 rpcServer.getMetrics().exception(e);
2011 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2012 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2013 if (cellScanner != null) {
2014 skipCellsForMutations(regionAction.getActionList(), cellScanner);
2015 }
2016 continue;
2017 }
2018
2019 if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2020
2021
2022 try {
2023 if (request.hasCondition()) {
2024 Condition condition = request.getCondition();
2025 byte[] row = condition.getRow().toByteArray();
2026 byte[] family = condition.getFamily().toByteArray();
2027 byte[] qualifier = condition.getQualifier().toByteArray();
2028 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2029 ByteArrayComparable comparator =
2030 ProtobufUtil.toComparator(condition.getComparator());
2031 processed = checkAndRowMutate(region, regionAction.getActionList(),
2032 cellScanner, row, family, qualifier, compareOp, comparator);
2033 } else {
2034 ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2035 cellScanner);
2036
2037 if(stats != null) {
2038 responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2039 .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2040 }
2041 processed = Boolean.TRUE;
2042 }
2043 } catch (IOException e) {
2044 rpcServer.getMetrics().exception(e);
2045
2046 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2047 }
2048 } else {
2049
2050 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2051 regionActionResultBuilder, cellsToReturn, nonceGroup);
2052 }
2053 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2054 quota.close();
2055 }
2056
2057 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2058 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2059 }
2060 if (processed != null) responseBuilder.setProcessed(processed);
2061 return responseBuilder.build();
2062 }
2063
2064 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2065 for (Action action : actions) {
2066 skipCellsForMutation(action, cellScanner);
2067 }
2068 }
2069
2070 private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2071 try {
2072 if (action.hasMutation()) {
2073 MutationProto m = action.getMutation();
2074 if (m.hasAssociatedCellCount()) {
2075 for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2076 cellScanner.advance();
2077 }
2078 }
2079 }
2080 } catch (IOException e) {
2081
2082
2083
2084 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2085 }
2086 }
2087
2088
2089
2090
2091
2092
2093
2094
2095 @Override
2096 public MutateResponse mutate(final RpcController rpcc,
2097 final MutateRequest request) throws ServiceException {
2098
2099
2100 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2101 CellScanner cellScanner = controller != null? controller.cellScanner(): null;
2102 OperationQuota quota = null;
2103
2104 if (controller != null) controller.setCellScanner(null);
2105 try {
2106 checkOpen();
2107 requestCount.increment();
2108 Region region = getRegion(request.getRegion());
2109 MutateResponse.Builder builder = MutateResponse.newBuilder();
2110 MutationProto mutation = request.getMutation();
2111 if (!region.getRegionInfo().isMetaTable()) {
2112 regionServer.cacheFlusher.reclaimMemStoreMemory();
2113 }
2114 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2115 Result r = null;
2116 Boolean processed = null;
2117 MutationType type = mutation.getMutateType();
2118 long mutationSize = 0;
2119 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2120 switch (type) {
2121 case APPEND:
2122
2123 r = append(region, quota, mutation, cellScanner, nonceGroup);
2124 break;
2125 case INCREMENT:
2126
2127 r = increment(region, quota, mutation, cellScanner, nonceGroup);
2128 break;
2129 case PUT:
2130 Put put = ProtobufUtil.toPut(mutation, cellScanner);
2131 quota.addMutation(put);
2132 if (request.hasCondition()) {
2133 Condition condition = request.getCondition();
2134 byte[] row = condition.getRow().toByteArray();
2135 byte[] family = condition.getFamily().toByteArray();
2136 byte[] qualifier = condition.getQualifier().toByteArray();
2137 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2138 ByteArrayComparable comparator =
2139 ProtobufUtil.toComparator(condition.getComparator());
2140 if (region.getCoprocessorHost() != null) {
2141 processed = region.getCoprocessorHost().preCheckAndPut(
2142 row, family, qualifier, compareOp, comparator, put);
2143 }
2144 if (processed == null) {
2145 boolean result = region.checkAndMutate(row, family,
2146 qualifier, compareOp, comparator, put, true);
2147 if (region.getCoprocessorHost() != null) {
2148 result = region.getCoprocessorHost().postCheckAndPut(row, family,
2149 qualifier, compareOp, comparator, put, result);
2150 }
2151 processed = result;
2152 }
2153 } else {
2154 region.put(put);
2155 processed = Boolean.TRUE;
2156 }
2157 break;
2158 case DELETE:
2159 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2160 quota.addMutation(delete);
2161 if (request.hasCondition()) {
2162 Condition condition = request.getCondition();
2163 byte[] row = condition.getRow().toByteArray();
2164 byte[] family = condition.getFamily().toByteArray();
2165 byte[] qualifier = condition.getQualifier().toByteArray();
2166 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2167 ByteArrayComparable comparator =
2168 ProtobufUtil.toComparator(condition.getComparator());
2169 if (region.getCoprocessorHost() != null) {
2170 processed = region.getCoprocessorHost().preCheckAndDelete(
2171 row, family, qualifier, compareOp, comparator, delete);
2172 }
2173 if (processed == null) {
2174 boolean result = region.checkAndMutate(row, family,
2175 qualifier, compareOp, comparator, delete, true);
2176 if (region.getCoprocessorHost() != null) {
2177 result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2178 qualifier, compareOp, comparator, delete, result);
2179 }
2180 processed = result;
2181 }
2182 } else {
2183 region.delete(delete);
2184 processed = Boolean.TRUE;
2185 }
2186 break;
2187 default:
2188 throw new DoNotRetryIOException(
2189 "Unsupported mutate type: " + type.name());
2190 }
2191 if (processed != null) builder.setProcessed(processed.booleanValue());
2192 addResult(builder, r, controller);
2193 return builder.build();
2194 } catch (IOException ie) {
2195 regionServer.checkFileSystem();
2196 throw new ServiceException(ie);
2197 } finally {
2198 if (quota != null) {
2199 quota.close();
2200 }
2201 }
2202 }
2203
2204
2205
2206
2207
2208
2209
2210
2211 @Override
2212 public ScanResponse scan(final RpcController controller, final ScanRequest request)
2213 throws ServiceException {
2214 OperationQuota quota = null;
2215 Leases.Lease lease = null;
2216 String scannerName = null;
2217 try {
2218 if (!request.hasScannerId() && !request.hasScan()) {
2219 throw new DoNotRetryIOException(
2220 "Missing required input: scannerId or scan");
2221 }
2222 long scannerId = -1;
2223 if (request.hasScannerId()) {
2224 scannerId = request.getScannerId();
2225 scannerName = String.valueOf(scannerId);
2226 }
2227 try {
2228 checkOpen();
2229 } catch (IOException e) {
2230
2231
2232 if (scannerName != null) {
2233 LOG.debug("Server shutting down and client tried to access missing scanner "
2234 + scannerName);
2235 if (regionServer.leases != null) {
2236 try {
2237 regionServer.leases.cancelLease(scannerName);
2238 } catch (LeaseException le) {
2239
2240 }
2241 }
2242 }
2243 throw e;
2244 }
2245 requestCount.increment();
2246
2247 int ttl = 0;
2248 Region region = null;
2249 RegionScanner scanner = null;
2250 RegionScannerHolder rsh = null;
2251 boolean moreResults = true;
2252 boolean closeScanner = false;
2253 boolean isSmallScan = false;
2254 ScanResponse.Builder builder = ScanResponse.newBuilder();
2255 if (request.hasCloseScanner()) {
2256 closeScanner = request.getCloseScanner();
2257 }
2258 int rows = closeScanner ? 0 : 1;
2259 if (request.hasNumberOfRows()) {
2260 rows = request.getNumberOfRows();
2261 }
2262 if (request.hasScannerId()) {
2263 rsh = scanners.get(scannerName);
2264 if (rsh == null) {
2265 LOG.warn("Client tried to access missing scanner " + scannerName);
2266 throw new UnknownScannerException(
2267 "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
2268 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
2269 + "long wait between consecutive client checkins, c) Server may be closing down, "
2270 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
2271 + "possible fix would be increasing the value of"
2272 + "'hbase.client.scanner.timeout.period' configuration.");
2273 }
2274 scanner = rsh.s;
2275 HRegionInfo hri = scanner.getRegionInfo();
2276 region = regionServer.getRegion(hri.getRegionName());
2277 if (region != rsh.r) {
2278 throw new NotServingRegionException("Region was re-opened after the scanner"
2279 + scannerName + " was created: " + hri.getRegionNameAsString());
2280 }
2281 } else {
2282 region = getRegion(request.getRegion());
2283 ClientProtos.Scan protoScan = request.getScan();
2284 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2285 Scan scan = ProtobufUtil.toScan(protoScan);
2286
2287 if (!isLoadingCfsOnDemandSet) {
2288 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2289 }
2290
2291 isSmallScan = scan.isSmall();
2292 if (!scan.hasFamilies()) {
2293
2294 for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2295 scan.addFamily(family);
2296 }
2297 }
2298
2299 if (region.getCoprocessorHost() != null) {
2300 scanner = region.getCoprocessorHost().preScannerOpen(scan);
2301 }
2302 if (scanner == null) {
2303 scanner = region.getScanner(scan);
2304 }
2305 if (region.getCoprocessorHost() != null) {
2306 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2307 }
2308 scannerId = addScanner(scanner, region);
2309 scannerName = String.valueOf(scannerId);
2310 ttl = this.scannerLeaseTimeoutPeriod;
2311 }
2312 if (request.hasRenew() && request.getRenew()) {
2313 rsh = scanners.get(scannerName);
2314 lease = regionServer.leases.removeLease(scannerName);
2315 if (lease != null && rsh != null) {
2316 regionServer.leases.addLease(lease);
2317
2318 rsh.incNextCallSeq();
2319 }
2320 return builder.build();
2321 }
2322
2323 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2324 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2325 if (rows > 0) {
2326
2327
2328
2329 if (request.hasNextCallSeq()) {
2330 if (rsh == null) {
2331 rsh = scanners.get(scannerName);
2332 }
2333 if (rsh != null) {
2334 if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2335 throw new OutOfOrderScannerNextException(
2336 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2337 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2338 "; request=" + TextFormat.shortDebugString(request));
2339 }
2340
2341 rsh.incNextCallSeq();
2342 }
2343 }
2344 try {
2345
2346
2347 lease = regionServer.leases.removeLease(scannerName);
2348 List<Result> results = new ArrayList<Result>();
2349 long totalCellSize = 0;
2350 long currentScanResultSize = 0;
2351
2352 boolean done = false;
2353
2354 if (region != null && region.getCoprocessorHost() != null) {
2355 Boolean bypass = region.getCoprocessorHost().preScannerNext(
2356 scanner, results, rows);
2357 if (!results.isEmpty()) {
2358 for (Result r : results) {
2359 for (Cell cell : r.rawCells()) {
2360 totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
2361 currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
2362 }
2363 }
2364 }
2365 if (bypass != null && bypass.booleanValue()) {
2366 done = true;
2367 }
2368 }
2369
2370 if (!done) {
2371 long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2372 if (maxResultSize <= 0) {
2373 maxResultSize = maxQuotaResultSize;
2374 }
2375
2376
2377
2378 List<Cell> values = new ArrayList<Cell>(32);
2379 region.startRegionOperation(Operation.SCAN);
2380 try {
2381 int i = 0;
2382 synchronized(scanner) {
2383 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2384 boolean clientHandlesPartials =
2385 request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2386 boolean clientHandlesHeartbeats =
2387 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2388
2389
2390
2391
2392
2393
2394 boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
2395 boolean allowPartialResults =
2396 clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2397 boolean moreRows = false;
2398
2399
2400
2401
2402
2403
2404
2405
2406 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2407
2408
2409
2410 long timeLimit = -1;
2411
2412
2413
2414
2415 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2416 long timeLimitDelta;
2417 if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2418 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2419 } else {
2420 timeLimitDelta =
2421 scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2422 }
2423
2424
2425
2426 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2427 timeLimit = System.currentTimeMillis() + timeLimitDelta;
2428 }
2429
2430 final LimitScope sizeScope =
2431 allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2432 final LimitScope timeScope =
2433 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2434
2435
2436
2437 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2438 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2439 contextBuilder.setBatchLimit(scanner.getBatch());
2440 contextBuilder.setTimeLimit(timeScope, timeLimit);
2441 ScannerContext scannerContext = contextBuilder.build();
2442
2443 boolean limitReached = false;
2444 while (i < rows) {
2445
2446
2447
2448
2449
2450 scannerContext.setBatchProgress(0);
2451
2452
2453 moreRows = scanner.nextRaw(values, scannerContext);
2454
2455 if (!values.isEmpty()) {
2456 for (Cell cell : values) {
2457 totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
2458 }
2459 final boolean partial = scannerContext.partialResultFormed();
2460 results.add(Result.create(values, null, stale, partial));
2461 i++;
2462 }
2463
2464 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2465 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2466 boolean rowLimitReached = i >= rows;
2467 limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2468
2469 if (limitReached || !moreRows) {
2470 if (LOG.isTraceEnabled()) {
2471 LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2472 + moreRows + " scannerContext: " + scannerContext);
2473 }
2474
2475
2476
2477
2478 if (moreRows) {
2479
2480 builder.setHeartbeatMessage(timeLimitReached);
2481 }
2482 break;
2483 }
2484 values.clear();
2485 }
2486
2487 if (limitReached || moreRows) {
2488
2489 builder.setMoreResultsInRegion(true);
2490 } else {
2491
2492 builder.setMoreResultsInRegion(false);
2493 }
2494 }
2495 region.updateReadRequestsCount(i);
2496 region.getMetrics().updateScanNext(totalCellSize);
2497 if (regionServer.metricsRegionServer != null) {
2498 regionServer.metricsRegionServer.updateScannerNext(totalCellSize);
2499 }
2500 } finally {
2501 region.closeRegionOperation();
2502 }
2503
2504
2505 if (region != null && region.getCoprocessorHost() != null) {
2506 region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2507 }
2508 }
2509
2510 quota.addScanResult(results);
2511
2512
2513
2514
2515 if (scanner.isFilterDone() && results.isEmpty()) {
2516 moreResults = false;
2517 results = null;
2518 } else {
2519 addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2520 }
2521 } catch (IOException e) {
2522
2523
2524
2525 closeScanner(region, scanner, scannerName);
2526
2527
2528
2529
2530 RpcCallContext context = RpcServer.getCurrentCall();
2531 if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
2532
2533 throw new ScannerResetException("Scanner is closed on the server-side", e);
2534 } else {
2535
2536 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
2537 + " scanner state for clients older than 1.3.", e);
2538 }
2539 } finally {
2540
2541
2542 if (scanners.containsKey(scannerName)) {
2543 if (lease != null) regionServer.leases.addLease(lease);
2544 ttl = this.scannerLeaseTimeoutPeriod;
2545 }
2546 }
2547 }
2548
2549 if (!moreResults || closeScanner) {
2550 ttl = 0;
2551 moreResults = false;
2552 closeScanner(region, scanner, scannerName);
2553 }
2554
2555 if (ttl > 0) {
2556 builder.setTtl(ttl);
2557 }
2558 builder.setScannerId(scannerId);
2559 builder.setMoreResults(moreResults);
2560 return builder.build();
2561 } catch (IOException ie) {
2562 if (scannerName != null && ie instanceof NotServingRegionException) {
2563 RegionScannerHolder rsh = scanners.remove(scannerName);
2564 if (rsh != null) {
2565 try {
2566 RegionScanner scanner = rsh.s;
2567 LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2568 scanner.close();
2569 regionServer.leases.cancelLease(scannerName);
2570 } catch (IOException e) {
2571 LOG.warn("Getting exception closing " + scannerName, e);
2572 }
2573 }
2574 }
2575 throw new ServiceException(ie);
2576 } finally {
2577 if (quota != null) {
2578 quota.close();
2579 }
2580 }
2581 }
2582
2583 private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
2584 throws IOException {
2585 if (region != null && region.getCoprocessorHost() != null) {
2586 if (region.getCoprocessorHost().preScannerClose(scanner)) {
2587 return true;
2588 }
2589 }
2590 RegionScannerHolder rsh = scanners.remove(scannerName);
2591 if (rsh != null) {
2592 scanner = rsh.s;
2593 scanner.close();
2594 try {
2595 regionServer.leases.cancelLease(scannerName);
2596 } catch (LeaseException le) {
2597
2598 if (LOG.isTraceEnabled()) {
2599 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2600 }
2601 }
2602 if (region != null && region.getCoprocessorHost() != null) {
2603 region.getCoprocessorHost().postScannerClose(scanner);
2604 }
2605 }
2606 return false;
2607 }
2608
2609 @Override
2610 public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2611 CoprocessorServiceRequest request) throws ServiceException {
2612 return regionServer.execRegionServerService(controller, request);
2613 }
2614
2615 @Override
2616 public UpdateConfigurationResponse updateConfiguration(
2617 RpcController controller, UpdateConfigurationRequest request)
2618 throws ServiceException {
2619 try {
2620 this.regionServer.updateConfiguration();
2621 } catch (Exception e) {
2622 throw new ServiceException(e);
2623 }
2624 return UpdateConfigurationResponse.getDefaultInstance();
2625 }
2626 }