1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.io.UnsupportedEncodingException;
25 import java.lang.reflect.Constructor;
26 import java.text.ParseException;
27 import java.util.AbstractList;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.NavigableMap;
36 import java.util.NavigableSet;
37 import java.util.Random;
38 import java.util.Set;
39 import java.util.TreeMap;
40 import java.util.UUID;
41 import java.util.concurrent.Callable;
42 import java.util.concurrent.CompletionService;
43 import java.util.concurrent.ConcurrentHashMap;
44 import java.util.concurrent.ConcurrentSkipListMap;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.ExecutorCompletionService;
48 import java.util.concurrent.ExecutorService;
49 import java.util.concurrent.Executors;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.FutureTask;
52 import java.util.concurrent.ThreadFactory;
53 import java.util.concurrent.ThreadPoolExecutor;
54 import java.util.concurrent.TimeUnit;
55 import java.util.concurrent.TimeoutException;
56 import java.util.concurrent.atomic.AtomicBoolean;
57 import java.util.concurrent.atomic.AtomicInteger;
58 import java.util.concurrent.atomic.AtomicLong;
59 import java.util.concurrent.locks.Lock;
60 import java.util.concurrent.locks.ReentrantReadWriteLock;
61
62 import org.apache.commons.logging.Log;
63 import org.apache.commons.logging.LogFactory;
64 import org.apache.hadoop.classification.InterfaceAudience;
65 import org.apache.hadoop.conf.Configuration;
66 import org.apache.hadoop.fs.FileStatus;
67 import org.apache.hadoop.fs.FileSystem;
68 import org.apache.hadoop.fs.Path;
69 import org.apache.hadoop.hbase.Cell;
70 import org.apache.hadoop.hbase.CompoundConfiguration;
71 import org.apache.hadoop.hbase.HBaseConfiguration;
72 import org.apache.hadoop.hbase.HColumnDescriptor;
73 import org.apache.hadoop.hbase.HConstants;
74 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
75 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
76 import org.apache.hadoop.hbase.HRegionInfo;
77 import org.apache.hadoop.hbase.HTableDescriptor;
78 import org.apache.hadoop.hbase.KeyValue;
79 import org.apache.hadoop.hbase.KeyValueUtil;
80 import org.apache.hadoop.hbase.backup.HFileArchiver;
81 import org.apache.hadoop.hbase.client.Append;
82 import org.apache.hadoop.hbase.client.Delete;
83 import org.apache.hadoop.hbase.client.Durability;
84 import org.apache.hadoop.hbase.client.Get;
85 import org.apache.hadoop.hbase.client.Increment;
86 import org.apache.hadoop.hbase.client.IsolationLevel;
87 import org.apache.hadoop.hbase.client.Mutation;
88 import org.apache.hadoop.hbase.client.Put;
89 import org.apache.hadoop.hbase.client.Result;
90 import org.apache.hadoop.hbase.client.Row;
91 import org.apache.hadoop.hbase.client.RowMutations;
92 import org.apache.hadoop.hbase.client.Scan;
93 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
94 import org.apache.hadoop.hbase.exceptions.DroppedSnapshotException;
95 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
96 import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
97 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
98 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
99 import org.apache.hadoop.hbase.exceptions.RegionTooBusyException;
100 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
101 import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
102 import org.apache.hadoop.hbase.exceptions.WrongRegionException;
103 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
104 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
105 import org.apache.hadoop.hbase.filter.Filter;
106 import org.apache.hadoop.hbase.filter.FilterWrapper;
107 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
108 import org.apache.hadoop.hbase.io.HeapSize;
109 import org.apache.hadoop.hbase.io.TimeRange;
110 import org.apache.hadoop.hbase.io.hfile.BlockCache;
111 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
112 import org.apache.hadoop.hbase.ipc.RpcServer;
113 import org.apache.hadoop.hbase.ipc.RpcCallContext;
114 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
115 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
117 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
119 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
120 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
121 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
122 import org.apache.hadoop.hbase.regionserver.wal.HLog;
123 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
124 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
125 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
126 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
127 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
128 import org.apache.hadoop.hbase.util.Bytes;
129 import org.apache.hadoop.hbase.util.CancelableProgressable;
130 import org.apache.hadoop.hbase.util.ClassSize;
131 import org.apache.hadoop.hbase.util.CompressionTest;
132 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
133 import org.apache.hadoop.hbase.util.FSUtils;
134 import org.apache.hadoop.hbase.util.HashedBytes;
135 import org.apache.hadoop.hbase.util.Pair;
136 import org.apache.hadoop.hbase.util.Threads;
137 import org.apache.hadoop.io.MultipleIOException;
138 import org.apache.hadoop.util.StringUtils;
139 import org.cliffc.high_scale_lib.Counter;
140
141 import com.google.common.base.Preconditions;
142 import com.google.common.collect.Lists;
143 import com.google.common.collect.Maps;
144 import com.google.common.io.Closeables;
145 import com.google.protobuf.Descriptors;
146 import com.google.protobuf.Message;
147 import com.google.protobuf.RpcCallback;
148 import com.google.protobuf.RpcController;
149 import com.google.protobuf.Service;
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187 @InterfaceAudience.Private
188 public class HRegion implements HeapSize {
189 public static final Log LOG = LogFactory.getLog(HRegion.class);
190
191 public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
192 "hbase.hregion.scan.loadColumnFamiliesOnDemand";
193
194 final AtomicBoolean closed = new AtomicBoolean(false);
195
196
197
198
199
200 final AtomicBoolean closing = new AtomicBoolean(false);
201
202 protected long completeSequenceId = -1L;
203
204
205
206
207
208
209
210 protected enum Operation {
211 ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION
212 }
213
214
215
216
217
218 private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
219 new ConcurrentHashMap<HashedBytes, CountDownLatch>();
220 private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
221 new ConcurrentHashMap<Integer, HashedBytes>();
222 private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
223 static private Random rand = new Random();
224
225 protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
226 Bytes.BYTES_RAWCOMPARATOR);
227
228
229 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
230
231 public final AtomicLong memstoreSize = new AtomicLong(0);
232
233
234 final Counter numPutsWithoutWAL = new Counter();
235 final Counter dataInMemoryWithoutWAL = new Counter();
236
237
238 final Counter checkAndMutateChecksPassed = new Counter();
239 final Counter checkAndMutateChecksFailed = new Counter();
240
241
242 final Counter readRequestsCount = new Counter();
243 final Counter writeRequestsCount = new Counter();
244
245
246 final Counter updatesBlockedMs = new Counter();
247
248 private final HLog log;
249 private final HRegionFileSystem fs;
250 protected final Configuration conf;
251 private final Configuration baseConf;
252 private final KeyValue.KVComparator comparator;
253 private final int rowLockWaitDuration;
254 static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
255
256
257
258
259
260
261
262 final long busyWaitDuration;
263 static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
264
265
266
267
268 final int maxBusyWaitMultiplier;
269
270
271
272 final long maxBusyWaitDuration;
273
274
275 static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
276 final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
277
278 private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
279
280
281
282
283 private long openSeqNum = HConstants.NO_SEQNUM;
284
285
286
287
288
289 private boolean isLoadingCfsOnDemandDefault = false;
290
291 private final AtomicInteger majorInProgress = new AtomicInteger(0);
292 private final AtomicInteger minorInProgress = new AtomicInteger(0);
293
294
295
296
297 private long minSeqIdForLogReplay = -1;
298
299
300
301
302
303
304 public long getSmallestReadPoint() {
305 long minimumReadPoint;
306
307
308
309 synchronized(scannerReadPoints) {
310 minimumReadPoint = mvcc.memstoreReadPoint();
311
312 for (Long readPoint: this.scannerReadPoints.values()) {
313 if (readPoint < minimumReadPoint) {
314 minimumReadPoint = readPoint;
315 }
316 }
317 }
318 return minimumReadPoint;
319 }
320
321
322
323
324 static class WriteState {
325
326 volatile boolean flushing = false;
327
328 volatile boolean flushRequested = false;
329
330 volatile int compacting = 0;
331
332 volatile boolean writesEnabled = true;
333
334 volatile boolean readOnly = false;
335
336
337
338
339
340
341 synchronized void setReadOnly(final boolean onOff) {
342 this.writesEnabled = !onOff;
343 this.readOnly = onOff;
344 }
345
346 boolean isReadOnly() {
347 return this.readOnly;
348 }
349
350 boolean isFlushRequested() {
351 return this.flushRequested;
352 }
353
354 static final long HEAP_SIZE = ClassSize.align(
355 ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
356 }
357
358 final WriteState writestate = new WriteState();
359
360 long memstoreFlushSize;
361 final long timestampSlop;
362 final long rowProcessorTimeout;
363 private volatile long lastFlushTime;
364 final RegionServerServices rsServices;
365 private RegionServerAccounting rsAccounting;
366 private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
367 private long flushCheckInterval;
368 private long blockingMemStoreSize;
369 final long threadWakeFrequency;
370
371 final ReentrantReadWriteLock lock =
372 new ReentrantReadWriteLock();
373
374
375 private final ReentrantReadWriteLock updatesLock =
376 new ReentrantReadWriteLock();
377 private boolean splitRequest;
378 private byte[] explicitSplitPoint = null;
379
380 private final MultiVersionConsistencyControl mvcc =
381 new MultiVersionConsistencyControl();
382
383
384 private RegionCoprocessorHost coprocessorHost;
385
386 private HTableDescriptor htableDescriptor = null;
387 private RegionSplitPolicy splitPolicy;
388
389 private final MetricsRegion metricsRegion;
390 private final MetricsRegionWrapperImpl metricsRegionWrapper;
391 private final boolean deferredLogSyncDisabled;
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
/**
 * Builds the region from a table directory by wrapping the arguments in a new
 * {@link HRegionFileSystem} and delegating to the {@link HRegionFileSystem}-based constructor.
 *
 * @param tableDir directory of the table this region belongs to
 * @param log write-ahead log the region uses
 * @param fs file system holding the region
 * @param confParam base configuration; must not be a {@link CompoundConfiguration}
 * @param regionInfo descriptor of the region
 * @param htd table descriptor; must not be null
 * @param rsServices hosting region server services, may be null
 * @deprecated use the constructor that takes an {@link HRegionFileSystem}
 */
@Deprecated
public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
    final Configuration confParam, final HRegionInfo regionInfo,
    final HTableDescriptor htd, final RegionServerServices rsServices) {
  this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo), log, confParam, htd,
      rsServices);
}
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439 public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
440 final HTableDescriptor htd, final RegionServerServices rsServices) {
441 if (htd == null) {
442 throw new IllegalArgumentException("Need table descriptor");
443 }
444
445 if (confParam instanceof CompoundConfiguration) {
446 throw new IllegalArgumentException("Need original base configuration");
447 }
448
449 this.comparator = fs.getRegionInfo().getComparator();
450 this.log = log;
451 this.fs = fs;
452
453
454 this.baseConf = confParam;
455 this.conf = new CompoundConfiguration()
456 .add(confParam)
457 .addStringMap(htd.getConfiguration())
458 .addWritableMap(htd.getValues());
459 this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
460 DEFAULT_CACHE_FLUSH_INTERVAL);
461 this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
462 DEFAULT_ROWLOCK_WAIT_DURATION);
463
464 this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
465 this.htableDescriptor = htd;
466 this.rsServices = rsServices;
467 this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
468 setHTableSpecificConf();
469 this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
470
471 this.busyWaitDuration = conf.getLong(
472 "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
473 this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
474 if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
475 throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
476 + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
477 + maxBusyWaitMultiplier + "). Their product should be positive");
478 }
479 this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
480 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
481
482
483
484
485
486
487
488 this.timestampSlop = conf.getLong(
489 "hbase.hregion.keyvalue.timestamp.slop.millisecs",
490 HConstants.LATEST_TIMESTAMP);
491
492
493
494
495
496 this.rowProcessorTimeout = conf.getLong(
497 "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
498
499 this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
500 1 * 1000) <= 0;
501
502 if (rsServices != null) {
503 this.rsAccounting = this.rsServices.getRegionServerAccounting();
504
505
506 this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
507 this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
508 this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
509 } else {
510 this.metricsRegionWrapper = null;
511 this.metricsRegion = null;
512 }
513 if (LOG.isDebugEnabled()) {
514
515 LOG.debug("Instantiated " + this);
516 }
517 }
518
519 void setHTableSpecificConf() {
520 if (this.htableDescriptor == null) return;
521 long flushSize = this.htableDescriptor.getMemStoreFlushSize();
522
523 if (flushSize <= 0) {
524 flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
525 HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
526 }
527 this.memstoreFlushSize = flushSize;
528 this.blockingMemStoreSize = this.memstoreFlushSize *
529 conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
530 }
531
532
533
534
535
536
537
538
539
540 @Deprecated
541 public long initialize() throws IOException {
542 return initialize(null);
543 }
544
545
546
547
548
549
550
551
552 private long initialize(final CancelableProgressable reporter) throws IOException {
553 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
554 long nextSeqId = -1;
555 try {
556 nextSeqId = initializeRegionInternals(reporter, status);
557 return nextSeqId;
558 } finally {
559
560
561 if (nextSeqId == -1) {
562 status
563 .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
564 }
565 }
566 }
567
568 private long initializeRegionInternals(final CancelableProgressable reporter,
569 final MonitoredTask status) throws IOException, UnsupportedEncodingException {
570 if (coprocessorHost != null) {
571 status.setStatus("Running coprocessor pre-open hook");
572 coprocessorHost.preOpen();
573 }
574
575
576 status.setStatus("Writing region info on filesystem");
577 fs.checkRegionInfoOnFilesystem();
578
579
580 status.setStatus("Cleaning up temporary data from old regions");
581 fs.cleanupTempDir();
582
583
584 status.setStatus("Initializing all the Stores");
585 long maxSeqId = initializeRegionStores(reporter, status);
586
587 status.setStatus("Cleaning up detritus from prior splits");
588
589
590
591 fs.cleanupAnySplitDetritus();
592 fs.cleanupMergesDir();
593
594 this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
595 this.writestate.flushRequested = false;
596 this.writestate.compacting = 0;
597
598
599 this.splitPolicy = RegionSplitPolicy.create(this, conf);
600
601 this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
602
603
604 long nextSeqid = maxSeqId + 1;
605 LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
606
607
608 this.closing.set(false);
609 this.closed.set(false);
610
611 if (coprocessorHost != null) {
612 status.setStatus("Running coprocessor post-open hooks");
613 coprocessorHost.postOpen();
614 }
615
616 status.markComplete("Region opened successfully");
617 return nextSeqid;
618 }
619
620 private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
621 throws IOException, UnsupportedEncodingException {
622
623
624
625
626
627
628 Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
629 long maxSeqId = -1;
630
631 long maxMemstoreTS = -1;
632
633 if (!htableDescriptor.getFamilies().isEmpty()) {
634
635 ThreadPoolExecutor storeOpenerThreadPool =
636 getStoreOpenAndCloseThreadPool(
637 "StoreOpenerThread-" + this.getRegionNameAsString());
638 CompletionService<HStore> completionService =
639 new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
640
641
642 for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
643 status.setStatus("Instantiating store for column family " + family);
644 completionService.submit(new Callable<HStore>() {
645 public HStore call() throws IOException {
646 return instantiateHStore(family);
647 }
648 });
649 }
650 try {
651 for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
652 Future<HStore> future = completionService.take();
653 HStore store = future.get();
654
655 this.stores.put(store.getColumnFamilyName().getBytes(), store);
656
657 long storeSeqIdForReplay = store.getMaxSequenceId(false);
658 maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
659 storeSeqIdForReplay);
660 if (this.minSeqIdForLogReplay == -1 || storeSeqIdForReplay < this.minSeqIdForLogReplay) {
661 this.minSeqIdForLogReplay = storeSeqIdForReplay;
662 }
663
664 long storeSeqIdForAssignment = store.getMaxSequenceId(true);
665 if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
666 maxSeqId = storeSeqIdForAssignment;
667 }
668 long maxStoreMemstoreTS = store.getMaxMemstoreTS();
669 if (maxStoreMemstoreTS > maxMemstoreTS) {
670 maxMemstoreTS = maxStoreMemstoreTS;
671 }
672 }
673 } catch (InterruptedException e) {
674 throw new IOException(e);
675 } catch (ExecutionException e) {
676 throw new IOException(e.getCause());
677 } finally {
678 storeOpenerThreadPool.shutdownNow();
679 }
680 }
681 mvcc.initialize(maxMemstoreTS + 1);
682
683 maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
684 this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
685 return maxSeqId;
686 }
687
688
689
690
691 public boolean hasReferences() {
692 for (Store store : this.stores.values()) {
693 if (store.hasReferences()) return true;
694 }
695 return false;
696 }
697
698
699
700
701
702
703 public HDFSBlocksDistribution getHDFSBlocksDistribution() {
704 HDFSBlocksDistribution hdfsBlocksDistribution =
705 new HDFSBlocksDistribution();
706 synchronized (this.stores) {
707 for (Store store : this.stores.values()) {
708 for (StoreFile sf : store.getStorefiles()) {
709 HDFSBlocksDistribution storeFileBlocksDistribution =
710 sf.getHDFSBlockDistribution();
711 hdfsBlocksDistribution.add(storeFileBlocksDistribution);
712 }
713 }
714 }
715 return hdfsBlocksDistribution;
716 }
717
718
719
720
721
722
723
724
725
726 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
727 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
728 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
729 Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf), tableDescriptor.getName());
730 FileSystem fs = tablePath.getFileSystem(conf);
731
732 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
733 for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
734 Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
735 if (storeFiles == null) continue;
736
737 for (StoreFileInfo storeFileInfo : storeFiles) {
738 hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
739 }
740 }
741 return hdfsBlocksDistribution;
742 }
743
744 public AtomicLong getMemstoreSize() {
745 return memstoreSize;
746 }
747
748
749
750
751
752
753
754 public long addAndGetGlobalMemstoreSize(long memStoreSize) {
755 if (this.rsAccounting != null) {
756 rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
757 }
758 return this.memstoreSize.getAndAdd(memStoreSize);
759 }
760
761
762 public HRegionInfo getRegionInfo() {
763 return this.fs.getRegionInfo();
764 }
765
766
767
768
769
770 RegionServerServices getRegionServerServices() {
771 return this.rsServices;
772 }
773
774
775 long getReadRequestsCount() {
776 return this.readRequestsCount.get();
777 }
778
779
780 long getWriteRequestsCount() {
781 return this.writeRequestsCount.get();
782 }
783
784 MetricsRegion getMetrics() {
785 return metricsRegion;
786 }
787
788
789 public boolean isClosed() {
790 return this.closed.get();
791 }
792
793
794
795
796 public boolean isClosing() {
797 return this.closing.get();
798 }
799
800
801
802
803
804 public void setRecovering(boolean newState) {
805 this.getRegionInfo().setRecovering(newState);
806 }
807
808
809
810
811 public boolean isRecovering() {
812 return this.getRegionInfo().isRecovering();
813 }
814
815
816 public boolean isAvailable() {
817 return !isClosed() && !isClosing();
818 }
819
820
821 public boolean isSplittable() {
822 return isAvailable() && !hasReferences();
823 }
824
825
826
827
828 public boolean isMergeable() {
829 if (!isAvailable()) {
830 LOG.debug("Region " + this.getRegionNameAsString()
831 + " is not mergeable because it is closing or closed");
832 return false;
833 }
834 if (hasReferences()) {
835 LOG.debug("Region " + this.getRegionNameAsString()
836 + " is not mergeable because it has references");
837 return false;
838 }
839
840 return true;
841 }
842
843 public boolean areWritesEnabled() {
844 synchronized(this.writestate) {
845 return this.writestate.writesEnabled;
846 }
847 }
848
849 public MultiVersionConsistencyControl getMVCC() {
850 return mvcc;
851 }
852
853 public boolean isLoadingCfsOnDemandDefault() {
854 return this.isLoadingCfsOnDemandDefault;
855 }
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870 public Map<byte[], List<StoreFile>> close() throws IOException {
871 return close(false);
872 }
873
874 private final Object closeLock = new Object();
875
876
877 public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
878 "hbase.regionserver.optionalcacheflushinterval";
879
880 public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896 public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
897
898
899 MonitoredTask status = TaskMonitor.get().createStatus(
900 "Closing region " + this +
901 (abort ? " due to abort" : ""));
902
903 status.setStatus("Waiting for close lock");
904 try {
905 synchronized (closeLock) {
906 return doClose(abort, status);
907 }
908 } finally {
909 status.cleanup();
910 }
911 }
912
913 private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
914 throws IOException {
915 if (isClosed()) {
916 LOG.warn("Region " + this + " already closed");
917 return null;
918 }
919
920 if (coprocessorHost != null) {
921 status.setStatus("Running coprocessor pre-close hooks");
922 this.coprocessorHost.preClose(abort);
923 }
924
925 status.setStatus("Disabling compacts and flushes for region");
926 boolean wasFlushing = false;
927 synchronized (writestate) {
928
929
930 writestate.writesEnabled = false;
931 wasFlushing = writestate.flushing;
932 LOG.debug("Closing " + this + ": disabling compactions & flushes");
933 waitForFlushesAndCompactions();
934 }
935
936
937
938 if (!abort && !wasFlushing && worthPreFlushing()) {
939 status.setStatus("Pre-flushing region before close");
940 LOG.info("Running close preflush of " + this.getRegionNameAsString());
941 internalFlushcache(status);
942 }
943
944 this.closing.set(true);
945 status.setStatus("Disabling writes for close");
946
947 lock.writeLock().lock();
948 try {
949 if (this.isClosed()) {
950 status.abort("Already got closed by another process");
951
952 return null;
953 }
954 LOG.debug("Updates disabled for region " + this);
955
956 if (!abort) {
957 internalFlushcache(status);
958 }
959
960 Map<byte[], List<StoreFile>> result =
961 new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
962 if (!stores.isEmpty()) {
963
964 ThreadPoolExecutor storeCloserThreadPool =
965 getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
966 CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
967 new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
968
969
970 for (final Store store : stores.values()) {
971 completionService
972 .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
973 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
974 return new Pair<byte[], Collection<StoreFile>>(
975 store.getFamily().getName(), store.close());
976 }
977 });
978 }
979 try {
980 for (int i = 0; i < stores.size(); i++) {
981 Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
982 Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
983 List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
984 if (familyFiles == null) {
985 familyFiles = new ArrayList<StoreFile>();
986 result.put(storeFiles.getFirst(), familyFiles);
987 }
988 familyFiles.addAll(storeFiles.getSecond());
989 }
990 } catch (InterruptedException e) {
991 throw new IOException(e);
992 } catch (ExecutionException e) {
993 throw new IOException(e.getCause());
994 } finally {
995 storeCloserThreadPool.shutdownNow();
996 }
997 }
998 this.closed.set(true);
999
1000 if (coprocessorHost != null) {
1001 status.setStatus("Running coprocessor post-close hooks");
1002 this.coprocessorHost.postClose(abort);
1003 }
1004 if ( this.metricsRegion != null) {
1005 this.metricsRegion.close();
1006 }
1007 if ( this.metricsRegionWrapper != null) {
1008 Closeables.closeQuietly(this.metricsRegionWrapper);
1009 }
1010 status.markComplete("Closed");
1011 LOG.info("Closed " + this);
1012 return result;
1013 } finally {
1014 lock.writeLock().unlock();
1015 }
1016 }
1017
1018
1019
1020
1021
1022
1023 public void waitForFlushesAndCompactions() {
1024 synchronized (writestate) {
1025 while (writestate.compacting > 0 || writestate.flushing) {
1026 LOG.debug("waiting for " + writestate.compacting + " compactions"
1027 + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1028 try {
1029 writestate.wait();
1030 } catch (InterruptedException iex) {
1031
1032 Thread.currentThread().interrupt();
1033 }
1034 }
1035 }
1036 }
1037
1038 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1039 final String threadNamePrefix) {
1040 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1041 int maxThreads = Math.min(numStores,
1042 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1043 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1044 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1045 }
1046
1047 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1048 final String threadNamePrefix) {
1049 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1050 int maxThreads = Math.max(1,
1051 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1052 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1053 / numStores);
1054 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1055 }
1056
1057 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1058 final String threadNamePrefix) {
1059 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1060 new ThreadFactory() {
1061 private int count = 1;
1062
1063 public Thread newThread(Runnable r) {
1064 return new Thread(r, threadNamePrefix + "-" + count++);
1065 }
1066 });
1067 }
1068
1069
1070
1071
1072 private boolean worthPreFlushing() {
1073 return this.memstoreSize.get() >
1074 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1075 }
1076
1077
1078
1079
1080
1081
1082 public byte [] getStartKey() {
1083 return this.getRegionInfo().getStartKey();
1084 }
1085
1086
1087 public byte [] getEndKey() {
1088 return this.getRegionInfo().getEndKey();
1089 }
1090
1091
1092 public long getRegionId() {
1093 return this.getRegionInfo().getRegionId();
1094 }
1095
1096
1097 public byte [] getRegionName() {
1098 return this.getRegionInfo().getRegionName();
1099 }
1100
1101
1102 public String getRegionNameAsString() {
1103 return this.getRegionInfo().getRegionNameAsString();
1104 }
1105
1106
1107 public HTableDescriptor getTableDesc() {
1108 return this.htableDescriptor;
1109 }
1110
1111
1112 public HLog getLog() {
1113 return this.log;
1114 }
1115
1116
1117
1118
1119
1120
1121
1122
1123 Configuration getBaseConf() {
1124 return this.baseConf;
1125 }
1126
1127
1128 public FileSystem getFilesystem() {
1129 return fs.getFileSystem();
1130 }
1131
1132
1133 public HRegionFileSystem getRegionFileSystem() {
1134 return this.fs;
1135 }
1136
1137
1138 public long getLastFlushTime() {
1139 return this.lastFlushTime;
1140 }
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150 public long getLargestHStoreSize() {
1151 long size = 0;
1152 for (Store h : stores.values()) {
1153 long storeSize = h.getSize();
1154 if (storeSize > size) {
1155 size = storeSize;
1156 }
1157 }
1158 return size;
1159 }
1160
1161
1162
1163
1164
1165 protected void doRegionCompactionPrep() throws IOException {
1166 }
1167
1168 void triggerMajorCompaction() {
1169 for (Store h : stores.values()) {
1170 h.triggerMajorCompaction();
1171 }
1172 }
1173
1174
1175
1176
1177
1178
1179
1180
1181 public void compactStores(final boolean majorCompaction)
1182 throws IOException {
1183 if (majorCompaction) {
1184 this.triggerMajorCompaction();
1185 }
1186 compactStores();
1187 }
1188
1189
1190
1191
1192
1193
1194
1195 public void compactStores() throws IOException {
1196 for (Store s : getStores().values()) {
1197 CompactionContext compaction = s.requestCompaction();
1198 if (compaction != null) {
1199 compact(compaction, s);
1200 }
1201 }
1202 }
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219 public boolean compact(CompactionContext compaction, Store store) throws IOException {
1220 assert compaction != null && compaction.hasSelection();
1221 assert !compaction.getRequest().getFiles().isEmpty();
1222 if (this.closing.get() || this.closed.get()) {
1223 LOG.debug("Skipping compaction on " + this + " because closing/closed");
1224 store.cancelRequestedCompaction(compaction);
1225 return false;
1226 }
1227 MonitoredTask status = null;
1228 boolean didPerformCompaction = false;
1229
1230 lock.readLock().lock();
1231 try {
1232 status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1233 if (this.closed.get()) {
1234 String msg = "Skipping compaction on " + this + " because closed";
1235 LOG.debug(msg);
1236 status.abort(msg);
1237 return false;
1238 }
1239 boolean wasStateSet = false;
1240 try {
1241 synchronized (writestate) {
1242 if (writestate.writesEnabled) {
1243 wasStateSet = true;
1244 ++writestate.compacting;
1245 } else {
1246 String msg = "NOT compacting region " + this + ". Writes disabled.";
1247 LOG.info(msg);
1248 status.abort(msg);
1249 return false;
1250 }
1251 }
1252 LOG.info("Starting compaction on " + store + " in region " + this
1253 + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1254 doRegionCompactionPrep();
1255 try {
1256 status.setStatus("Compacting store " + store);
1257 didPerformCompaction = true;
1258 store.compact(compaction);
1259 } catch (InterruptedIOException iioe) {
1260 String msg = "compaction interrupted";
1261 LOG.info(msg, iioe);
1262 status.abort(msg);
1263 return false;
1264 }
1265 } finally {
1266 if (wasStateSet) {
1267 synchronized (writestate) {
1268 --writestate.compacting;
1269 if (writestate.compacting <= 0) {
1270 writestate.notifyAll();
1271 }
1272 }
1273 }
1274 }
1275 status.markComplete("Compaction complete");
1276 return true;
1277 } finally {
1278 try {
1279 if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
1280 if (status != null) status.cleanup();
1281 } finally {
1282 lock.readLock().unlock();
1283 }
1284 }
1285 }
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307 public boolean flushcache() throws IOException {
1308
1309 if (this.closing.get()) {
1310 LOG.debug("Skipping flush on " + this + " because closing");
1311 return false;
1312 }
1313 MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1314 status.setStatus("Acquiring readlock on region");
1315
1316 lock.readLock().lock();
1317 try {
1318 if (this.closed.get()) {
1319 LOG.debug("Skipping flush on " + this + " because closed");
1320 status.abort("Skipped: closed");
1321 return false;
1322 }
1323 if (coprocessorHost != null) {
1324 status.setStatus("Running coprocessor pre-flush hooks");
1325 coprocessorHost.preFlush();
1326 }
1327 if (numPutsWithoutWAL.get() > 0) {
1328 numPutsWithoutWAL.set(0);
1329 dataInMemoryWithoutWAL.set(0);
1330 }
1331 synchronized (writestate) {
1332 if (!writestate.flushing && writestate.writesEnabled) {
1333 this.writestate.flushing = true;
1334 } else {
1335 if (LOG.isDebugEnabled()) {
1336 LOG.debug("NOT flushing memstore for region " + this
1337 + ", flushing=" + writestate.flushing + ", writesEnabled="
1338 + writestate.writesEnabled);
1339 }
1340 status.abort("Not flushing since "
1341 + (writestate.flushing ? "already flushing"
1342 : "writes not enabled"));
1343 return false;
1344 }
1345 }
1346 try {
1347 boolean result = internalFlushcache(status);
1348
1349 if (coprocessorHost != null) {
1350 status.setStatus("Running post-flush coprocessor hooks");
1351 coprocessorHost.postFlush();
1352 }
1353
1354 status.markComplete("Flush successful");
1355 return result;
1356 } finally {
1357 synchronized (writestate) {
1358 writestate.flushing = false;
1359 this.writestate.flushRequested = false;
1360 writestate.notifyAll();
1361 }
1362 }
1363 } finally {
1364 lock.readLock().unlock();
1365 status.cleanup();
1366 }
1367 }
1368
1369
1370
1371
1372 boolean shouldFlush() {
1373 if (flushCheckInterval <= 0) {
1374 return false;
1375 }
1376 long now = EnvironmentEdgeManager.currentTimeMillis();
1377
1378 if ((now - getLastFlushTime() < flushCheckInterval)) {
1379 return false;
1380 }
1381
1382
1383 for (Store s : this.getStores().values()) {
1384 if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1385
1386 return true;
1387 }
1388 }
1389 return false;
1390 }
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427 protected boolean internalFlushcache(MonitoredTask status)
1428 throws IOException {
1429 return internalFlushcache(this.log, -1, status);
1430 }
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
/**
 * Flushes the memstore of every store in this region.
 *
 * <p>Sequence of steps, as performed below: take the updates write lock, obtain a flush
 * sequence id (from {@code wal.startCacheFlush} or, when {@code wal} is null, from
 * {@code myseqid}), create and prepare one flush context per store, release the lock,
 * optionally sync the WAL, wait on mvcc, then flush and commit every store context.
 *
 * @param wal log to coordinate the flush with; may be null, in which case {@code myseqid}
 *        is used as the flush sequence id and no WAL calls are made
 * @param myseqid sequence id used only when {@code wal} is null
 * @param status task used to report progress
 * @return true if at least one store's {@code commit} reported that compaction is needed;
 *         false otherwise, including when the memstore size is zero or negative or when
 *         {@code wal.startCacheFlush} returns null
 * @throws IOException if the region server reports it is aborted
 * @throws DroppedSnapshotException wrapping any Throwable raised while flushing or
 *         committing the store contexts
 */
1441 protected boolean internalFlushcache(
1442 final HLog wal, final long myseqid, MonitoredTask status)
1443 throws IOException {
1444 if (this.rsServices != null && this.rsServices.isAborted()) {
1445
1446 throw new IOException("Aborting flush because server is abortted...");
1447 }
1448 final long startTime = EnvironmentEdgeManager.currentTimeMillis();
1449
1450
// lastFlushTime is updated even when the early return below means nothing is flushed.
1451 this.lastFlushTime = startTime;
1452
1453 if (this.memstoreSize.get() <= 0) {
1454 return false;
1455 }
1456 if (LOG.isDebugEnabled()) {
1457 LOG.debug("Started memstore flush for " + this +
1458 ", current region memstore size " +
1459 StringUtils.humanReadableInt(this.memstoreSize.get()) +
1460 ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1461 }
1462
1463
1464
1465
1466
1467
1468
1469
1470 MultiVersionConsistencyControl.WriteEntry w = null;
1471
1472
1473
1474
1475 status.setStatus("Obtaining lock to block concurrent updates");
1476
// NOTE(review): the write lock is taken a few statements before the try/finally that
// releases it; an exception thrown in between would leave it held — confirm acceptable.
1477 this.updatesLock.writeLock().lock();
// Size captured under the lock; this is the amount later subtracted from the memstore size.
1478 long flushsize = this.memstoreSize.get();
1479 status.setStatus("Preparing to flush by snapshotting stores");
1480 List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1481 long flushSeqId = -1L;
1482 try {
1483
// Begin an mvcc write entry and advance the memstore to it; waitForRead(w) is called on
// this entry after the lock is released.
1484 w = mvcc.beginMemstoreInsert();
1485 mvcc.advanceMemstore(w);
1486
1487 if (wal != null) {
1488 Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1489 if (startSeqId == null) {
1490 status.setStatus("Flush will not be started for [" + this.getRegionInfo().getEncodedName()
1491 + "] - WAL is going away");
1492 return false;
1493 }
1494 flushSeqId = startSeqId.longValue();
1495 } else {
1496 flushSeqId = myseqid;
1497 }
1498
1499 for (Store s : stores.values()) {
1500 storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1501 }
1502
1503
// All contexts are created first, then all are prepared, still under the write lock.
1504 for (StoreFlushContext flush : storeFlushCtxs) {
1505 flush.prepare();
1506 }
1507 } finally {
1508 this.updatesLock.writeLock().unlock();
1509 }
1510 String s = "Finished snapshotting " + this +
1511 ", commencing wait for mvcc, flushsize=" + flushsize;
1512 status.setStatus(s);
1513 LOG.debug(s);
1514
1515
1516
// With deferred log sync enabled, sync the WAL explicitly before continuing.
1517 if (wal != null && isDeferredLogSyncEnabled()) {
1518 wal.sync();
1519 }
1520
1521
1522
1523
1524
1525
1526 mvcc.waitForRead(w);
1527
1528 status.setStatus("Flushing stores");
1529 LOG.debug("Finished snapshotting, commencing flushing stores");
1530
1531
1532
1533
1534
1535 boolean compactionRequested = false;
1536 try {
1537
1538
1539
1540
1541
// Two passes: flush every store context first, then commit every one.
1542 for (StoreFlushContext flush : storeFlushCtxs) {
1543 flush.flushCache(status);
1544 }
1545
1546
1547
1548 for (StoreFlushContext flush : storeFlushCtxs) {
1549 boolean needsCompaction = flush.commit(status);
1550 if (needsCompaction) {
1551 compactionRequested = true;
1552 }
1553 }
1554 storeFlushCtxs.clear();
1555
1556
// Subtract the size captured at snapshot time from the memstore size accounting.
1557 this.addAndGetGlobalMemstoreSize(-flushsize);
1558 } catch (Throwable t) {
1559
1560
1561
1562
1563
1564
// Any failure while flushing/committing: tell the WAL the flush was aborted and surface
// the problem as a DroppedSnapshotException carrying the original cause.
1565 if (wal != null) {
1566 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1567 }
1568 DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1569 Bytes.toStringBinary(getRegionName()));
1570 dse.initCause(t);
1571 status.abort("Flush failed: " + StringUtils.stringifyException(t));
1572 throw dse;
1573 }
1574
1575
1576 if (wal != null) {
1577 wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1578 }
1579
1580
// completeSequenceId is only recorded when region server services are present.
1581 if (this.rsServices != null) {
1582 completeSequenceId = flushSeqId;
1583 }
1584
1585
1586
// Wake threads waiting on this region's monitor (checkResources() waits on it while
// updates are blocked).
1587 synchronized (this) {
1588 notifyAll();
1589 }
1590
1591 long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1592 long memstoresize = this.memstoreSize.get();
1593 String msg = "Finished memstore flush of ~" +
1594 StringUtils.humanReadableInt(flushsize) + "/" + flushsize +
1595 ", currentsize=" +
1596 StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
1597 " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1598 ", compaction requested=" + compactionRequested +
1599 ((wal == null)? "; wal=null": "");
1600 LOG.info(msg);
1601 status.setStatus(msg);
// Recorded as (elapsed time divided by 1000, flushed size).
1602 this.recentFlushes.add(new Pair<Long,Long>(time/1000, flushsize));
1603
1604 return compactionRequested;
1605 }
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619 Result getClosestRowBefore(final byte [] row)
1620 throws IOException{
1621 return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1622 }
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634 public Result getClosestRowBefore(final byte [] row, final byte [] family)
1635 throws IOException {
1636 if (coprocessorHost != null) {
1637 Result result = new Result();
1638 if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
1639 return result;
1640 }
1641 }
1642
1643
1644 checkRow(row, "getClosestRowBefore");
1645 startRegionOperation(Operation.GET);
1646 this.readRequestsCount.increment();
1647 try {
1648 Store store = getStore(family);
1649
1650 KeyValue key = store.getRowKeyAtOrBefore(row);
1651 Result result = null;
1652 if (key != null) {
1653 Get get = new Get(key.getRow());
1654 get.addFamily(family);
1655 result = get(get);
1656 }
1657 if (coprocessorHost != null) {
1658 coprocessorHost.postGetClosestRowBefore(row, family, result);
1659 }
1660 return result;
1661 } finally {
1662 closeRegionOperation();
1663 }
1664 }
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676 public RegionScanner getScanner(Scan scan) throws IOException {
1677 return getScanner(scan, null);
1678 }
1679
1680 void prepareScanner(Scan scan) throws IOException {
1681 if(!scan.hasFamilies()) {
1682
1683 for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
1684 scan.addFamily(family);
1685 }
1686 }
1687 }
1688
1689 protected RegionScanner getScanner(Scan scan,
1690 List<KeyValueScanner> additionalScanners) throws IOException {
1691 startRegionOperation(Operation.SCAN);
1692 try {
1693
1694 prepareScanner(scan);
1695 if(scan.hasFamilies()) {
1696 for(byte [] family : scan.getFamilyMap().keySet()) {
1697 checkFamily(family);
1698 }
1699 }
1700 return instantiateRegionScanner(scan, additionalScanners);
1701 } finally {
1702 closeRegionOperation();
1703 }
1704 }
1705
1706 protected RegionScanner instantiateRegionScanner(Scan scan,
1707 List<KeyValueScanner> additionalScanners) throws IOException {
1708 return new RegionScannerImpl(scan, additionalScanners, this);
1709 }
1710
1711
1712
1713
1714 void prepareDelete(Delete delete) throws IOException {
1715
1716 if(delete.getFamilyMap().isEmpty()){
1717 for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
1718
1719 delete.deleteFamily(family, delete.getTimeStamp());
1720 }
1721 } else {
1722 for(byte [] family : delete.getFamilyMap().keySet()) {
1723 if(family == null) {
1724 throw new NoSuchColumnFamilyException("Empty family is invalid");
1725 }
1726 checkFamily(family);
1727 }
1728 }
1729 }
1730
1731
1732
1733
1734
1735
1736
1737
1738 public void delete(Delete delete)
1739 throws IOException {
1740 checkReadOnly();
1741 checkResources();
1742 startRegionOperation(Operation.DELETE);
1743 this.writeRequestsCount.increment();
1744 try {
1745 delete.getRow();
1746
1747 doBatchMutate(delete, null);
1748 } finally {
1749 closeRegionOperation();
1750 }
1751 }
1752
1753
1754
1755
// Fixed row key, the bytes of "ForUnitTestsOnly", used as the row of the Delete that
// delete(NavigableMap, UUID, Durability) builds around a caller-supplied family map.
1756 private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
1757
1758
1759
1760
1761
1762
1763
1764 void delete(NavigableMap<byte[], List<? extends Cell>> familyMap, UUID clusterId,
1765 Durability durability) throws IOException {
1766 Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
1767 delete.setFamilyMap(familyMap);
1768 delete.setClusterId(clusterId);
1769 delete.setDurability(durability);
1770 doBatchMutate(delete, null);
1771 }
1772
1773
1774
1775
1776
1777
1778
1779
1780 void prepareDeleteTimestamps(Map<byte[], List<? extends Cell>> familyMap, byte[] byteNow)
1781 throws IOException {
1782 for (Map.Entry<byte[], List<? extends Cell>> e : familyMap.entrySet()) {
1783
1784 byte[] family = e.getKey();
1785 List<? extends Cell> cells = e.getValue();
1786 Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
1787
1788 for (Cell cell: cells) {
1789 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1790
1791
1792 if (kv.isLatestTimestamp() && kv.isDeleteType()) {
1793 byte[] qual = kv.getQualifier();
1794 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
1795
1796 Integer count = kvCount.get(qual);
1797 if (count == null) {
1798 kvCount.put(qual, 1);
1799 } else {
1800 kvCount.put(qual, count + 1);
1801 }
1802 count = kvCount.get(qual);
1803
1804 Get get = new Get(kv.getRow());
1805 get.setMaxVersions(count);
1806 get.addColumn(family, qual);
1807
1808 List<KeyValue> result = get(get, false);
1809
1810 if (result.size() < count) {
1811
1812 kv.updateLatestStamp(byteNow);
1813 continue;
1814 }
1815 if (result.size() > count) {
1816 throw new RuntimeException("Unexpected size: " + result.size());
1817 }
1818 KeyValue getkv = result.get(count - 1);
1819 Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
1820 getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
1821 } else {
1822 kv.updateLatestStamp(byteNow);
1823 }
1824 }
1825 }
1826 }
1827
1828
1829
1830
1831
1832 public void put(Put put)
1833 throws IOException {
1834 checkReadOnly();
1835
1836
1837
1838
1839
1840 checkResources();
1841 startRegionOperation(Operation.PUT);
1842 this.writeRequestsCount.increment();
1843 try {
1844
1845 doBatchMutate(put, null);
1846 } finally {
1847 closeRegionOperation();
1848 }
1849 }
1850
1851
1852
1853
1854
1855
1856 private static class BatchOperationInProgress<T> {
1857 T[] operations;
1858 int nextIndexToProcess = 0;
1859 OperationStatus[] retCodeDetails;
1860 WALEdit[] walEditsFromCoprocessors;
1861
1862 public BatchOperationInProgress(T[] operations) {
1863 this.operations = operations;
1864 this.retCodeDetails = new OperationStatus[operations.length];
1865 this.walEditsFromCoprocessors = new WALEdit[operations.length];
1866 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
1867 }
1868
1869 public boolean isDone() {
1870 return nextIndexToProcess == operations.length;
1871 }
1872 }
1873
1874
1875
1876
1877
1878 public OperationStatus[] put(Put[] puts) throws IOException {
1879 @SuppressWarnings("unchecked")
1880 Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
1881
1882 for (int i = 0; i < puts.length; i++) {
1883 putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
1884 }
1885 return batchMutate(putsAndLocks);
1886 }
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897 public OperationStatus[] batchMutate(
1898 Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
1899 return batchMutate(mutationsAndLocks, false);
1900 }
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911 OperationStatus[] batchMutate(Pair<Mutation, Integer>[] mutationsAndLocks, boolean isReplay)
1912 throws IOException {
1913 BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
1914 new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
1915
1916 boolean initialized = false;
1917
1918 while (!batchOp.isDone()) {
1919 if (!isReplay) {
1920 checkReadOnly();
1921 }
1922 checkResources();
1923
1924 long newSize;
1925 startRegionOperation();
1926
1927 try {
1928 if (!initialized) {
1929 if (!isReplay) {
1930 this.writeRequestsCount.increment();
1931 doPreMutationHook(batchOp);
1932 }
1933 initialized = true;
1934 }
1935 long addedSize = doMiniBatchMutation(batchOp, isReplay);
1936 newSize = this.addAndGetGlobalMemstoreSize(addedSize);
1937 } finally {
1938 closeRegionOperation();
1939 }
1940 if (isFlushSize(newSize)) {
1941 requestFlush();
1942 }
1943 }
1944 return batchOp.retCodeDetails;
1945 }
1946
1947
1948 private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
1949 throws IOException {
1950
1951 WALEdit walEdit = new WALEdit();
1952 if (coprocessorHost != null) {
1953 for (int i = 0 ; i < batchOp.operations.length; i++) {
1954 Pair<Mutation, Integer> nextPair = batchOp.operations[i];
1955 Mutation m = nextPair.getFirst();
1956 if (m instanceof Put) {
1957 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
1958
1959
1960 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
1961 }
1962 } else if (m instanceof Delete) {
1963 if (coprocessorHost.preDelete((Delete) m, walEdit, m.getDurability())) {
1964
1965
1966 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
1967 }
1968 } else {
1969
1970
1971
1972 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
1973 "Put/Delete mutations only supported in batchMutate() now");
1974 }
1975 if (!walEdit.isEmpty()) {
1976 batchOp.walEditsFromCoprocessors[i] = walEdit;
1977 walEdit = new WALEdit();
1978 }
1979 }
1980 }
1981 }
1982
/**
 * Processes one "mini-batch" of the batch: a contiguous run of operations starting at
 * {@code batchOp.nextIndexToProcess} and ending where a row lock could not be obtained (or
 * at the end of the batch).
 *
 * <p>Order of work, as performed below: validate each mutation and take its row lock;
 * stamp timestamps; take the updates read lock; begin an mvcc write entry; run the
 * coprocessor pre-batch hook; apply to the memstore; build and append the WAL edit without
 * syncing; release the updates lock and the row locks; sync the WAL; run the coprocessor
 * post-batch hook; complete the mvcc entry; run the per-mutation post hooks. The finally
 * block rolls the memstore back when the WAL sync did not succeed, releases whatever is
 * still held, updates metrics, marks unprocessed operations of a failed mini-batch as
 * FAILURE and advances {@code batchOp.nextIndexToProcess}.
 *
 * @param batchOp batch state; statuses and next index are updated in place
 * @param isInReplay true when mutations are being replayed; coprocessor hooks are skipped
 *        and unknown families of Puts are removed instead of rejected
 * @return the total of the sizes returned by {@code applyFamilyMapToMemstore}, or 0 when no
 *         operation was ready or the pre-batch hook returned true
 * @throws IOException propagated from any of the steps
 */
1983 @SuppressWarnings("unchecked")
1984 private long doMiniBatchMutation(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
1985 boolean isInReplay) throws IOException {
1986
1987
// The four variables below record whether all Puts (resp. Deletes) of the mini-batch share
// the same family set; they are only written, never read, within this method.
1988 boolean putsCfSetConsistent = true;
1989
1990 Set<byte[]> putsCfSet = null;
1991
1992 boolean deletesCfSetConsistent = true;
1993
1994 Set<byte[]> deletesCfSet = null;
1995
1996 WALEdit walEdit = new WALEdit(isInReplay);
1997 MultiVersionConsistencyControl.WriteEntry w = null;
1998 long txid = 0;
1999 boolean walSyncSuccessful = false;
2000 boolean locked = false;
2001
2002
// Row locks obtained by this call (caller-provided lock ids are not added); set to null
// once released on the success path so the finally block does not release them twice.
2003 List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2004
2005 Map<byte[], List<? extends Cell>>[] familyMaps = new Map[batchOp.operations.length];
2006
2007 int firstIndex = batchOp.nextIndexToProcess;
2008 int lastIndexExclusive = firstIndex;
2009 boolean success = false;
2010 int noOfPuts = 0, noOfDeletes = 0;
2011 try {
2012
2013
2014
2015
// Phase 1: walk forward from firstIndex, validating each mutation and trying to take its
// row lock; lastIndexExclusive ends up one past the last operation in this mini-batch.
2016 int numReadyToWrite = 0;
2017 long now = EnvironmentEdgeManager.currentTimeMillis();
2018 while (lastIndexExclusive < batchOp.operations.length) {
2019 Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
2020 Mutation mutation = nextPair.getFirst();
2021 boolean isPutMutation = mutation instanceof Put;
2022 Integer providedLockId = nextPair.getSecond();
2023
2024 Map<byte[], List<? extends Cell>> familyMap = mutation.getFamilyMap();
2025
2026 familyMaps[lastIndexExclusive] = familyMap;
2027
2028
// Operations that already carry a result (for example set by doPreMutationHook) are
// included in the range but otherwise skipped.
2029 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2030 != OperationStatusCode.NOT_RUN) {
2031 lastIndexExclusive++;
2032 continue;
2033 }
2034
2035 try {
2036 if (isPutMutation) {
2037
2038 if (isInReplay) {
2039 removeNonExistentColumnFamilyForReplay(familyMap);
2040 } else {
2041 checkFamilies(familyMap.keySet());
2042 }
2043 checkTimestamps(mutation.getFamilyMap(), now);
2044 } else {
2045 prepareDelete((Delete) mutation);
2046 }
2047 } catch (NoSuchColumnFamilyException nscf) {
// Validation failures are recorded per operation; the rest of the batch continues.
2048 LOG.warn("No such column family in batch mutation", nscf);
2049 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2050 OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2051 lastIndexExclusive++;
2052 continue;
2053 } catch (FailedSanityCheckException fsce) {
2054 LOG.warn("Batch Mutation did not pass sanity check", fsce);
2055 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2056 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2057 lastIndexExclusive++;
2058 continue;
2059 }
2060
2061
// shouldBlock is true only while nothing is ready yet; it is passed to getLock as its
// third argument (presumably "wait for the lock" — confirm against getLock).
2062 boolean shouldBlock = numReadyToWrite == 0;
2063 Integer acquiredLockId = null;
2064 try {
2065 acquiredLockId = getLock(providedLockId, mutation.getRow(),
2066 shouldBlock);
2067 } catch (IOException ioe) {
2068 LOG.warn("Failed getting lock in batch put, row="
2069 + Bytes.toStringBinary(mutation.getRow()), ioe);
2070 }
// No lock (null id, or getLock threw): this operation ends the mini-batch and is left
// for a later call.
2071 if (acquiredLockId == null) {
2072
2073 assert !shouldBlock : "Should never fail to get lock when blocking";
2074 break;
2075 }
2076 if (providedLockId == null) {
2077 acquiredLocks.add(acquiredLockId);
2078 }
2079 lastIndexExclusive++;
2080 numReadyToWrite++;
2081
2082 if (isPutMutation) {
2083
2084
2085
2086 if (putsCfSet == null) {
2087 putsCfSet = mutation.getFamilyMap().keySet();
2088 } else {
2089 putsCfSetConsistent = putsCfSetConsistent
2090 && mutation.getFamilyMap().keySet().equals(putsCfSet);
2091 }
2092 } else {
2093 if (deletesCfSet == null) {
2094 deletesCfSet = mutation.getFamilyMap().keySet();
2095 } else {
2096 deletesCfSetConsistent = deletesCfSetConsistent
2097 && mutation.getFamilyMap().keySet().equals(deletesCfSet);
2098 }
2099 }
2100 }
2101
2102
2103
// Re-read the clock after lock acquisition; this value stamps the cells and is passed to
// the WAL append below.
2104 now = EnvironmentEdgeManager.currentTimeMillis();
2105 byte[] byteNow = Bytes.toBytes(now);
2106
2107
// Nothing to write in this range; the finally block still advances nextIndexToProcess.
2108 if (numReadyToWrite <= 0) return 0L;
2109
2110
2111
2112
2113
2114
// Phase 2: stamp timestamps on every operation that is still NOT_RUN.
2115 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2116
2117 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2118 != OperationStatusCode.NOT_RUN) continue;
2119
2120 Mutation mutation = batchOp.operations[i].getFirst();
2121 if (mutation instanceof Put) {
2122 updateKVTimestamps(familyMaps[i].values(), byteNow);
2123 noOfPuts++;
2124 } else {
2125 prepareDeleteTimestamps(familyMaps[i], byteNow);
2126 noOfDeletes++;
2127 }
2128 }
2129
// Take the updates read lock; internalFlushcache takes the write side while snapshotting.
2130 lock(this.updatesLock.readLock(), numReadyToWrite);
2131 locked = true;
2132
2133
2134
2135
2136
2137 w = mvcc.beginMemstoreInsert();
2138
2139
// Coprocessor pre-batch hook (skipped in replay); a true return ends this mini-batch with
// a result of 0 and leaves cleanup to the finally block.
2140 if (!isInReplay && coprocessorHost != null) {
2141 MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
2142 new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
2143 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2144 if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2145 }
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
// Phase 3: apply every still-NOT_RUN operation to the memstore under write entry w.
2156 long addedSize = 0;
2157 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2158 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2159 != OperationStatusCode.NOT_RUN) {
2160 continue;
2161 }
2162 addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
2163 }
2164
2165
2166
2167
// Phase 4: mark operations SUCCESS and build one WAL edit for the mini-batch. The
// durability used for the sync is the one with the highest ordinal among the operations.
2168 Durability durability = Durability.USE_DEFAULT;
2169 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2170
2171 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2172 != OperationStatusCode.NOT_RUN) {
2173 continue;
2174 }
2175 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2176
2177 Mutation m = batchOp.operations[i].getFirst();
2178 Durability tmpDur = m.getDurability();
2179 if (tmpDur.ordinal() > durability.ordinal()) {
2180 durability = tmpDur;
2181 }
// SKIP_WAL operations contribute nothing to the WAL edit; Puts are recorded through
// recordPutWithoutWal.
2182 if (tmpDur == Durability.SKIP_WAL) {
2183 if (m instanceof Put) {
2184 recordPutWithoutWal(m.getFamilyMap());
2185 }
2186 continue;
2187 }
2188
2189
// Key values contributed by coprocessor pre-hooks go in before the mutation's own cells.
2190 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2191 if (fromCP != null) {
2192 for (KeyValue kv : fromCP.getKeyValues()) {
2193 walEdit.add(kv);
2194 }
2195 }
2196 addFamilyMapToWALEdit(familyMaps[i], walEdit);
2197 }
2198
2199
2200
2201
// Phase 5: append to the WAL without syncing; the cluster id of the first operation in
// the range is used for the whole edit.
2202 Mutation first = batchOp.operations[firstIndex].getFirst();
2203 txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
2204 walEdit, first.getClusterId(), now, this.htableDescriptor);
2205
2206
2207
2208
// Phase 6: release the updates lock and the row locks before the WAL sync.
2209 if (locked) {
2210 this.updatesLock.readLock().unlock();
2211 locked = false;
2212 }
2213 if (acquiredLocks != null) {
2214 for (Integer toRelease : acquiredLocks) {
2215 releaseRowLock(toRelease);
2216 }
2217 acquiredLocks = null;
2218 }
2219
2220
2221
// Phase 7: sync (or defer, depending on durability) only when something was appended.
2222 if (walEdit.size() > 0) {
2223 syncOrDefer(txid, durability);
2224 }
2225 walSyncSuccessful = true;
2226
2227 if (!isInReplay && coprocessorHost != null) {
2228 MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
2229 new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
2230 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2231 coprocessorHost.postBatchMutate(miniBatchOp);
2232 }
2233
2234
2235
2236
// Phase 8: complete the mvcc write entry; w is nulled so the finally block skips it.
2237 if (w != null) {
2238 mvcc.completeMemstoreInsert(w);
2239 w = null;
2240 }
2241
2242
2243
2244
2245
// Phase 9: per-mutation coprocessor post hooks, only for operations marked SUCCESS.
2246 if (!isInReplay && coprocessorHost != null) {
2247 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2248
2249 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2250 != OperationStatusCode.SUCCESS) {
2251 continue;
2252 }
2253 Mutation m = batchOp.operations[i].getFirst();
2254 if (m instanceof Put) {
2255 coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2256 } else {
2257 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2258 }
2259 }
2260 }
2261
2262 success = true;
2263 return addedSize;
2264 } finally {
2265
2266
// Reached on every exit path, including the two early "return 0L" statements above.
// Memstore changes are rolled back whenever the WAL sync flag was never set.
2267 if (!walSyncSuccessful) {
2268 rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
2269 }
2270 if (w != null) mvcc.completeMemstoreInsert(w);
2271
2272 if (locked) {
2273 this.updatesLock.readLock().unlock();
2274 }
2275
2276 if (acquiredLocks != null) {
2277 for (Integer toRelease : acquiredLocks) {
2278 releaseRowLock(toRelease);
2279 }
2280 }
2281
2282
2283
2284
2285
2286
2287
// Metrics are updated once per mini-batch that contained at least one Put (resp. Delete),
// regardless of success.
2288 if (noOfPuts > 0) {
2289
2290 if (this.metricsRegion != null) {
2291 this.metricsRegion.updatePut();
2292 }
2293 }
2294 if (noOfDeletes > 0) {
2295
2296 if (this.metricsRegion != null) {
2297 this.metricsRegion.updateDelete();
2298 }
2299 }
// On failure, operations in the range that never got a result are marked FAILURE.
2300 if (!success) {
2301 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2302 if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2303 batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2304 }
2305 }
2306 }
// Always advance, so the caller's loop makes progress.
2307 batchOp.nextIndexToProcess = lastIndexExclusive;
2308 }
2309 }
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327 public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2328 CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2329 boolean writeToWAL)
2330 throws IOException{
2331 checkReadOnly();
2332
2333
2334 checkResources();
2335 boolean isPut = w instanceof Put;
2336 if (!isPut && !(w instanceof Delete))
2337 throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException("Action must be Put or Delete");
2338 Row r = (Row)w;
2339 if (!Bytes.equals(row, r.getRow())) {
2340 throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException("Action's getRow must match the passed row");
2341 }
2342
2343 startRegionOperation();
2344 try {
2345 Get get = new Get(row);
2346 checkFamily(family);
2347 get.addColumn(family, qualifier);
2348
2349
2350 Integer lid = getLock(null, get.getRow(), true);
2351
2352 mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
2353 List<KeyValue> result = null;
2354 try {
2355 result = get(get, false);
2356
2357 boolean valueIsNull = comparator.getValue() == null ||
2358 comparator.getValue().length == 0;
2359 boolean matches = false;
2360 if (result.size() == 0 && valueIsNull) {
2361 matches = true;
2362 } else if (result.size() > 0 && result.get(0).getValue().length == 0 &&
2363 valueIsNull) {
2364 matches = true;
2365 } else if (result.size() == 1 && !valueIsNull) {
2366 KeyValue kv = result.get(0);
2367 int compareResult = comparator.compareTo(kv.getBuffer(),
2368 kv.getValueOffset(), kv.getValueLength());
2369 switch (compareOp) {
2370 case LESS:
2371 matches = compareResult <= 0;
2372 break;
2373 case LESS_OR_EQUAL:
2374 matches = compareResult < 0;
2375 break;
2376 case EQUAL:
2377 matches = compareResult == 0;
2378 break;
2379 case NOT_EQUAL:
2380 matches = compareResult != 0;
2381 break;
2382 case GREATER_OR_EQUAL:
2383 matches = compareResult > 0;
2384 break;
2385 case GREATER:
2386 matches = compareResult >= 0;
2387 break;
2388 default:
2389 throw new RuntimeException("Unknown Compare op " + compareOp.name());
2390 }
2391 }
2392
2393 if (matches) {
2394
2395
2396 doBatchMutate((Mutation)w, lid);
2397 this.checkAndMutateChecksPassed.increment();
2398 return true;
2399 }
2400 this.checkAndMutateChecksFailed.increment();
2401 return false;
2402 } finally {
2403 releaseRowLock(lid);
2404 }
2405 } finally {
2406 closeRegionOperation();
2407 }
2408 }
2409
2410 @SuppressWarnings("unchecked")
2411 private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
2412 org.apache.hadoop.hbase.exceptions.DoNotRetryIOException {
2413 Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] {
2414 new Pair<Mutation, Integer>(mutation, lid)
2415 };
2416 OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
2417 if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2418 throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2419 } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2420 throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2421 }
2422 }
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437 public void addRegionToSnapshot(SnapshotDescription desc,
2438 ForeignExceptionSnare exnSnare) throws IOException {
2439
2440
2441 Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
2442 Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
2443
2444
2445 LOG.debug("Storing region-info for snapshot.");
2446 HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
2447 this.fs.getFileSystem(), snapshotDir, getRegionInfo());
2448
2449
2450 LOG.debug("Creating references for hfiles");
2451
2452
2453
2454
2455
2456
2457 for (Store store : stores.values()) {
2458
2459 Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
2460 List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
2461 if (LOG.isDebugEnabled()) {
2462 LOG.debug("Adding snapshot references for " + storeFiles + " hfiles");
2463 }
2464
2465
2466 int sz = storeFiles.size();
2467 for (int i = 0; i < sz; i++) {
2468 if (exnSnare != null) {
2469 exnSnare.rethrowException();
2470 }
2471 Path file = storeFiles.get(i).getPath();
2472
2473
2474
2475
2476 LOG.debug("Creating reference for file (" + (i+1) + "/" + sz + ") : " + file);
2477 Path referenceFile = new Path(dstStoreDir, file.getName());
2478 boolean success = fs.getFileSystem().createNewFile(referenceFile);
2479 if (!success) {
2480 throw new IOException("Failed to create reference file:" + referenceFile);
2481 }
2482 }
2483 }
2484 }
2485
2486
2487
2488
2489
2490 void updateKVTimestamps(final Iterable<List<? extends Cell>> keyLists, final byte[] now) {
2491 for (List<? extends Cell> cells: keyLists) {
2492 if (cells == null) continue;
2493 for (Cell cell : cells) {
2494 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2495 kv.updateLatestStamp(now);
2496 }
2497 }
2498 }
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509 private void checkResources()
2510 throws RegionTooBusyException, InterruptedIOException {
2511
2512
2513 if (this.getRegionInfo().isMetaRegion()) return;
2514
2515 boolean blocked = false;
2516 long startTime = 0;
2517 while (this.memstoreSize.get() > this.blockingMemStoreSize) {
2518 requestFlush();
2519 if (!blocked) {
2520 startTime = EnvironmentEdgeManager.currentTimeMillis();
2521 LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
2522 "' on region " + Bytes.toStringBinary(getRegionName()) +
2523 ": memstore size " +
2524 StringUtils.humanReadableInt(this.memstoreSize.get()) +
2525 " is >= than blocking " +
2526 StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
2527 }
2528 long now = EnvironmentEdgeManager.currentTimeMillis();
2529 long timeToWait = startTime + busyWaitDuration - now;
2530 if (timeToWait <= 0L) {
2531 final long totalTime = now - startTime;
2532 this.updatesBlockedMs.add(totalTime);
2533 LOG.info("Failed to unblock updates for region " + this + " '"
2534 + Thread.currentThread().getName() + "' in " + totalTime
2535 + "ms. The region is still busy.");
2536 throw new RegionTooBusyException("region is flushing");
2537 }
2538 blocked = true;
2539 synchronized(this) {
2540 try {
2541 wait(Math.min(timeToWait, threadWakeFrequency));
2542 } catch (InterruptedException ie) {
2543 final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
2544 if (totalTime > 0) {
2545 this.updatesBlockedMs.add(totalTime);
2546 }
2547 LOG.info("Interrupted while waiting to unblock updates for region "
2548 + this + " '" + Thread.currentThread().getName() + "'");
2549 InterruptedIOException iie = new InterruptedIOException();
2550 iie.initCause(ie);
2551 throw iie;
2552 }
2553 }
2554 }
2555 if (blocked) {
2556
2557 final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
2558 if(totalTime > 0 ){
2559 this.updatesBlockedMs.add(totalTime);
2560 }
2561 LOG.info("Unblocking updates for region " + this + " '"
2562 + Thread.currentThread().getName() + "'");
2563 }
2564 }
2565
2566
2567
2568
2569 protected void checkReadOnly() throws IOException {
2570 if (this.writestate.isReadOnly()) {
2571 throw new IOException("region is read only");
2572 }
2573 }
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583 private void put(final byte [] row, byte [] family, List<? extends Cell> edits)
2584 throws IOException {
2585 NavigableMap<byte[], List<? extends Cell>> familyMap;
2586 familyMap = new TreeMap<byte[], List<? extends Cell>>(Bytes.BYTES_COMPARATOR);
2587
2588 familyMap.put(family, edits);
2589 Put p = new Put(row);
2590 p.setFamilyMap(familyMap);
2591 p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
2592 doBatchMutate(p, null);
2593 }
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607 private long applyFamilyMapToMemstore(Map<byte[], List<? extends Cell>> familyMap,
2608 MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
2609 long size = 0;
2610 boolean freemvcc = false;
2611
2612 try {
2613 if (localizedWriteEntry == null) {
2614 localizedWriteEntry = mvcc.beginMemstoreInsert();
2615 freemvcc = true;
2616 }
2617
2618 for (Map.Entry<byte[], List<? extends Cell>> e : familyMap.entrySet()) {
2619 byte[] family = e.getKey();
2620 List<? extends Cell> cells = e.getValue();
2621
2622 Store store = getStore(family);
2623 for (Cell cell: cells) {
2624 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2625 kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
2626 size += store.add(kv);
2627 }
2628 }
2629 } finally {
2630 if (freemvcc) {
2631 mvcc.completeMemstoreInsert(localizedWriteEntry);
2632 }
2633 }
2634
2635 return size;
2636 }
2637
2638
2639
2640
2641
2642
2643 private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
2644 Map<byte[], List<? extends Cell>>[] familyMaps,
2645 int start, int end) {
2646 int kvsRolledback = 0;
2647 for (int i = start; i < end; i++) {
2648
2649 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2650 != OperationStatusCode.SUCCESS) {
2651 continue;
2652 }
2653
2654
2655 Map<byte[], List<? extends Cell>> familyMap = familyMaps[i];
2656 for (Map.Entry<byte[], List<? extends Cell>> e : familyMap.entrySet()) {
2657 byte[] family = e.getKey();
2658 List<? extends Cell> cells = e.getValue();
2659
2660
2661
2662
2663 Store store = getStore(family);
2664 for (Cell cell: cells) {
2665 store.rollback(KeyValueUtil.ensureKeyValue(cell));
2666 kvsRolledback++;
2667 }
2668 }
2669 }
2670 LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
2671 " keyvalues from start:" + start + " to end:" + end);
2672 }
2673
2674
2675
2676
2677
2678 void checkFamilies(Collection<byte[]> families)
2679 throws NoSuchColumnFamilyException {
2680 for (byte[] family : families) {
2681 checkFamily(family);
2682 }
2683 }
2684
2685
2686
2687
2688
2689 private void removeNonExistentColumnFamilyForReplay(
2690 final Map<byte[], List<? extends Cell>> familyMap) {
2691 List<byte[]> nonExistentList = null;
2692 for (byte[] family : familyMap.keySet()) {
2693 if (!this.htableDescriptor.hasFamily(family)) {
2694 if (nonExistentList == null) {
2695 nonExistentList = new ArrayList<byte[]>();
2696 }
2697 nonExistentList.add(family);
2698 }
2699 }
2700 if (nonExistentList != null) {
2701 for (byte[] family : nonExistentList) {
2702
2703 LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
2704 familyMap.remove(family);
2705 }
2706 }
2707 }
2708
2709 void checkTimestamps(final Map<byte[], List<? extends Cell>> familyMap,
2710 long now) throws FailedSanityCheckException {
2711 if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
2712 return;
2713 }
2714 long maxTs = now + timestampSlop;
2715 for (List<? extends Cell> kvs : familyMap.values()) {
2716 for (Cell cell : kvs) {
2717
2718 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2719 if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
2720 throw new FailedSanityCheckException("Timestamp for KV out of range "
2721 + cell + " (too.new=" + timestampSlop + ")");
2722 }
2723 }
2724 }
2725 }
2726
2727
2728
2729
2730
2731
2732
2733 private void addFamilyMapToWALEdit(Map<byte[], List<? extends Cell>> familyMap,
2734 WALEdit walEdit) {
2735 for (List<? extends Cell> edits : familyMap.values()) {
2736 for (Cell cell : edits) {
2737 walEdit.add(KeyValueUtil.ensureKeyValue(cell));
2738 }
2739 }
2740 }
2741
2742 private void requestFlush() {
2743 if (this.rsServices == null) {
2744 return;
2745 }
2746 synchronized (writestate) {
2747 if (this.writestate.isFlushRequested()) {
2748 return;
2749 }
2750 writestate.flushRequested = true;
2751 }
2752
2753 this.rsServices.getFlushRequester().requestFlush(this);
2754 if (LOG.isDebugEnabled()) {
2755 LOG.debug("Flush requested on " + this);
2756 }
2757 }
2758
2759
2760
2761
2762
2763 private boolean isFlushSize(final long size) {
2764 return size > this.memstoreFlushSize;
2765 }
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803 protected long replayRecoveredEditsIfAny(final Path regiondir,
2804 Map<byte[], Long> maxSeqIdInStores,
2805 final CancelableProgressable reporter, final MonitoredTask status)
2806 throws UnsupportedEncodingException, IOException {
2807 long minSeqIdForTheRegion = -1;
2808 for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
2809 if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
2810 minSeqIdForTheRegion = maxSeqIdInStore;
2811 }
2812 }
2813 long seqid = minSeqIdForTheRegion;
2814
2815 FileSystem fs = this.fs.getFileSystem();
2816 NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
2817 if (files == null || files.isEmpty()) return seqid;
2818
2819 for (Path edits: files) {
2820 if (edits == null || !fs.exists(edits)) {
2821 LOG.warn("Null or non-existent edits file: " + edits);
2822 continue;
2823 }
2824 if (isZeroLengthThenDelete(fs, edits)) continue;
2825
2826 long maxSeqId = Long.MAX_VALUE;
2827 String fileName = edits.getName();
2828 maxSeqId = Math.abs(Long.parseLong(fileName));
2829 if (maxSeqId <= minSeqIdForTheRegion) {
2830 String msg = "Maximum sequenceid for this log is " + maxSeqId
2831 + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
2832 + ", skipped the whole file, path=" + edits;
2833 LOG.debug(msg);
2834 continue;
2835 }
2836
2837 try {
2838 seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
2839 } catch (IOException e) {
2840 boolean skipErrors = conf.getBoolean(
2841 HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
2842 conf.getBoolean(
2843 "hbase.skip.errors",
2844 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
2845 if (conf.get("hbase.skip.errors") != null) {
2846 LOG.warn(
2847 "The property 'hbase.skip.errors' has been deprecated. Please use " +
2848 HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
2849 }
2850 if (skipErrors) {
2851 Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
2852 LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
2853 + "=true so continuing. Renamed " + edits +
2854 " as " + p, e);
2855 } else {
2856 throw e;
2857 }
2858 }
2859
2860
2861 if (this.rsAccounting != null) {
2862 this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
2863 }
2864 }
2865 if (seqid > minSeqIdForTheRegion) {
2866
2867 internalFlushcache(null, seqid, status);
2868 }
2869
2870 for (Path file: files) {
2871 if (!fs.delete(file, false)) {
2872 LOG.error("Failed delete of " + file);
2873 } else {
2874 LOG.debug("Deleted recovered.edits file=" + file);
2875 }
2876 }
2877 return seqid;
2878 }
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889 private long replayRecoveredEdits(final Path edits,
2890 Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
2891 throws IOException {
2892 String msg = "Replaying edits from " + edits;
2893 LOG.info(msg);
2894 MonitoredTask status = TaskMonitor.get().createStatus(msg);
2895 FileSystem fs = this.fs.getFileSystem();
2896
2897 status.setStatus("Opening logs");
2898 HLog.Reader reader = null;
2899 try {
2900 reader = HLogFactory.createReader(fs, edits, conf);
2901 long currentEditSeqId = -1;
2902 long firstSeqIdInLog = -1;
2903 long skippedEdits = 0;
2904 long editsCount = 0;
2905 long intervalEdits = 0;
2906 HLog.Entry entry;
2907 Store store = null;
2908 boolean reported_once = false;
2909
2910 try {
2911
2912 int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
2913 2000);
2914
2915 int period = this.conf.getInt("hbase.hstore.report.period",
2916 this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout",
2917 180000) / 2);
2918 long lastReport = EnvironmentEdgeManager.currentTimeMillis();
2919
2920 while ((entry = reader.next()) != null) {
2921 HLogKey key = entry.getKey();
2922 WALEdit val = entry.getEdit();
2923
2924 if (reporter != null) {
2925 intervalEdits += val.size();
2926 if (intervalEdits >= interval) {
2927
2928 intervalEdits = 0;
2929 long cur = EnvironmentEdgeManager.currentTimeMillis();
2930 if (lastReport + period <= cur) {
2931 status.setStatus("Replaying edits..." +
2932 " skipped=" + skippedEdits +
2933 " edits=" + editsCount);
2934
2935 if(!reporter.progress()) {
2936 msg = "Progressable reporter failed, stopping replay";
2937 LOG.warn(msg);
2938 status.abort(msg);
2939 throw new IOException(msg);
2940 }
2941 reported_once = true;
2942 lastReport = cur;
2943 }
2944 }
2945 }
2946
2947
2948
2949 if (coprocessorHost != null) {
2950 status.setStatus("Running pre-WAL-restore hook in coprocessors");
2951 if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
2952
2953 continue;
2954 }
2955 }
2956
2957 if (firstSeqIdInLog == -1) {
2958 firstSeqIdInLog = key.getLogSeqNum();
2959 }
2960 boolean flush = false;
2961 for (KeyValue kv: val.getKeyValues()) {
2962
2963
2964 if (kv.matchingFamily(WALEdit.METAFAMILY) ||
2965 !Bytes.equals(key.getEncodedRegionName(),
2966 this.getRegionInfo().getEncodedNameAsBytes())) {
2967
2968 CompactionDescriptor compaction = WALEdit.getCompaction(kv);
2969 if (compaction != null) {
2970
2971 completeCompactionMarker(compaction);
2972 }
2973
2974 skippedEdits++;
2975 continue;
2976 }
2977
2978 if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
2979 store = this.stores.get(kv.getFamily());
2980 }
2981 if (store == null) {
2982
2983
2984 LOG.warn("No family for " + kv);
2985 skippedEdits++;
2986 continue;
2987 }
2988
2989 if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
2990 .getName())) {
2991 skippedEdits++;
2992 continue;
2993 }
2994 currentEditSeqId = key.getLogSeqNum();
2995
2996
2997
2998 flush = restoreEdit(store, kv);
2999 editsCount++;
3000 }
3001 if (flush) internalFlushcache(null, currentEditSeqId, status);
3002
3003 if (coprocessorHost != null) {
3004 coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3005 }
3006 }
3007 } catch (EOFException eof) {
3008 Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3009 msg = "Encountered EOF. Most likely due to Master failure during " +
3010 "log spliting, so we have this data in another edit. " +
3011 "Continuing, but renaming " + edits + " as " + p;
3012 LOG.warn(msg, eof);
3013 status.abort(msg);
3014 } catch (IOException ioe) {
3015
3016
3017 if (ioe.getCause() instanceof ParseException) {
3018 Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3019 msg = "File corruption encountered! " +
3020 "Continuing, but renaming " + edits + " as " + p;
3021 LOG.warn(msg, ioe);
3022 status.setStatus(msg);
3023 } else {
3024 status.abort(StringUtils.stringifyException(ioe));
3025
3026
3027 throw ioe;
3028 }
3029 }
3030 if (reporter != null && !reported_once) {
3031 reporter.progress();
3032 }
3033 msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3034 ", firstSequenceidInLog=" + firstSeqIdInLog +
3035 ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
3036 status.markComplete(msg);
3037 LOG.debug(msg);
3038 return currentEditSeqId;
3039 } finally {
3040 status.cleanup();
3041 if (reader != null) {
3042 reader.close();
3043 }
3044 }
3045 }
3046
3047
3048
3049
3050
3051
3052
3053
3054 void completeCompactionMarker(CompactionDescriptor compaction)
3055 throws IOException {
3056 Store store = this.getStore(compaction.getFamilyName().toByteArray());
3057 if (store == null) {
3058 LOG.warn("Found Compaction WAL edit for deleted family:" +
3059 Bytes.toString(compaction.getFamilyName().toByteArray()));
3060 return;
3061 }
3062 store.completeCompactionMarker(compaction);
3063 }
3064
3065
3066
3067
3068
3069
3070
3071 protected boolean restoreEdit(final Store s, final KeyValue kv) {
3072 long kvSize = s.add(kv);
3073 if (this.rsAccounting != null) {
3074 rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3075 }
3076 return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3077 }
3078
3079
3080
3081
3082
3083
3084
3085 private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3086 throws IOException {
3087 FileStatus stat = fs.getFileStatus(p);
3088 if (stat.getLen() > 0) return false;
3089 LOG.warn("File " + p + " is zero-length, deleting.");
3090 fs.delete(p, false);
3091 return true;
3092 }
3093
3094 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3095 return new HStore(this, family, this.conf);
3096 }
3097
3098
3099
3100
3101
3102
3103
3104
3105 public Store getStore(final byte[] column) {
3106 return this.stores.get(column);
3107 }
3108
3109 public Map<byte[], Store> getStores() {
3110 return this.stores;
3111 }
3112
3113
3114
3115
3116
3117
3118
3119
3120 public List<String> getStoreFileList(final byte [][] columns)
3121 throws IllegalArgumentException {
3122 List<String> storeFileNames = new ArrayList<String>();
3123 synchronized(closeLock) {
3124 for(byte[] column : columns) {
3125 Store store = this.stores.get(column);
3126 if (store == null) {
3127 throw new IllegalArgumentException("No column family : " +
3128 new String(column) + " available");
3129 }
3130 for (StoreFile storeFile: store.getStorefiles()) {
3131 storeFileNames.add(storeFile.getPath().toString());
3132 }
3133 }
3134 }
3135 return storeFileNames;
3136 }
3137
3138
3139
3140
3141
3142 void checkRow(final byte [] row, String op) throws IOException {
3143 if (!rowIsInRange(getRegionInfo(), row)) {
3144 throw new WrongRegionException("Requested row out of range for " +
3145 op + " on HRegion " + this + ", startKey='" +
3146 Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3147 Bytes.toStringBinary(getEndKey()) + "', row='" +
3148 Bytes.toStringBinary(row) + "'");
3149 }
3150 }
3151
3152
3153
3154
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166
3167
3168
3169
3170 public Integer obtainRowLock(final byte [] row) throws IOException {
3171 startRegionOperation();
3172 this.writeRequestsCount.increment();
3173 try {
3174 return internalObtainRowLock(row, true);
3175 } finally {
3176 closeRegionOperation();
3177 }
3178 }
3179
3180
3181
3182
3183
3184
3185
3186 private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
3187 throws IOException {
3188 checkRow(row, "row lock");
3189 startRegionOperation();
3190 try {
3191 HashedBytes rowKey = new HashedBytes(row);
3192 CountDownLatch rowLatch = new CountDownLatch(1);
3193
3194
3195 while (true) {
3196 CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
3197 if (existingLatch == null) {
3198 break;
3199 } else {
3200
3201 if (!waitForLock) {
3202 return null;
3203 }
3204 try {
3205 if (!existingLatch.await(this.rowLockWaitDuration,
3206 TimeUnit.MILLISECONDS)) {
3207 throw new IOException("Timed out on getting lock for row="
3208 + Bytes.toStringBinary(row));
3209 }
3210 } catch (InterruptedException ie) {
3211 LOG.warn("internalObtainRowLock interrupted for row=" + Bytes.toStringBinary(row));
3212 InterruptedIOException iie = new InterruptedIOException();
3213 iie.initCause(ie);
3214 throw iie;
3215 }
3216 }
3217 }
3218
3219
3220 while (true) {
3221 Integer lockId = lockIdGenerator.incrementAndGet();
3222 HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
3223 if (existingRowKey == null) {
3224 return lockId;
3225 } else {
3226
3227 lockIdGenerator.set(rand.nextInt());
3228 }
3229 }
3230 } finally {
3231 closeRegionOperation();
3232 }
3233 }
3234
3235
3236
3237
3238
3239 public void releaseRowLock(final Integer lockId) {
3240 if (lockId == null) return;
3241 HashedBytes rowKey = lockIds.remove(lockId);
3242 if (rowKey == null) {
3243 LOG.warn("Release unknown lockId: " + lockId);
3244 return;
3245 }
3246 CountDownLatch rowLatch = lockedRows.remove(rowKey);
3247 if (rowLatch == null) {
3248 LOG.error("Releases row not locked, lockId: " + lockId + " row: "
3249 + rowKey);
3250 return;
3251 }
3252 rowLatch.countDown();
3253 }
3254
3255
3256
3257
3258
3259
3260 boolean isRowLocked(final Integer lockId) {
3261 return lockIds.containsKey(lockId);
3262 }
3263
3264
3265
3266
3267
3268
3269
3270
3271
3272
3273 public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
3274 throws IOException {
3275 Integer lid = null;
3276 if (lockid == null) {
3277 lid = internalObtainRowLock(row, waitForLock);
3278 } else {
3279 if (!isRowLocked(lockid)) {
3280 throw new IOException("Invalid row lock");
3281 }
3282 lid = lockid;
3283 }
3284 return lid;
3285 }
3286
3287
3288
3289
3290
3291
3292
3293 private static boolean hasMultipleColumnFamilies(
3294 List<Pair<byte[], String>> familyPaths) {
3295 boolean multipleFamilies = false;
3296 byte[] family = null;
3297 for (Pair<byte[], String> pair : familyPaths) {
3298 byte[] fam = pair.getFirst();
3299 if (family == null) {
3300 family = fam;
3301 } else if (!Bytes.equals(family, fam)) {
3302 multipleFamilies = true;
3303 break;
3304 }
3305 }
3306 return multipleFamilies;
3307 }
3308
3309
3310 public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3311 boolean assignSeqId) throws IOException {
3312 return bulkLoadHFiles(familyPaths, assignSeqId, null);
3313 }
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326 public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
3327 BulkLoadListener bulkLoadListener) throws IOException {
3328 Preconditions.checkNotNull(familyPaths);
3329
3330 startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3331 try {
3332 this.writeRequestsCount.increment();
3333
3334
3335
3336
3337 List<IOException> ioes = new ArrayList<IOException>();
3338 List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3339 for (Pair<byte[], String> p : familyPaths) {
3340 byte[] familyName = p.getFirst();
3341 String path = p.getSecond();
3342
3343 Store store = getStore(familyName);
3344 if (store == null) {
3345 IOException ioe = new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException(
3346 "No such column family " + Bytes.toStringBinary(familyName));
3347 ioes.add(ioe);
3348 } else {
3349 try {
3350 store.assertBulkLoadHFileOk(new Path(path));
3351 } catch (WrongRegionException wre) {
3352
3353 failures.add(p);
3354 } catch (IOException ioe) {
3355
3356 ioes.add(ioe);
3357 }
3358 }
3359 }
3360
3361
3362 if (ioes.size() != 0) {
3363 IOException e = MultipleIOException.createIOException(ioes);
3364 LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3365 throw e;
3366 }
3367
3368
3369 if (failures.size() != 0) {
3370 StringBuilder list = new StringBuilder();
3371 for (Pair<byte[], String> p : failures) {
3372 list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3373 .append(p.getSecond());
3374 }
3375
3376 LOG.warn("There was a recoverable bulk load failure likely due to a" +
3377 " split. These (family, HFile) pairs were not loaded: " + list);
3378 return false;
3379 }
3380
3381 for (Pair<byte[], String> p : familyPaths) {
3382 byte[] familyName = p.getFirst();
3383 String path = p.getSecond();
3384 Store store = getStore(familyName);
3385 try {
3386 String finalPath = path;
3387 if(bulkLoadListener != null) {
3388 finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3389 }
3390 store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum() : -1);
3391 if(bulkLoadListener != null) {
3392 bulkLoadListener.doneBulkLoad(familyName, path);
3393 }
3394 } catch (IOException ioe) {
3395
3396
3397
3398
3399 LOG.error("There was a partial failure due to IO when attempting to" +
3400 " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
3401 if(bulkLoadListener != null) {
3402 try {
3403 bulkLoadListener.failedBulkLoad(familyName, path);
3404 } catch (Exception ex) {
3405 LOG.error("Error while calling failedBulkLoad for family "+
3406 Bytes.toString(familyName)+" with path "+path, ex);
3407 }
3408 }
3409 throw ioe;
3410 }
3411 }
3412 return true;
3413 } finally {
3414 closeBulkRegionOperation();
3415 }
3416 }
3417
3418 @Override
3419 public boolean equals(Object o) {
3420 if (!(o instanceof HRegion)) {
3421 return false;
3422 }
3423 return Bytes.equals(this.getRegionName(), ((HRegion) o).getRegionName());
3424 }
3425
3426 @Override
3427 public int hashCode() {
3428 return Bytes.hashCode(this.getRegionName());
3429 }
3430
3431 @Override
3432 public String toString() {
3433 return this.getRegionNameAsString();
3434 }
3435
3436
3437
3438
3439 class RegionScannerImpl implements RegionScanner {
3440
3441 KeyValueHeap storeHeap = null;
3442
3443
3444 KeyValueHeap joinedHeap = null;
3445
3446
3447
3448 private KeyValue joinedContinuationRow = null;
3449
3450 private final KeyValue KV_LIMIT = new KeyValue();
3451 private final byte [] stopRow;
3452 private Filter filter;
3453 private int batch;
3454 private int isScan;
3455 private boolean filterClosed = false;
3456 private long readPt;
3457 private long maxResultSize;
3458 private HRegion region;
3459
3460 public HRegionInfo getRegionInfo() {
3461 return region.getRegionInfo();
3462 }
3463
3464 RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
3465 throws IOException {
3466
3467 this.region = region;
3468 this.maxResultSize = scan.getMaxResultSize();
3469 if (scan.hasFilter()) {
3470 this.filter = new FilterWrapper(scan.getFilter());
3471 } else {
3472 this.filter = null;
3473 }
3474
3475 this.batch = scan.getBatch();
3476 if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
3477 this.stopRow = null;
3478 } else {
3479 this.stopRow = scan.getStopRow();
3480 }
3481
3482
3483 this.isScan = scan.isGetScan() ? -1 : 0;
3484
3485
3486
3487 IsolationLevel isolationLevel = scan.getIsolationLevel();
3488 synchronized(scannerReadPoints) {
3489 if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
3490
3491 this.readPt = Long.MAX_VALUE;
3492 MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
3493 } else {
3494 this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
3495 }
3496 scannerReadPoints.put(this, this.readPt);
3497 }
3498
3499
3500
3501 List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
3502 List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
3503 if (additionalScanners != null) {
3504 scanners.addAll(additionalScanners);
3505 }
3506
3507 for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
3508 scan.getFamilyMap().entrySet()) {
3509 Store store = stores.get(entry.getKey());
3510 KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
3511 if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
3512 || this.filter.isFamilyEssential(entry.getKey())) {
3513 scanners.add(scanner);
3514 } else {
3515 joinedScanners.add(scanner);
3516 }
3517 }
3518 this.storeHeap = new KeyValueHeap(scanners, comparator);
3519 if (!joinedScanners.isEmpty()) {
3520 this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
3521 }
3522 }
3523
3524 RegionScannerImpl(Scan scan, HRegion region) throws IOException {
3525 this(scan, null, region);
3526 }
3527
3528 @Override
3529 public long getMaxResultSize() {
3530 return maxResultSize;
3531 }
3532
3533 @Override
3534 public long getMvccReadPoint() {
3535 return this.readPt;
3536 }
3537
3538
3539
3540
3541
3542
3543 protected void resetFilters() throws IOException {
3544 if (filter != null) {
3545 filter.reset();
3546 }
3547 }
3548
3549 @Override
3550 public boolean next(List<KeyValue> outResults)
3551 throws IOException {
3552
3553 return next(outResults, batch);
3554 }
3555
3556 @Override
3557 public synchronized boolean next(List<KeyValue> outResults, int limit) throws IOException {
3558 if (this.filterClosed) {
3559 throw new UnknownScannerException("Scanner was closed (timed out?) " +
3560 "after we renewed it. Could be caused by a very slow scanner " +
3561 "or a lengthy garbage collection");
3562 }
3563 startRegionOperation(Operation.SCAN);
3564 readRequestsCount.increment();
3565 try {
3566
3567
3568 MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
3569
3570 return nextRaw(outResults, limit);
3571 } finally {
3572 closeRegionOperation();
3573 }
3574 }
3575
3576 @Override
3577 public boolean nextRaw(List<KeyValue> outResults)
3578 throws IOException {
3579 return nextRaw(outResults, batch);
3580 }
3581
3582 @Override
3583 public boolean nextRaw(List<KeyValue> outResults, int limit) throws IOException {
3584 boolean returnResult;
3585 if (outResults.isEmpty()) {
3586
3587
3588 returnResult = nextInternal(outResults, limit);
3589 } else {
3590 List<KeyValue> tmpList = new ArrayList<KeyValue>();
3591 returnResult = nextInternal(tmpList, limit);
3592 outResults.addAll(tmpList);
3593 }
3594 resetFilters();
3595 if (isFilterDone()) {
3596 return false;
3597 }
3598 if (region != null && region.metricsRegion != null) {
3599 long totalSize = 0;
3600 if (outResults != null) {
3601 for(KeyValue kv:outResults) {
3602 totalSize += kv.getLength();
3603 }
3604 }
3605 region.metricsRegion.updateScanNext(totalSize);
3606 }
3607 return returnResult;
3608 }
3609
3610
3611 private void populateFromJoinedHeap(List<KeyValue> results, int limit)
3612 throws IOException {
3613 assert joinedContinuationRow != null;
3614 KeyValue kv = populateResult(results, this.joinedHeap, limit,
3615 joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(),
3616 joinedContinuationRow.getRowLength());
3617 if (kv != KV_LIMIT) {
3618
3619 joinedContinuationRow = null;
3620 }
3621
3622
3623 Collections.sort(results, comparator);
3624 }
3625
3626
3627
3628
3629
3630
3631
3632
3633
3634
3635
3636 private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit,
3637 byte[] currentRow, int offset, short length) throws IOException {
3638 KeyValue nextKv;
3639 do {
3640 heap.next(results, limit - results.size());
3641 if (limit > 0 && results.size() == limit) {
3642 return KV_LIMIT;
3643 }
3644 nextKv = heap.peek();
3645 } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
3646
3647 return nextKv;
3648 }
3649
3650
3651
3652
3653 public synchronized boolean isFilterDone() throws IOException {
3654 return this.filter != null && this.filter.filterAllRemaining();
3655 }
3656
3657 private boolean nextInternal(List<KeyValue> results, int limit)
3658 throws IOException {
3659 if (!results.isEmpty()) {
3660 throw new IllegalArgumentException("First parameter should be an empty list");
3661 }
3662 RpcCallContext rpcCall = RpcServer.getCurrentCall();
3663
3664
3665
3666
3667
3668 while (true) {
3669 if (rpcCall != null) {
3670
3671
3672
3673
3674 rpcCall.throwExceptionIfCallerDisconnected();
3675 }
3676
3677
3678 KeyValue current = this.storeHeap.peek();
3679
3680 byte[] currentRow = null;
3681 int offset = 0;
3682 short length = 0;
3683 if (current != null) {
3684 currentRow = current.getBuffer();
3685 offset = current.getRowOffset();
3686 length = current.getRowLength();
3687 }
3688 boolean stopRow = isStopRow(currentRow, offset, length);
3689
3690
3691 if (joinedContinuationRow == null) {
3692
3693 if (stopRow) {
3694 if (filter != null && filter.hasFilterRow()) {
3695 filter.filterRow(results);
3696 }
3697 return false;
3698 }
3699
3700
3701
3702 if (filterRowKey(currentRow, offset, length)) {
3703 boolean moreRows = nextRow(currentRow, offset, length);
3704 if (!moreRows) return false;
3705 results.clear();
3706 continue;
3707 }
3708
3709 KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
3710 length);
3711
3712 if (nextKv == KV_LIMIT) {
3713 if (this.filter != null && filter.hasFilterRow()) {
3714 throw new IncompatibleFilterException(
3715 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
3716 }
3717 return true;
3718 }
3719
3720 stopRow = nextKv == null ||
3721 isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
3722
3723 final boolean isEmptyRow = results.isEmpty();
3724
3725
3726
3727 if (filter != null && filter.hasFilterRow()) {
3728 filter.filterRow(results);
3729 }
3730 if (isEmptyRow) {
3731 boolean moreRows = nextRow(currentRow, offset, length);
3732 if (!moreRows) return false;
3733 results.clear();
3734
3735
3736 if (!stopRow) continue;
3737 return false;
3738 }
3739
3740
3741
3742
3743
3744 if (this.joinedHeap != null) {
3745 KeyValue nextJoinedKv = joinedHeap.peek();
3746
3747 boolean mayHaveData =
3748 (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
3749 || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
3750 true, true)
3751 && joinedHeap.peek() != null
3752 && joinedHeap.peek().matchingRow(currentRow, offset, length));
3753 if (mayHaveData) {
3754 joinedContinuationRow = current;
3755 populateFromJoinedHeap(results, limit);
3756 }
3757 }
3758 } else {
3759
3760 populateFromJoinedHeap(results, limit);
3761 }
3762
3763
3764
3765 if (joinedContinuationRow != null) {
3766 return true;
3767 }
3768
3769
3770
3771
3772 if (results.isEmpty()) {
3773 boolean moreRows = nextRow(currentRow, offset, length);
3774 if (!moreRows) return false;
3775 if (!stopRow) continue;
3776 }
3777
3778
3779 return !stopRow;
3780 }
3781 }
3782
3783 private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
3784 return filter != null
3785 && filter.filterRowKey(row, offset, length);
3786 }
3787
3788 protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
3789 assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
3790 KeyValue next;
3791 while ((next = this.storeHeap.peek()) != null &&
3792 next.matchingRow(currentRow, offset, length)) {
3793 this.storeHeap.next(MOCKED_LIST);
3794 }
3795 resetFilters();
3796
3797 if (this.region.getCoprocessorHost() != null) {
3798 return this.region.getCoprocessorHost().postScannerFilterRow(this, currentRow);
3799 }
3800 return true;
3801 }
3802
3803 private boolean isStopRow(byte [] currentRow, int offset, short length) {
3804 return currentRow == null ||
3805 (stopRow != null &&
3806 comparator.compareRows(stopRow, 0, stopRow.length,
3807 currentRow, offset, length) <= isScan);
3808 }
3809
3810 @Override
3811 public synchronized void close() {
3812 if (storeHeap != null) {
3813 storeHeap.close();
3814 storeHeap = null;
3815 }
3816 if (joinedHeap != null) {
3817 joinedHeap.close();
3818 joinedHeap = null;
3819 }
3820
3821 scannerReadPoints.remove(this);
3822 this.filterClosed = true;
3823 }
3824
3825 KeyValueHeap getStoreHeapForTesting() {
3826 return storeHeap;
3827 }
3828
3829 @Override
3830 public synchronized boolean reseek(byte[] row) throws IOException {
3831 if (row == null) {
3832 throw new IllegalArgumentException("Row cannot be null.");
3833 }
3834 boolean result = false;
3835 startRegionOperation();
3836 try {
3837
3838 MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
3839 KeyValue kv = KeyValue.createFirstOnRow(row);
3840
3841 result = this.storeHeap.requestSeek(kv, true, true);
3842 if (this.joinedHeap != null) {
3843 result = this.joinedHeap.requestSeek(kv, true, true) || result;
3844 }
3845 } finally {
3846 closeRegionOperation();
3847 }
3848 return result;
3849 }
3850 }
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873 static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
3874 Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
3875 RegionServerServices rsServices) {
3876 try {
3877 @SuppressWarnings("unchecked")
3878 Class<? extends HRegion> regionClass =
3879 (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
3880
3881 Constructor<? extends HRegion> c =
3882 regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
3883 Configuration.class, HRegionInfo.class, HTableDescriptor.class,
3884 RegionServerServices.class);
3885
3886 return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
3887 } catch (Throwable e) {
3888
3889 throw new IllegalStateException("Could not instantiate a region instance.", e);
3890 }
3891 }
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
3912 final Configuration conf, final HTableDescriptor hTableDescriptor)
3913 throws IOException {
3914 return createHRegion(info, rootDir, conf, hTableDescriptor, null);
3915 }
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928 public static void closeHRegion(final HRegion r) throws IOException {
3929 if (r == null) return;
3930 r.close();
3931 if (r.getLog() == null) return;
3932 r.getLog().closeAndDelete();
3933 }
3934
3935
3936
3937
3938
3939
3940
3941
3942
3943
3944
3945
3946
3947
3948
3949
3950 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
3951 final Configuration conf,
3952 final HTableDescriptor hTableDescriptor,
3953 final HLog hlog,
3954 final boolean initialize)
3955 throws IOException {
3956 return createHRegion(info, rootDir, conf, hTableDescriptor,
3957 hlog, initialize, false);
3958 }
3959
3960
3961
3962
3963
3964
3965
3966
3967
3968
3969
3970
3971
3972
3973
3974
3975
3976 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
3977 final Configuration conf,
3978 final HTableDescriptor hTableDescriptor,
3979 final HLog hlog,
3980 final boolean initialize, final boolean ignoreHLog)
3981 throws IOException {
3982 LOG.info("creating HRegion " + info.getTableNameAsString()
3983 + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
3984 " Table name == " + info.getTableNameAsString());
3985
3986 Path tableDir = HTableDescriptor.getTableDir(rootDir, info.getTableName());
3987 FileSystem fs = FileSystem.get(conf);
3988 HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
3989 HLog effectiveHLog = hlog;
3990 if (hlog == null && !ignoreHLog) {
3991 effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
3992 HConstants.HREGION_LOGDIR_NAME, conf);
3993 }
3994 HRegion region = HRegion.newHRegion(tableDir,
3995 effectiveHLog, fs, conf, info, hTableDescriptor, null);
3996 if (initialize) {
3997 region.initialize();
3998 }
3999 return region;
4000 }
4001
4002 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4003 final Configuration conf,
4004 final HTableDescriptor hTableDescriptor,
4005 final HLog hlog)
4006 throws IOException {
4007 return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4008 }
4009
4010
4011
4012
4013
4014
4015
4016
4017
4018
4019
4020
4021
4022
4023 public static HRegion openHRegion(final HRegionInfo info,
4024 final HTableDescriptor htd, final HLog wal,
4025 final Configuration conf)
4026 throws IOException {
4027 return openHRegion(info, htd, wal, conf, null, null);
4028 }
4029
4030
4031
4032
4033
4034
4035
4036
4037
4038
4039
4040
4041
4042
4043
4044
4045 public static HRegion openHRegion(final HRegionInfo info,
4046 final HTableDescriptor htd, final HLog wal, final Configuration conf,
4047 final RegionServerServices rsServices,
4048 final CancelableProgressable reporter)
4049 throws IOException {
4050 return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4051 }
4052
4053
4054
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
4065
4066 public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4067 final HTableDescriptor htd, final HLog wal, final Configuration conf)
4068 throws IOException {
4069 return openHRegion(rootDir, info, htd, wal, conf, null, null);
4070 }
4071
4072
4073
4074
4075
4076
4077
4078
4079
4080
4081
4082
4083
4084
4085
4086
4087 public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4088 final HTableDescriptor htd, final HLog wal, final Configuration conf,
4089 final RegionServerServices rsServices,
4090 final CancelableProgressable reporter)
4091 throws IOException {
4092 FileSystem fs = null;
4093 if (rsServices != null) {
4094 fs = rsServices.getFileSystem();
4095 }
4096 if (fs == null) {
4097 fs = FileSystem.get(conf);
4098 }
4099 return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4100 }
4101
4102
4103
4104
4105
4106
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4117 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4118 throws IOException {
4119 return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4120 }
4121
4122
4123
4124
4125
4126
4127
4128
4129
4130
4131
4132
4133
4134
4135
4136
4137
4138 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4139 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4140 final RegionServerServices rsServices, final CancelableProgressable reporter)
4141 throws IOException {
4142 if (info == null) throw new NullPointerException("Passed region info is null");
4143 LOG.info("HRegion.openHRegion Region name ==" + info.getRegionNameAsString());
4144 if (LOG.isDebugEnabled()) {
4145 LOG.debug("Opening region: " + info);
4146 }
4147 Path dir = HTableDescriptor.getTableDir(rootDir, info.getTableName());
4148 HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info, htd, rsServices);
4149 return r.openHRegion(reporter);
4150 }
4151
4152
4153
4154
4155
4156
4157
4158
4159 public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4160 throws IOException {
4161 HRegionFileSystem regionFs = other.getRegionFileSystem();
4162 HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4163 other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4164 return r.openHRegion(reporter);
4165 }
4166
4167
4168
4169
4170
4171
4172
4173
4174 protected HRegion openHRegion(final CancelableProgressable reporter)
4175 throws IOException {
4176 checkCompressionCodecs();
4177
4178 this.openSeqNum = initialize(reporter);
4179 if (this.log != null) {
4180 this.log.setSequenceNumber(this.openSeqNum);
4181 }
4182
4183 return this;
4184 }
4185
4186 private void checkCompressionCodecs() throws IOException {
4187 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4188 CompressionTest.testCompression(fam.getCompression());
4189 CompressionTest.testCompression(fam.getCompactionCompression());
4190 }
4191 }
4192
4193
4194
4195
4196
4197
4198 HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4199 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4200 this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4201 r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4202 r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4203 fs.commitDaughterRegion(hri);
4204 return r;
4205 }
4206
4207
4208
4209
4210
4211
4212
4213
4214 HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4215 final HRegion region_b) throws IOException {
4216 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4217 fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4218 this.getTableDesc(), this.rsServices);
4219 r.readRequestsCount.set(this.getReadRequestsCount()
4220 + region_b.getReadRequestsCount());
4221 r.writeRequestsCount.set(this.getWriteRequestsCount()
4222 + region_b.getWriteRequestsCount());
4223 this.fs.commitMergedRegion(mergedRegionInfo);
4224 return r;
4225 }
4226
4227
4228
4229
4230
4231
4232
4233
4234
4235
4236
4237
4238 public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4239 meta.checkResources();
4240
4241 byte[] row = r.getRegionName();
4242 final long now = EnvironmentEdgeManager.currentTimeMillis();
4243 final List<KeyValue> cells = new ArrayList<KeyValue>(2);
4244 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4245 HConstants.REGIONINFO_QUALIFIER, now,
4246 r.getRegionInfo().toByteArray()));
4247
4248 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4249 HConstants.META_VERSION_QUALIFIER, now,
4250 Bytes.toBytes(HConstants.META_VERSION)));
4251 meta.put(row, HConstants.CATALOG_FAMILY, cells);
4252 }
4253
4254
4255
4256
4257
4258
4259
4260
4261 @Deprecated
4262 public static Path getRegionDir(final Path tabledir, final String name) {
4263 return new Path(tabledir, name);
4264 }
4265
4266
4267
4268
4269
4270
4271
4272
4273 @Deprecated
4274 public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4275 return new Path(
4276 HTableDescriptor.getTableDir(rootdir, info.getTableName()),
4277 info.getEncodedName());
4278 }
4279
4280
4281
4282
4283
4284
4285
4286
4287
4288 public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4289 return ((info.getStartKey().length == 0) ||
4290 (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4291 ((info.getEndKey().length == 0) ||
4292 (Bytes.compareTo(info.getEndKey(), row) > 0));
4293 }
4294
4295
4296
4297
4298
4299
4300
4301
4302
4303 public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4304 throws IOException {
4305 HRegion a = srcA;
4306 HRegion b = srcB;
4307
4308
4309
4310 if (srcA.getStartKey() == null) {
4311 if (srcB.getStartKey() == null) {
4312 throw new IOException("Cannot merge two regions with null start key");
4313 }
4314
4315 } else if ((srcB.getStartKey() == null) ||
4316 (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4317 a = srcB;
4318 b = srcA;
4319 }
4320
4321 if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4322 throw new IOException("Cannot merge non-adjacent regions");
4323 }
4324 return merge(a, b);
4325 }
4326
4327
4328
4329
4330
4331
4332
4333
4334
4335 public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4336 if (!a.getRegionInfo().getTableNameAsString().equals(
4337 b.getRegionInfo().getTableNameAsString())) {
4338 throw new IOException("Regions do not belong to the same table");
4339 }
4340
4341 FileSystem fs = a.getRegionFileSystem().getFileSystem();
4342
4343 a.flushcache();
4344 b.flushcache();
4345
4346
4347 a.compactStores(true);
4348 if (LOG.isDebugEnabled()) {
4349 LOG.debug("Files for region: " + a);
4350 a.getRegionFileSystem().logFileSystemState(LOG);
4351 }
4352 b.compactStores(true);
4353 if (LOG.isDebugEnabled()) {
4354 LOG.debug("Files for region: " + b);
4355 b.getRegionFileSystem().logFileSystemState(LOG);
4356 }
4357
4358 RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4359 if (!rmt.prepare(null)) {
4360 throw new IOException("Unable to merge regions " + a + " and " + b);
4361 }
4362 HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4363 LOG.info("starting merge of regions: " + a + " and " + b
4364 + " into new region " + mergedRegionInfo.getRegionNameAsString()
4365 + " with start key <"
4366 + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4367 + "> and end key <"
4368 + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4369 HRegion dstRegion = null;
4370 try {
4371 dstRegion = rmt.execute(null, null);
4372 } catch (IOException ioe) {
4373 rmt.rollback(null, null);
4374 throw new IOException("Failed merging region " + a + " and " + b
4375 + ", and succssfully rolled back");
4376 }
4377 dstRegion.compactStores(true);
4378
4379 if (LOG.isDebugEnabled()) {
4380 LOG.debug("Files for new region");
4381 dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4382 }
4383
4384 if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4385 throw new IOException("Merged region " + dstRegion
4386 + " still has references after the compaction, is compaction canceled?");
4387 }
4388
4389
4390 HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4391
4392 HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4393
4394 LOG.info("merge completed. New region is " + dstRegion);
4395 return dstRegion;
4396 }
4397
4398
4399
4400
4401
4402 boolean isMajorCompaction() throws IOException {
4403 for (Store store : this.stores.values()) {
4404 if (store.isMajorCompaction()) {
4405 return true;
4406 }
4407 }
4408 return false;
4409 }
4410
4411
4412
4413
4414
4415
4416
4417
4418
4419 public Result get(final Get get) throws IOException {
4420 checkRow(get.getRow(), "Get");
4421
4422 if (get.hasFamilies()) {
4423 for (byte [] family: get.familySet()) {
4424 checkFamily(family);
4425 }
4426 } else {
4427 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4428 get.addFamily(family);
4429 }
4430 }
4431 List<KeyValue> results = get(get, true);
4432 return new Result(results);
4433 }
4434
4435
4436
4437
4438
4439
4440 private List<KeyValue> get(Get get, boolean withCoprocessor)
4441 throws IOException {
4442
4443 List<KeyValue> results = new ArrayList<KeyValue>();
4444
4445
4446 if (withCoprocessor && (coprocessorHost != null)) {
4447 if (coprocessorHost.preGet(get, results)) {
4448 return results;
4449 }
4450 }
4451
4452 Scan scan = new Scan(get);
4453
4454 RegionScanner scanner = null;
4455 try {
4456 scanner = getScanner(scan);
4457 scanner.next(results);
4458 } finally {
4459 if (scanner != null)
4460 scanner.close();
4461 }
4462
4463
4464 if (withCoprocessor && (coprocessorHost != null)) {
4465 coprocessorHost.postGet(get, results);
4466 }
4467
4468
4469 if (this.metricsRegion != null) {
4470 long totalSize = 0l;
4471 if (results != null) {
4472 for (KeyValue kv:results) {
4473 totalSize += kv.getLength();
4474 }
4475 }
4476 this.metricsRegion.updateGet(totalSize);
4477 }
4478
4479 return results;
4480 }
4481
4482 public void mutateRow(RowMutations rm) throws IOException {
4483 mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
4484 }
4485
4486
4487
4488
4489
4490
4491
4492
4493
4494
4495
4496 public void mutateRowsWithLocks(Collection<Mutation> mutations,
4497 Collection<byte[]> rowsToLock) throws IOException {
4498
4499 MultiRowMutationProcessor proc =
4500 new MultiRowMutationProcessor(mutations, rowsToLock);
4501 processRowsWithLocks(proc, -1);
4502 }
4503
4504
4505
4506
4507
4508
4509 public void processRowsWithLocks(RowProcessor<?,?> processor)
4510 throws IOException {
4511 processRowsWithLocks(processor, rowProcessorTimeout);
4512 }
4513
4514
4515
4516
4517
4518
4519
4520
4521 public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout)
4522 throws IOException {
4523
4524 for (byte[] row : processor.getRowsToLock()) {
4525 checkRow(row, "processRowsWithLocks");
4526 }
4527 if (!processor.readOnly()) {
4528 checkReadOnly();
4529 }
4530 checkResources();
4531
4532 startRegionOperation();
4533 WALEdit walEdit = new WALEdit();
4534
4535
4536 processor.preProcess(this, walEdit);
4537
4538
4539 if (processor.readOnly()) {
4540 try {
4541 long now = EnvironmentEdgeManager.currentTimeMillis();
4542 doProcessRowWithTimeout(
4543 processor, now, this, null, null, timeout);
4544 processor.postProcess(this, walEdit);
4545 } catch (IOException e) {
4546 throw e;
4547 } finally {
4548 closeRegionOperation();
4549 }
4550 return;
4551 }
4552
4553 MultiVersionConsistencyControl.WriteEntry writeEntry = null;
4554 boolean locked = false;
4555 boolean walSyncSuccessful = false;
4556 List<Integer> acquiredLocks = null;
4557 long addedSize = 0;
4558 List<KeyValue> mutations = new ArrayList<KeyValue>();
4559 Collection<byte[]> rowsToLock = processor.getRowsToLock();
4560 try {
4561
4562 acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
4563 for (byte[] row : rowsToLock) {
4564
4565 Integer lid = getLock(null, row, true);
4566 if (lid == null) {
4567 throw new IOException("Failed to acquire lock on "
4568 + Bytes.toStringBinary(row));
4569 }
4570 acquiredLocks.add(lid);
4571 }
4572
4573 lock(this.updatesLock.readLock(), acquiredLocks.size());
4574 locked = true;
4575
4576 long now = EnvironmentEdgeManager.currentTimeMillis();
4577 try {
4578
4579
4580 doProcessRowWithTimeout(
4581 processor, now, this, mutations, walEdit, timeout);
4582
4583 if (!mutations.isEmpty()) {
4584
4585 writeEntry = mvcc.beginMemstoreInsert();
4586
4587 for (KeyValue kv : mutations) {
4588 kv.setMemstoreTS(writeEntry.getWriteNumber());
4589 byte[] family = kv.getFamily();
4590 checkFamily(family);
4591 addedSize += stores.get(family).add(kv);
4592 }
4593
4594 long txid = 0;
4595
4596 if (!walEdit.isEmpty()) {
4597 txid = this.log.appendNoSync(this.getRegionInfo(),
4598 this.htableDescriptor.getName(), walEdit,
4599 processor.getClusterId(), now, this.htableDescriptor);
4600 }
4601
4602 if (locked) {
4603 this.updatesLock.readLock().unlock();
4604 locked = false;
4605 }
4606
4607
4608 if (acquiredLocks != null) {
4609 for (Integer lid : acquiredLocks) {
4610 releaseRowLock(lid);
4611 }
4612 acquiredLocks = null;
4613 }
4614
4615 if (txid != 0) {
4616 syncOrDefer(txid, processor.useDurability());
4617 }
4618 walSyncSuccessful = true;
4619 }
4620 } finally {
4621 if (!mutations.isEmpty() && !walSyncSuccessful) {
4622 LOG.warn("Wal sync failed. Roll back " + mutations.size() +
4623 " memstore keyvalues for row(s):" +
4624 processor.getRowsToLock().iterator().next() + "...");
4625 for (KeyValue kv : mutations) {
4626 stores.get(kv.getFamily()).rollback(kv);
4627 }
4628 }
4629
4630 if (writeEntry != null) {
4631 mvcc.completeMemstoreInsert(writeEntry);
4632 writeEntry = null;
4633 }
4634 if (locked) {
4635 this.updatesLock.readLock().unlock();
4636 locked = false;
4637 }
4638 if (acquiredLocks != null) {
4639 for (Integer lid : acquiredLocks) {
4640 releaseRowLock(lid);
4641 }
4642 }
4643
4644 }
4645
4646
4647 processor.postProcess(this, walEdit);
4648
4649 } catch (IOException e) {
4650 throw e;
4651 } finally {
4652 closeRegionOperation();
4653 if (!mutations.isEmpty() &&
4654 isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
4655 requestFlush();
4656 }
4657 }
4658 }
4659
4660 private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
4661 final long now,
4662 final HRegion region,
4663 final List<KeyValue> mutations,
4664 final WALEdit walEdit,
4665 final long timeout) throws IOException {
4666
4667 if (timeout < 0) {
4668 try {
4669 processor.process(now, region, mutations, walEdit);
4670 } catch (IOException e) {
4671 LOG.warn("RowProcessor:" + processor.getClass().getName() +
4672 " throws Exception on row(s):" +
4673 Bytes.toStringBinary(
4674 processor.getRowsToLock().iterator().next()) + "...", e);
4675 throw e;
4676 }
4677 return;
4678 }
4679
4680
4681 FutureTask<Void> task =
4682 new FutureTask<Void>(new Callable<Void>() {
4683 @Override
4684 public Void call() throws IOException {
4685 try {
4686 processor.process(now, region, mutations, walEdit);
4687 return null;
4688 } catch (IOException e) {
4689 LOG.warn("RowProcessor:" + processor.getClass().getName() +
4690 " throws Exception on row(s):" +
4691 Bytes.toStringBinary(
4692 processor.getRowsToLock().iterator().next()) + "...", e);
4693 throw e;
4694 }
4695 }
4696 });
4697 rowProcessorExecutor.execute(task);
4698 try {
4699 task.get(timeout, TimeUnit.MILLISECONDS);
4700 } catch (TimeoutException te) {
4701 LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
4702 Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
4703 "...");
4704 throw new IOException(te);
4705 } catch (Exception e) {
4706 throw new IOException(e);
4707 }
4708 }
4709
4710
4711
4712
4713
4714
4715
4716
4717
4718
4719 public Result append(Append append)
4720 throws IOException {
4721 byte[] row = append.getRow();
4722 checkRow(row, "append");
4723 boolean flush = false;
4724 boolean writeToWAL = append.getDurability() != Durability.SKIP_WAL;
4725 WALEdit walEdits = null;
4726 List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
4727 Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
4728
4729 long size = 0;
4730 long txid = 0;
4731
4732 checkReadOnly();
4733
4734 startRegionOperation(Operation.APPEND);
4735 this.writeRequestsCount.increment();
4736 WriteEntry w = null;
4737 try {
4738 Integer lid = getLock(null, row, true);
4739 lock(this.updatesLock.readLock());
4740
4741
4742 mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
4743
4744 w = mvcc.beginMemstoreInsert();
4745 try {
4746 long now = EnvironmentEdgeManager.currentTimeMillis();
4747
4748 for (Map.Entry<byte[], List<? extends Cell>> family : append.getFamilyMap().entrySet()) {
4749
4750 Store store = stores.get(family.getKey());
4751 List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
4752
4753
4754 Get get = new Get(row);
4755 for (Cell cell : family.getValue()) {
4756 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4757 get.addColumn(family.getKey(), kv.getQualifier());
4758 }
4759 List<KeyValue> results = get(get, false);
4760
4761
4762
4763
4764
4765
4766
4767 int idx = 0;
4768 for (Cell cell : family.getValue()) {
4769 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4770 KeyValue newKV;
4771 if (idx < results.size()
4772 && results.get(idx).matchingQualifier(kv.getBuffer(),
4773 kv.getQualifierOffset(), kv.getQualifierLength())) {
4774 KeyValue oldKv = results.get(idx);
4775
4776 newKV = new KeyValue(row.length, kv.getFamilyLength(),
4777 kv.getQualifierLength(), now, KeyValue.Type.Put,
4778 oldKv.getValueLength() + kv.getValueLength());
4779
4780 System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
4781 newKV.getBuffer(), newKV.getValueOffset(),
4782 oldKv.getValueLength());
4783 System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
4784 newKV.getBuffer(),
4785 newKV.getValueOffset() + oldKv.getValueLength(),
4786 kv.getValueLength());
4787 idx++;
4788 } else {
4789
4790 newKV = new KeyValue(row.length, kv.getFamilyLength(),
4791 kv.getQualifierLength(), now, KeyValue.Type.Put,
4792 kv.getValueLength());
4793
4794 System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
4795 newKV.getBuffer(), newKV.getValueOffset(),
4796 kv.getValueLength());
4797 }
4798
4799 System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
4800 newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
4801 System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
4802 newKV.getBuffer(), newKV.getFamilyOffset(),
4803 kv.getFamilyLength());
4804 System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
4805 newKV.getBuffer(), newKV.getQualifierOffset(),
4806 kv.getQualifierLength());
4807
4808 newKV.setMemstoreTS(w.getWriteNumber());
4809 kvs.add(newKV);
4810
4811
4812 if (writeToWAL) {
4813 if (walEdits == null) {
4814 walEdits = new WALEdit();
4815 }
4816 walEdits.add(newKV);
4817 }
4818 }
4819
4820
4821 tempMemstore.put(store, kvs);
4822 }
4823
4824
4825 if (writeToWAL) {
4826
4827
4828
4829 txid = this.log.appendNoSync(this.getRegionInfo(),
4830 this.htableDescriptor.getName(), walEdits,
4831 HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
4832 this.htableDescriptor);
4833 }
4834
4835
4836 for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
4837 Store store = entry.getKey();
4838 if (store.getFamily().getMaxVersions() == 1) {
4839
4840 size += store.upsert(entry.getValue(), getSmallestReadPoint());
4841 } else {
4842
4843 for (Cell cell: entry.getValue()) {
4844 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4845 size += store.add(kv);
4846 }
4847 }
4848 allKVs.addAll(entry.getValue());
4849 }
4850 size = this.addAndGetGlobalMemstoreSize(size);
4851 flush = isFlushSize(size);
4852 } finally {
4853 this.updatesLock.readLock().unlock();
4854 releaseRowLock(lid);
4855 }
4856 if (writeToWAL) {
4857
4858 syncOrDefer(txid, append.getDurability());
4859 }
4860 } finally {
4861 if (w != null) {
4862 mvcc.completeMemstoreInsert(w);
4863 }
4864 closeRegionOperation();
4865 }
4866
4867 if (this.metricsRegion != null) {
4868 this.metricsRegion.updateAppend();
4869 }
4870
4871 if (flush) {
4872
4873 requestFlush();
4874 }
4875
4876
4877 return append.isReturnResults() ? new Result(allKVs) : null;
4878 }
4879
4880
4881
4882
4883
4884
4885
4886 public Result increment(Increment increment)
4887 throws IOException {
4888 byte [] row = increment.getRow();
4889 checkRow(row, "increment");
4890 TimeRange tr = increment.getTimeRange();
4891 boolean flush = false;
4892 boolean writeToWAL = increment.getDurability() != Durability.SKIP_WAL;
4893 WALEdit walEdits = null;
4894 List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.size());
4895 Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
4896
4897 long size = 0;
4898 long txid = 0;
4899
4900 checkReadOnly();
4901
4902 startRegionOperation(Operation.INCREMENT);
4903 this.writeRequestsCount.increment();
4904 WriteEntry w = null;
4905 try {
4906 Integer lid = getLock(null, row, true);
4907 lock(this.updatesLock.readLock());
4908
4909
4910 mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
4911
4912 w = mvcc.beginMemstoreInsert();
4913 try {
4914 long now = EnvironmentEdgeManager.currentTimeMillis();
4915
4916 for (Map.Entry<byte [], List<? extends Cell>> family:
4917 increment.getFamilyMap().entrySet()) {
4918
4919 Store store = stores.get(family.getKey());
4920 List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
4921
4922
4923 Get get = new Get(row);
4924 for (Cell cell: family.getValue()) {
4925 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4926 get.addColumn(family.getKey(), kv.getQualifier());
4927 }
4928 get.setTimeRange(tr.getMin(), tr.getMax());
4929 List<KeyValue> results = get(get, false);
4930
4931
4932
4933 int idx = 0;
4934 for (Cell cell: family.getValue()) {
4935 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4936 long amount = Bytes.toLong(kv.getValue());
4937 byte [] qualifier = kv.getQualifier();
4938 if (idx < results.size() && results.get(idx).matchingQualifier(qualifier)) {
4939 kv = results.get(idx);
4940 if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
4941 amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
4942 } else {
4943
4944 throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException(
4945 "Attempted to increment field that isn't 64 bits wide");
4946 }
4947 idx++;
4948 }
4949
4950
4951 KeyValue newKV =
4952 new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount));
4953 newKV.setMemstoreTS(w.getWriteNumber());
4954 kvs.add(newKV);
4955
4956
4957 if (writeToWAL) {
4958 if (walEdits == null) {
4959 walEdits = new WALEdit();
4960 }
4961 walEdits.add(newKV);
4962 }
4963 }
4964
4965
4966 tempMemstore.put(store, kvs);
4967 }
4968
4969
4970 if (writeToWAL) {
4971
4972
4973
4974 txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
4975 walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
4976 this.htableDescriptor);
4977 }
4978
4979
4980 for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
4981 Store store = entry.getKey();
4982 if (store.getFamily().getMaxVersions() == 1) {
4983
4984 size += store.upsert(entry.getValue(), getSmallestReadPoint());
4985 } else {
4986
4987 for (Cell cell : entry.getValue()) {
4988 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
4989 size += store.add(kv);
4990 }
4991 }
4992 allKVs.addAll(entry.getValue());
4993 }
4994 size = this.addAndGetGlobalMemstoreSize(size);
4995 flush = isFlushSize(size);
4996 } finally {
4997 this.updatesLock.readLock().unlock();
4998 releaseRowLock(lid);
4999 }
5000 if (writeToWAL) {
5001
5002 syncOrDefer(txid, increment.getDurability());
5003 }
5004 } finally {
5005 if (w != null) {
5006 mvcc.completeMemstoreInsert(w);
5007 }
5008 closeRegionOperation();
5009 if (this.metricsRegion != null) {
5010 this.metricsRegion.updateIncrement();
5011 }
5012 }
5013
5014 if (flush) {
5015
5016 requestFlush();
5017 }
5018
5019 return new Result(allKVs);
5020 }
5021
5022
5023
5024
5025
5026 private void checkFamily(final byte [] family)
5027 throws NoSuchColumnFamilyException {
5028 if (!this.htableDescriptor.hasFamily(family)) {
5029 throw new NoSuchColumnFamilyException("Column family " +
5030 Bytes.toString(family) + " does not exist in region " + this
5031 + " in table " + this.htableDescriptor);
5032 }
5033 }
5034
5035 public static final long FIXED_OVERHEAD = ClassSize.align(
5036 ClassSize.OBJECT +
5037 ClassSize.ARRAY +
5038 38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5039 (12 * Bytes.SIZEOF_LONG) +
5040 Bytes.SIZEOF_BOOLEAN);
5041
5042 public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5043 ClassSize.OBJECT +
5044 (2 * ClassSize.ATOMIC_BOOLEAN) +
5045 (3 * ClassSize.ATOMIC_LONG) +
5046 ClassSize.ATOMIC_INTEGER +
5047 (3 * ClassSize.CONCURRENT_HASHMAP) +
5048 WriteState.HEAP_SIZE +
5049 ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
5050 (2 * ClassSize.REENTRANT_LOCK) +
5051 ClassSize.ARRAYLIST +
5052 MultiVersionConsistencyControl.FIXED_SIZE
5053 ;
5054
5055 @Override
5056 public long heapSize() {
5057 long heapSize = DEEP_OVERHEAD;
5058 for (Store store : this.stores.values()) {
5059 heapSize += store.heapSize();
5060 }
5061
5062 return heapSize;
5063 }
5064
5065
5066
5067
5068
5069 private static void printUsageAndExit(final String message) {
5070 if (message != null && message.length() > 0) System.out.println(message);
5071 System.out.println("Usage: HRegion CATLALOG_TABLE_DIR [major_compact]");
5072 System.out.println("Options:");
5073 System.out.println(" major_compact Pass this option to major compact " +
5074 "passed region.");
5075 System.out.println("Default outputs scan of passed region.");
5076 System.exit(1);
5077 }
5078
5079
5080
5081
5082
5083
5084
5085
5086
5087
5088
5089
5090
5091
5092
5093
5094
5095 public boolean registerService(Service instance) {
5096
5097
5098
5099 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5100 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5101 LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5102 " already registered, rejecting request from "+instance
5103 );
5104 return false;
5105 }
5106
5107 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5108 if (LOG.isDebugEnabled()) {
5109 LOG.debug("Registered coprocessor service: region="+
5110 Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5111 }
5112 return true;
5113 }
5114
5115
5116
5117
5118
5119
5120
5121
5122
5123
5124
5125
5126
5127
5128
5129 public Message execService(RpcController controller, CoprocessorServiceCall call)
5130 throws IOException {
5131 String serviceName = call.getServiceName();
5132 String methodName = call.getMethodName();
5133 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5134 throw new UnknownProtocolException(null,
5135 "No registered coprocessor service found for name "+serviceName+
5136 " in region "+Bytes.toStringBinary(getRegionName()));
5137 }
5138
5139 Service service = coprocessorServiceHandlers.get(serviceName);
5140 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5141 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5142 if (methodDesc == null) {
5143 throw new UnknownProtocolException(service.getClass(),
5144 "Unknown method "+methodName+" called on service "+serviceName+
5145 " in region "+Bytes.toStringBinary(getRegionName()));
5146 }
5147
5148 Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5149 .mergeFrom(call.getRequest()).build();
5150 final Message.Builder responseBuilder =
5151 service.getResponsePrototype(methodDesc).newBuilderForType();
5152 service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5153 @Override
5154 public void run(Message message) {
5155 if (message != null) {
5156 responseBuilder.mergeFrom(message);
5157 }
5158 }
5159 });
5160
5161 return responseBuilder.build();
5162 }
5163
5164
5165
5166
5167
5168
5169
5170
5171
5172
5173
5174 private static void processTable(final FileSystem fs, final Path p,
5175 final HLog log, final Configuration c,
5176 final boolean majorCompact)
5177 throws IOException {
5178 HRegion region = null;
5179 String metaStr = Bytes.toString(HConstants.META_TABLE_NAME);
5180
5181 if (p.getName().startsWith(metaStr)) {
5182 region = HRegion.newHRegion(p, log, fs, c,
5183 HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5184 } else {
5185 throw new IOException("Not a known catalog table: " + p.toString());
5186 }
5187 try {
5188 region.initialize();
5189 if (majorCompact) {
5190 region.compactStores(true);
5191 } else {
5192
5193 Scan scan = new Scan();
5194
5195 RegionScanner scanner = region.getScanner(scan);
5196 try {
5197 List<KeyValue> kvs = new ArrayList<KeyValue>();
5198 boolean done = false;
5199 do {
5200 kvs.clear();
5201 done = scanner.next(kvs);
5202 if (kvs.size() > 0) LOG.info(kvs);
5203 } while (done);
5204 } finally {
5205 scanner.close();
5206 }
5207 }
5208 } finally {
5209 region.close();
5210 }
5211 }
5212
5213 boolean shouldForceSplit() {
5214 return this.splitRequest;
5215 }
5216
5217 byte[] getExplicitSplitPoint() {
5218 return this.explicitSplitPoint;
5219 }
5220
5221 void forceSplit(byte[] sp) {
5222
5223
5224 this.splitRequest = true;
5225 if (sp != null) {
5226 this.explicitSplitPoint = sp;
5227 }
5228 }
5229
5230 void clearSplit_TESTS_ONLY() {
5231 this.splitRequest = false;
5232 }
5233
5234
5235
5236
5237 protected void prepareToSplit() {
5238
5239 }
5240
5241
5242
5243
5244
5245
5246
5247 public byte[] checkSplit() {
5248
5249 if (this.getRegionInfo().isMetaTable()) {
5250 if (shouldForceSplit()) {
5251 LOG.warn("Cannot split meta region in HBase 0.20 and above");
5252 }
5253 return null;
5254 }
5255
5256 if (!splitPolicy.shouldSplit()) {
5257 return null;
5258 }
5259
5260 byte[] ret = splitPolicy.getSplitPoint();
5261
5262 if (ret != null) {
5263 try {
5264 checkRow(ret, "calculated split");
5265 } catch (IOException e) {
5266 LOG.error("Ignoring invalid split", e);
5267 return null;
5268 }
5269 }
5270 return ret;
5271 }
5272
5273
5274
5275
5276 public int getCompactPriority() {
5277 int count = Integer.MAX_VALUE;
5278 for (Store store : stores.values()) {
5279 count = Math.min(count, store.getCompactPriority());
5280 }
5281 return count;
5282 }
5283
5284
5285
5286
5287
5288
5289 public boolean needsCompaction() {
5290 for (Store store : stores.values()) {
5291 if(store.needsCompaction()) {
5292 return true;
5293 }
5294 }
5295 return false;
5296 }
5297
5298
5299 public RegionCoprocessorHost getCoprocessorHost() {
5300 return coprocessorHost;
5301 }
5302
5303
5304 public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
5305 this.coprocessorHost = coprocessorHost;
5306 }
5307
5308
5309
5310
5311
5312
5313
5314
5315
5316
5317 public void startRegionOperation()
5318 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
5319 startRegionOperation(Operation.ANY);
5320 }
5321
5322
5323
5324
5325
5326
5327
5328 protected void startRegionOperation(Operation op) throws NotServingRegionException,
5329 RegionTooBusyException, InterruptedIOException {
5330 switch (op) {
5331 case INCREMENT:
5332 case APPEND:
5333 case GET:
5334 case SCAN:
5335 case SPLIT_REGION:
5336 case MERGE_REGION:
5337
5338 if (this.isRecovering()) {
5339 throw new RegionInRecoveryException(this.getRegionNameAsString()
5340 + " is recovering");
5341 }
5342 break;
5343 default:
5344 break;
5345 }
5346 if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION) {
5347
5348 return;
5349 }
5350 if (this.closing.get()) {
5351 throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5352 }
5353 lock(lock.readLock());
5354 if (this.closed.get()) {
5355 lock.readLock().unlock();
5356 throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5357 }
5358 }
5359
5360
5361
5362
5363
5364 public void closeRegionOperation() {
5365 lock.readLock().unlock();
5366 }
5367
5368
5369
5370
5371
5372
5373
5374
5375
5376
5377 private void startBulkRegionOperation(boolean writeLockNeeded)
5378 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
5379 if (this.closing.get()) {
5380 throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5381 }
5382 if (writeLockNeeded) lock(lock.writeLock());
5383 else lock(lock.readLock());
5384 if (this.closed.get()) {
5385 if (writeLockNeeded) lock.writeLock().unlock();
5386 else lock.readLock().unlock();
5387 throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5388 }
5389 }
5390
5391
5392
5393
5394
5395 private void closeBulkRegionOperation(){
5396 if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
5397 else lock.readLock().unlock();
5398 }
5399
5400
5401
5402
5403
5404 private void recordPutWithoutWal(final Map<byte [], List<? extends Cell>> familyMap) {
5405 numPutsWithoutWAL.increment();
5406 if (numPutsWithoutWAL.get() <= 1) {
5407 LOG.info("writing data to region " + this +
5408 " with WAL disabled. Data may be lost in the event of a crash.");
5409 }
5410
5411 long putSize = 0;
5412 for (List<? extends Cell> cells: familyMap.values()) {
5413 for (Cell cell : cells) {
5414 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5415 putSize += kv.getKeyLength() + kv.getValueLength();
5416 }
5417 }
5418
5419 dataInMemoryWithoutWAL.add(putSize);
5420 }
5421
5422 private void lock(final Lock lock)
5423 throws RegionTooBusyException, InterruptedIOException {
5424 lock(lock, 1);
5425 }
5426
5427
5428
5429
5430
5431
5432 private void lock(final Lock lock, final int multiplier)
5433 throws RegionTooBusyException, InterruptedIOException {
5434 try {
5435 final long waitTime = Math.min(maxBusyWaitDuration,
5436 busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
5437 if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
5438 throw new RegionTooBusyException(
5439 "failed to get a lock in " + waitTime + "ms");
5440 }
5441 } catch (InterruptedException ie) {
5442 LOG.info("Interrupted while waiting for a lock");
5443 InterruptedIOException iie = new InterruptedIOException();
5444 iie.initCause(ie);
5445 throw iie;
5446 }
5447 }
5448
5449
5450
5451
5452
5453
5454
5455 private void syncOrDefer(long txid, Durability durability) throws IOException {
5456 if (this.getRegionInfo().isMetaRegion()) {
5457 this.log.sync(txid);
5458 } else {
5459 switch(durability) {
5460 case USE_DEFAULT:
5461
5462 if (!isDeferredLogSyncEnabled()) {
5463 this.log.sync(txid);
5464 }
5465 break;
5466 case SKIP_WAL:
5467
5468 break;
5469 case ASYNC_WAL:
5470
5471 if (this.deferredLogSyncDisabled) {
5472 this.log.sync(txid);
5473 }
5474 break;
5475 case SYNC_WAL:
5476 case FSYNC_WAL:
5477
5478 this.log.sync(txid);
5479 break;
5480 }
5481 }
5482 }
5483
5484
5485
5486
5487 private boolean isDeferredLogSyncEnabled() {
5488 return (this.htableDescriptor.isDeferredLogFlush() && !this.deferredLogSyncDisabled);
5489 }
5490
5491
5492
5493
5494 private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
5495
5496 @Override
5497 public void add(int index, KeyValue element) {
5498
5499 }
5500
5501 @Override
5502 public boolean addAll(int index, Collection<? extends KeyValue> c) {
5503 return false;
5504 }
5505
5506 @Override
5507 public KeyValue get(int index) {
5508 throw new UnsupportedOperationException();
5509 }
5510
5511 @Override
5512 public int size() {
5513 return 0;
5514 }
5515 };
5516
5517
5518
5519
5520
5521
5522
5523
5524
5525
5526
5527 public static void main(String[] args) throws IOException {
5528 if (args.length < 1) {
5529 printUsageAndExit(null);
5530 }
5531 boolean majorCompact = false;
5532 if (args.length > 1) {
5533 if (!args[1].toLowerCase().startsWith("major")) {
5534 printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
5535 }
5536 majorCompact = true;
5537 }
5538 final Path tableDir = new Path(args[0]);
5539 final Configuration c = HBaseConfiguration.create();
5540 final FileSystem fs = FileSystem.get(c);
5541 final Path logdir = new Path(c.get("hbase.tmp.dir"));
5542 final String logname = "hlog" + tableDir.getName()
5543 + EnvironmentEdgeManager.currentTimeMillis();
5544
5545 final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
5546 try {
5547 processTable(fs, tableDir, log, c, majorCompact);
5548 } finally {
5549 log.close();
5550
5551 BlockCache bc = new CacheConfig(c).getBlockCache();
5552 if (bc != null) bc.shutdown();
5553 }
5554 }
5555
5556
5557
5558
5559 public long getOpenSeqNum() {
5560 return this.openSeqNum;
5561 }
5562
5563
5564
5565
5566
5567 public long getMinSeqIdForLogReplay() {
5568 return this.minSeqIdForLogReplay;
5569 }
5570
5571
5572
5573
5574 public CompactionState getCompactionState() {
5575 boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
5576 return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
5577 : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
5578 }
5579
5580 public void reportCompactionRequestStart(boolean isMajor){
5581 (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
5582 }
5583
5584 public void reportCompactionRequestEnd(boolean isMajor){
5585 int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
5586 assert newValue >= 0;
5587 }
5588
5589
5590
5591
5592
5593
/**
 * Callback interface with one method per bulk-load phase (prepare, done, failed).
 * NOTE(review): the code that invokes these callbacks is outside this chunk; the
 * per-method notes below are inferred from the method names -- confirm against the
 * bulk-load call site.
 */
public interface BulkLoadListener {

  /**
   * Presumably called before a file is loaded into the given family.
   *
   * @param family column family the file is destined for
   * @param srcPath path of the source file
   * @return a path string (presumably the path that should actually be loaded -- confirm)
   * @throws IOException on failure
   */
  String prepareBulkLoad(byte[] family, String srcPath) throws IOException;

  /**
   * Presumably called after a file was loaded successfully.
   *
   * @param family column family the file was destined for
   * @param srcPath path of the source file
   * @throws IOException on failure
   */
  void doneBulkLoad(byte[] family, String srcPath) throws IOException;

  /**
   * Presumably called after loading a file failed.
   *
   * @param family column family the file was destined for
   * @param srcPath path of the source file
   * @throws IOException on failure
   */
  void failedBulkLoad(byte[] family, String srcPath) throws IOException;
}
5621 }