1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.lang.reflect.Constructor;
26 import java.text.ParseException;
27 import java.util.AbstractList;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.Comparator;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.NavigableMap;
40 import java.util.NavigableSet;
41 import java.util.RandomAccess;
42 import java.util.Set;
43 import java.util.TreeMap;
44 import java.util.concurrent.Callable;
45 import java.util.concurrent.CompletionService;
46 import java.util.concurrent.ConcurrentHashMap;
47 import java.util.concurrent.ConcurrentMap;
48 import java.util.concurrent.ConcurrentSkipListMap;
49 import java.util.concurrent.ExecutionException;
50 import java.util.concurrent.ExecutorCompletionService;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors;
53 import java.util.concurrent.Future;
54 import java.util.concurrent.FutureTask;
55 import java.util.concurrent.ThreadFactory;
56 import java.util.concurrent.ThreadPoolExecutor;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
59 import java.util.concurrent.atomic.AtomicBoolean;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.atomic.AtomicLong;
62 import java.util.concurrent.locks.Lock;
63 import java.util.concurrent.locks.ReadWriteLock;
64 import java.util.concurrent.locks.ReentrantReadWriteLock;
65
66 import org.apache.commons.lang.RandomStringUtils;
67 import org.apache.commons.logging.Log;
68 import org.apache.commons.logging.LogFactory;
69 import org.apache.hadoop.conf.Configuration;
70 import org.apache.hadoop.fs.FileStatus;
71 import org.apache.hadoop.fs.FileSystem;
72 import org.apache.hadoop.fs.Path;
73 import org.apache.hadoop.hbase.Cell;
74 import org.apache.hadoop.hbase.CellScanner;
75 import org.apache.hadoop.hbase.CellUtil;
76 import org.apache.hadoop.hbase.CompoundConfiguration;
77 import org.apache.hadoop.hbase.DoNotRetryIOException;
78 import org.apache.hadoop.hbase.DroppedSnapshotException;
79 import org.apache.hadoop.hbase.HBaseConfiguration;
80 import org.apache.hadoop.hbase.HColumnDescriptor;
81 import org.apache.hadoop.hbase.HConstants;
82 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
83 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
84 import org.apache.hadoop.hbase.HRegionInfo;
85 import org.apache.hadoop.hbase.HTableDescriptor;
86 import org.apache.hadoop.hbase.KeyValue;
87 import org.apache.hadoop.hbase.KeyValueUtil;
88 import org.apache.hadoop.hbase.NamespaceDescriptor;
89 import org.apache.hadoop.hbase.NotServingRegionException;
90 import org.apache.hadoop.hbase.RegionTooBusyException;
91 import org.apache.hadoop.hbase.TableName;
92 import org.apache.hadoop.hbase.Tag;
93 import org.apache.hadoop.hbase.TagType;
94 import org.apache.hadoop.hbase.UnknownScannerException;
95 import org.apache.hadoop.hbase.backup.HFileArchiver;
96 import org.apache.hadoop.hbase.classification.InterfaceAudience;
97 import org.apache.hadoop.hbase.client.Append;
98 import org.apache.hadoop.hbase.client.Delete;
99 import org.apache.hadoop.hbase.client.Durability;
100 import org.apache.hadoop.hbase.client.Get;
101 import org.apache.hadoop.hbase.client.Increment;
102 import org.apache.hadoop.hbase.client.IsolationLevel;
103 import org.apache.hadoop.hbase.client.Mutation;
104 import org.apache.hadoop.hbase.client.Put;
105 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
106 import org.apache.hadoop.hbase.client.Result;
107 import org.apache.hadoop.hbase.client.RowMutations;
108 import org.apache.hadoop.hbase.client.Scan;
109 import org.apache.hadoop.hbase.conf.ConfigurationManager;
110 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
111 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
112 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
113 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
114 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
115 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
116 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
117 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
118 import org.apache.hadoop.hbase.filter.FilterWrapper;
119 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
120 import org.apache.hadoop.hbase.filter.PrefixFilter;
121 import org.apache.hadoop.hbase.io.HeapSize;
122 import org.apache.hadoop.hbase.io.TimeRange;
123 import org.apache.hadoop.hbase.io.hfile.BlockCache;
124 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
125 import org.apache.hadoop.hbase.io.hfile.HFile;
126 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
127 import org.apache.hadoop.hbase.ipc.RpcCallContext;
128 import org.apache.hadoop.hbase.ipc.RpcServer;
129 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
130 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
131 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
132 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
133 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
136 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
137 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
138 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
139 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
140 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
141 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
142 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
143 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
144 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
145 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
146 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
147 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
148 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
149 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
150 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
151 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
152 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
153 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
154 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
155 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
156 import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
157 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
158 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
159 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
160 import org.apache.hadoop.hbase.security.User;
161 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
162 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
163 import org.apache.hadoop.hbase.util.ByteStringer;
164 import org.apache.hadoop.hbase.util.Bytes;
165 import org.apache.hadoop.hbase.util.CancelableProgressable;
166 import org.apache.hadoop.hbase.util.ClassSize;
167 import org.apache.hadoop.hbase.util.CompressionTest;
168 import org.apache.hadoop.hbase.util.Counter;
169 import org.apache.hadoop.hbase.util.EncryptionTest;
170 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
171 import org.apache.hadoop.hbase.util.FSTableDescriptors;
172 import org.apache.hadoop.hbase.util.FSUtils;
173 import org.apache.hadoop.hbase.util.HashedBytes;
174 import org.apache.hadoop.hbase.util.Pair;
175 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
176 import org.apache.hadoop.hbase.util.Threads;
177 import org.apache.hadoop.hbase.wal.WAL;
178 import org.apache.hadoop.hbase.wal.WALFactory;
179 import org.apache.hadoop.hbase.wal.WALKey;
180 import org.apache.hadoop.hbase.wal.WALSplitter;
181 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
182 import org.apache.hadoop.io.MultipleIOException;
183 import org.apache.hadoop.util.StringUtils;
184 import org.apache.htrace.Trace;
185 import org.apache.htrace.TraceScope;
186
187 import com.google.common.annotations.VisibleForTesting;
188 import com.google.common.base.Optional;
189 import com.google.common.base.Preconditions;
190 import com.google.common.collect.Lists;
191 import com.google.common.collect.Maps;
192 import com.google.common.io.Closeables;
193 import com.google.protobuf.ByteString;
194 import com.google.protobuf.Descriptors;
195 import com.google.protobuf.Message;
196 import com.google.protobuf.RpcCallback;
197 import com.google.protobuf.RpcController;
198 import com.google.protobuf.Service;
199 import com.google.protobuf.TextFormat;
200
201 @InterfaceAudience.Private
202 public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
203 private static final Log LOG = LogFactory.getLog(HRegion.class);
204
205 public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
206 "hbase.hregion.scan.loadColumnFamiliesOnDemand";
207
208
209
210
211
212
213
214
215
216 private final int maxWaitForSeqId;
217 private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
218 private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;
219
220
221
222
223
224 private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
225
226 final AtomicBoolean closed = new AtomicBoolean(false);
227
228
229
230
231
232
233 final AtomicBoolean closing = new AtomicBoolean(false);
234
235
236
237
238
239 private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
240
241
242
243
244
245
246 private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
247
248
249
250
251
252
253 protected volatile long lastReplayedOpenRegionSeqId = -1L;
254 protected volatile long lastReplayedCompactionSeqId = -1L;
255
256
257 protected List<Map> storeSeqIds = new ArrayList<>();
258
259
260
261
262
263
264
265
266
267
268 private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
269 new ConcurrentHashMap<HashedBytes, RowLockContext>();
270
271 protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
272 Bytes.BYTES_RAWCOMPARATOR);
273
274
275 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
276
277 private final AtomicLong memstoreSize = new AtomicLong(0);
278
279
280 final Counter numMutationsWithoutWAL = new Counter();
281 final Counter dataInMemoryWithoutWAL = new Counter();
282
283
284 final Counter checkAndMutateChecksPassed = new Counter();
285 final Counter checkAndMutateChecksFailed = new Counter();
286
287
288 final Counter readRequestsCount = new Counter();
289 final Counter writeRequestsCount = new Counter();
290
291
292 private final Counter blockedRequestsCount = new Counter();
293
294
295 final AtomicLong compactionsFinished = new AtomicLong(0L);
296 final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
297 final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
298
299 private final WAL wal;
300 private final HRegionFileSystem fs;
301 protected final Configuration conf;
302 private final Configuration baseConf;
303 private final KeyValue.KVComparator comparator;
304 private final int rowLockWaitDuration;
305 static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
306
307
308
309
310
311
312
313 final long busyWaitDuration;
314 static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
315
316
317
318
319 final int maxBusyWaitMultiplier;
320
321
322
323 final long maxBusyWaitDuration;
324
325
326 static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
327 final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
328
329 private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
330
331
332
333
334 private long openSeqNum = HConstants.NO_SEQNUM;
335
336
337
338
339
340 private boolean isLoadingCfsOnDemandDefault = false;
341
342 private final AtomicInteger majorInProgress = new AtomicInteger(0);
343 private final AtomicInteger minorInProgress = new AtomicInteger(0);
344
345
346
347
348
349
350
351 Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
352
353
354 private PrepareFlushResult prepareFlushResult = null;
355
356
357
358
359 private boolean disallowWritesInRecovering = false;
360
361
362 private volatile boolean recovering = false;
363
364 private volatile Optional<ConfigurationManager> configurationManager;
365
366
367
368
369
370
371 public long getSmallestReadPoint() {
372 long minimumReadPoint;
373
374
375
376 synchronized(scannerReadPoints) {
377 minimumReadPoint = mvcc.getReadPoint();
378
379 for (Long readPoint: this.scannerReadPoints.values()) {
380 if (readPoint < minimumReadPoint) {
381 minimumReadPoint = readPoint;
382 }
383 }
384 }
385 return minimumReadPoint;
386 }
387
388
389
390
391
392 static class WriteState {
393
394 volatile boolean flushing = false;
395
396 volatile boolean flushRequested = false;
397
398 AtomicInteger compacting = new AtomicInteger(0);
399
400 volatile boolean writesEnabled = true;
401
402 volatile boolean readOnly = false;
403
404
405 volatile boolean readsEnabled = true;
406
407
408
409
410
411
412 synchronized void setReadOnly(final boolean onOff) {
413 this.writesEnabled = !onOff;
414 this.readOnly = onOff;
415 }
416
417 boolean isReadOnly() {
418 return this.readOnly;
419 }
420
421 boolean isFlushRequested() {
422 return this.flushRequested;
423 }
424
425 void setReadsEnabled(boolean readsEnabled) {
426 this.readsEnabled = readsEnabled;
427 }
428
429 static final long HEAP_SIZE = ClassSize.align(
430 ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
431 }
432
433
434
435
436
437
438
439 public static class FlushResultImpl implements FlushResult {
440 final Result result;
441 final String failureReason;
442 final long flushSequenceId;
443 final boolean wroteFlushWalMarker;
444
445
446
447
448
449
450
451
452 FlushResultImpl(Result result, long flushSequenceId) {
453 this(result, flushSequenceId, null, false);
454 assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
455 .FLUSHED_COMPACTION_NEEDED;
456 }
457
458
459
460
461
462
463 FlushResultImpl(Result result, String failureReason, boolean wroteFlushMarker) {
464 this(result, -1, failureReason, wroteFlushMarker);
465 assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
466 }
467
468
469
470
471
472
473
474 FlushResultImpl(Result result, long flushSequenceId, String failureReason,
475 boolean wroteFlushMarker) {
476 this.result = result;
477 this.flushSequenceId = flushSequenceId;
478 this.failureReason = failureReason;
479 this.wroteFlushWalMarker = wroteFlushMarker;
480 }
481
482
483
484
485
486
487 @Override
488 public boolean isFlushSucceeded() {
489 return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
490 .FLUSHED_COMPACTION_NEEDED;
491 }
492
493
494
495
496
497 @Override
498 public boolean isCompactionNeeded() {
499 return result == Result.FLUSHED_COMPACTION_NEEDED;
500 }
501
502 @Override
503 public String toString() {
504 return new StringBuilder()
505 .append("flush result:").append(result).append(", ")
506 .append("failureReason:").append(failureReason).append(",")
507 .append("flush seq id").append(flushSequenceId).toString();
508 }
509
510 @Override
511 public Result getResult() {
512 return result;
513 }
514 }
515
516
517 @VisibleForTesting
518 static class PrepareFlushResult {
519 final FlushResult result;
520 final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
521 final TreeMap<byte[], List<Path>> committedFiles;
522 final TreeMap<byte[], Long> storeFlushableSize;
523 final long startTime;
524 final long flushOpSeqId;
525 final long flushedSeqId;
526 final long totalFlushableSize;
527
528
529 PrepareFlushResult(FlushResult result, long flushSeqId) {
530 this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
531 }
532
533
534 PrepareFlushResult(
535 TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
536 TreeMap<byte[], List<Path>> committedFiles,
537 TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
538 long flushedSeqId, long totalFlushableSize) {
539 this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
540 flushSeqId, flushedSeqId, totalFlushableSize);
541 }
542
543 private PrepareFlushResult(
544 FlushResult result,
545 TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
546 TreeMap<byte[], List<Path>> committedFiles,
547 TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
548 long flushedSeqId, long totalFlushableSize) {
549 this.result = result;
550 this.storeFlushCtxs = storeFlushCtxs;
551 this.committedFiles = committedFiles;
552 this.storeFlushableSize = storeFlushableSize;
553 this.startTime = startTime;
554 this.flushOpSeqId = flushSeqId;
555 this.flushedSeqId = flushedSeqId;
556 this.totalFlushableSize = totalFlushableSize;
557 }
558
559 public FlushResult getResult() {
560 return this.result;
561 }
562 }
563
564 final WriteState writestate = new WriteState();
565
566 long memstoreFlushSize;
567 final long timestampSlop;
568 final long rowProcessorTimeout;
569
570
571 private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
572 new ConcurrentHashMap<Store, Long>();
573
574 final RegionServerServices rsServices;
575 private RegionServerAccounting rsAccounting;
576 private long flushCheckInterval;
577
578 private long flushPerChanges;
579 private long blockingMemStoreSize;
580 final long threadWakeFrequency;
581
582 final ReentrantReadWriteLock lock =
583 new ReentrantReadWriteLock();
584
585
586 private final ReentrantReadWriteLock updatesLock =
587 new ReentrantReadWriteLock();
588 private boolean splitRequest;
589 private byte[] explicitSplitPoint = null;
590
591 private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
592
593
594 private RegionCoprocessorHost coprocessorHost;
595
596 private HTableDescriptor htableDescriptor = null;
597 private RegionSplitPolicy splitPolicy;
598 private FlushPolicy flushPolicy;
599
600 private final MetricsRegion metricsRegion;
601 private final MetricsRegionWrapperImpl metricsRegionWrapper;
602 private final Durability durability;
603 private final boolean regionStatsEnabled;
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
/**
 * Legacy constructor that assembles the {@link HRegionFileSystem} from its parts and then
 * delegates to
 * {@link #HRegion(HRegionFileSystem, WAL, Configuration, HTableDescriptor, RegionServerServices)}.
 *
 * @deprecated kept for existing callers and tests; prefer the HRegionFileSystem-based constructor
 */
@Deprecated
@VisibleForTesting
public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
    final Configuration confParam, final HRegionInfo regionInfo,
    final HTableDescriptor htd, final RegionServerServices rsServices) {
  this(
      new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
      wal,
      confParam,
      htd,
      rsServices);
}
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651 public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
652 final HTableDescriptor htd, final RegionServerServices rsServices) {
653 if (htd == null) {
654 throw new IllegalArgumentException("Need table descriptor");
655 }
656
657 if (confParam instanceof CompoundConfiguration) {
658 throw new IllegalArgumentException("Need original base configuration");
659 }
660
661 this.comparator = fs.getRegionInfo().getComparator();
662 this.wal = wal;
663 this.fs = fs;
664
665
666 this.baseConf = confParam;
667 this.conf = new CompoundConfiguration()
668 .add(confParam)
669 .addStringMap(htd.getConfiguration())
670 .addWritableMap(htd.getValues());
671 this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
672 DEFAULT_CACHE_FLUSH_INTERVAL);
673 this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
674 if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
675 throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
676 + MAX_FLUSH_PER_CHANGES);
677 }
678 this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
679 DEFAULT_ROWLOCK_WAIT_DURATION);
680
681 this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID);
682 this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
683 this.htableDescriptor = htd;
684 this.rsServices = rsServices;
685 this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
686 setHTableSpecificConf();
687 this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
688
689 this.busyWaitDuration = conf.getLong(
690 "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
691 this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
692 if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
693 throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
694 + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
695 + maxBusyWaitMultiplier + "). Their product should be positive");
696 }
697 this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
698 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
699
700
701
702
703
704
705
706 this.timestampSlop = conf.getLong(
707 "hbase.hregion.keyvalue.timestamp.slop.millisecs",
708 HConstants.LATEST_TIMESTAMP);
709
710
711
712
713
714 this.rowProcessorTimeout = conf.getLong(
715 "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
716 this.durability = htd.getDurability() == Durability.USE_DEFAULT
717 ? DEFAULT_DURABILITY
718 : htd.getDurability();
719 if (rsServices != null) {
720 this.rsAccounting = this.rsServices.getRegionServerAccounting();
721
722
723 this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
724 this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
725 this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
726
727 Map<String, Region> recoveringRegions = rsServices.getRecoveringRegions();
728 String encodedName = getRegionInfo().getEncodedName();
729 if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
730 this.recovering = true;
731 recoveringRegions.put(encodedName, this);
732 }
733 } else {
734 this.metricsRegionWrapper = null;
735 this.metricsRegion = null;
736 }
737 if (LOG.isDebugEnabled()) {
738
739 LOG.debug("Instantiated " + this);
740 }
741
742
743 this.disallowWritesInRecovering =
744 conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
745 HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
746 configurationManager = Optional.absent();
747
748
749 this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
750 NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
751 false :
752 conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
753 HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
754 }
755
756 void setHTableSpecificConf() {
757 if (this.htableDescriptor == null) return;
758 long flushSize = this.htableDescriptor.getMemStoreFlushSize();
759
760 if (flushSize <= 0) {
761 flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
762 HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
763 }
764 this.memstoreFlushSize = flushSize;
765 this.blockingMemStoreSize = this.memstoreFlushSize *
766 conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
767 HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
768 }
769
770
771
772
773
774
775
776
777
778 @Deprecated
779 public long initialize() throws IOException {
780 return initialize(null);
781 }
782
783
784
785
786
787
788
789
790 private long initialize(final CancelableProgressable reporter) throws IOException {
791 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
792 long nextSeqId = -1;
793 try {
794 nextSeqId = initializeRegionInternals(reporter, status);
795 return nextSeqId;
796 } finally {
797
798
799 if (nextSeqId == -1) {
800 status.abort("Exception during region " + getRegionInfo().getRegionNameAsString() +
801 " initialization.");
802 }
803 }
804 }
805
806 private long initializeRegionInternals(final CancelableProgressable reporter,
807 final MonitoredTask status) throws IOException {
808 if (coprocessorHost != null) {
809 status.setStatus("Running coprocessor pre-open hook");
810 coprocessorHost.preOpen();
811 }
812
813
814
815 if (this.getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
816 status.setStatus("Writing region info on filesystem");
817 fs.checkRegionInfoOnFilesystem();
818 } else {
819 if (LOG.isDebugEnabled()) {
820 LOG.debug("Skipping creation of .regioninfo file for " + this.getRegionInfo());
821 }
822 }
823
824
825 status.setStatus("Initializing all the Stores");
826 long maxSeqId = initializeStores(reporter, status);
827 this.mvcc.advanceTo(maxSeqId);
828 if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
829
830 maxSeqId = Math.max(maxSeqId,
831 replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
832
833 this.mvcc.advanceTo(maxSeqId);
834 }
835 this.lastReplayedOpenRegionSeqId = maxSeqId;
836
837 this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
838 this.writestate.flushRequested = false;
839 this.writestate.compacting.set(0);
840
841 if (this.writestate.writesEnabled) {
842
843 status.setStatus("Cleaning up temporary data from old regions");
844 fs.cleanupTempDir();
845 }
846
847 if (this.writestate.writesEnabled) {
848 status.setStatus("Cleaning up detritus from prior splits");
849
850
851
852 fs.cleanupAnySplitDetritus();
853 fs.cleanupMergesDir();
854 }
855
856
857 this.splitPolicy = RegionSplitPolicy.create(this, conf);
858
859
860 this.flushPolicy = FlushPolicyFactory.create(this, conf);
861
862 long lastFlushTime = EnvironmentEdgeManager.currentTime();
863 for (Store store: stores.values()) {
864 this.lastStoreFlushTimeMap.put(store, lastFlushTime);
865 }
866
867
868
869 long nextSeqid = maxSeqId;
870
871
872
873
874 if (this.writestate.writesEnabled) {
875 nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
876 .getRegionDir(), nextSeqid, (this.recovering ? (this.flushPerChanges + 10000000) : 1));
877 } else {
878 nextSeqid++;
879 }
880
881 LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
882 "; next sequenceid=" + nextSeqid);
883
884
885 this.closing.set(false);
886 this.closed.set(false);
887
888 if (coprocessorHost != null) {
889 status.setStatus("Running coprocessor post-open hooks");
890 coprocessorHost.postOpen();
891 }
892
893 status.markComplete("Region opened successfully");
894 return nextSeqid;
895 }
896
897
898
899
900
901
902
903
904 private long initializeStores(final CancelableProgressable reporter, MonitoredTask status)
905 throws IOException {
906
907
908 long maxSeqId = -1;
909
910 long maxMemstoreTS = -1;
911
912 if (!htableDescriptor.getFamilies().isEmpty()) {
913
914 ThreadPoolExecutor storeOpenerThreadPool =
915 getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
916 CompletionService<HStore> completionService =
917 new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
918
919
920 for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
921 status.setStatus("Instantiating store for column family " + family);
922 completionService.submit(new Callable<HStore>() {
923 @Override
924 public HStore call() throws IOException {
925 return instantiateHStore(family);
926 }
927 });
928 }
929 boolean allStoresOpened = false;
930 try {
931 for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
932 Future<HStore> future = completionService.take();
933 HStore store = future.get();
934 this.stores.put(store.getFamily().getName(), store);
935
936 long storeMaxSequenceId = store.getMaxSequenceId();
937 maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
938 storeMaxSequenceId);
939 if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
940 maxSeqId = storeMaxSequenceId;
941 }
942 long maxStoreMemstoreTS = store.getMaxMemstoreTS();
943 if (maxStoreMemstoreTS > maxMemstoreTS) {
944 maxMemstoreTS = maxStoreMemstoreTS;
945 }
946 }
947 allStoresOpened = true;
948 } catch (InterruptedException e) {
949 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
950 } catch (ExecutionException e) {
951 throw new IOException(e.getCause());
952 } finally {
953 storeOpenerThreadPool.shutdownNow();
954 if (!allStoresOpened) {
955
956 LOG.error("Could not initialize all stores for the region=" + this);
957 for (Store store : this.stores.values()) {
958 try {
959 store.close();
960 } catch (IOException e) {
961 LOG.warn(e.getMessage());
962 }
963 }
964 }
965 }
966 }
967 return Math.max(maxSeqId, maxMemstoreTS + 1);
968 }
969
970 private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
971 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
972
973 status.setStatus("Warming up all the Stores");
974 try {
975 initializeStores(reporter, status);
976 } finally {
977 status.markComplete("Done warming up.");
978 }
979 }
980
981
982
983
984 private NavigableMap<byte[], List<Path>> getStoreFiles() {
985 NavigableMap<byte[], List<Path>> allStoreFiles =
986 new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
987 for (Store store: getStores()) {
988 Collection<StoreFile> storeFiles = store.getStorefiles();
989 if (storeFiles == null) continue;
990 List<Path> storeFileNames = new ArrayList<Path>();
991 for (StoreFile storeFile: storeFiles) {
992 storeFileNames.add(storeFile.getPath());
993 }
994 allStoreFiles.put(store.getFamily().getName(), storeFileNames);
995 }
996 return allStoreFiles;
997 }
998
999 private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
1000 Map<byte[], List<Path>> storeFiles = getStoreFiles();
1001 RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
1002 RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
1003 getRegionServerServices().getServerName(), storeFiles);
1004 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc);
1005 }
1006
1007 private void writeRegionCloseMarker(WAL wal) throws IOException {
1008 Map<byte[], List<Path>> storeFiles = getStoreFiles();
1009 RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
1010 RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
1011 getRegionServerServices().getServerName(), storeFiles);
1012 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
1013
1014
1015
1016
1017 if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
1018 WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
1019 mvcc.getReadPoint(), 0);
1020 }
1021 }
1022
1023
1024
1025
1026 public boolean hasReferences() {
1027 for (Store store : this.stores.values()) {
1028 if (store.hasReferences()) return true;
1029 }
1030 return false;
1031 }
1032
1033 @Override
1034 public HDFSBlocksDistribution getHDFSBlocksDistribution() {
1035 HDFSBlocksDistribution hdfsBlocksDistribution =
1036 new HDFSBlocksDistribution();
1037 synchronized (this.stores) {
1038 for (Store store : this.stores.values()) {
1039 Collection<StoreFile> storeFiles = store.getStorefiles();
1040 if (storeFiles == null) continue;
1041 for (StoreFile sf : storeFiles) {
1042 HDFSBlocksDistribution storeFileBlocksDistribution =
1043 sf.getHDFSBlockDistribution();
1044 hdfsBlocksDistribution.add(storeFileBlocksDistribution);
1045 }
1046 }
1047 }
1048 return hdfsBlocksDistribution;
1049 }
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1060 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
1061 Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
1062 return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
1063 }
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1075 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
1076 throws IOException {
1077 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
1078 FileSystem fs = tablePath.getFileSystem(conf);
1079
1080 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
1081 for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
1082 Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
1083 if (storeFiles == null) continue;
1084 for (StoreFileInfo storeFileInfo : storeFiles) {
1085 try {
1086 hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
1087 } catch (IOException ioe) {
1088 LOG.warn("Error getting hdfs block distribution for " + storeFileInfo);
1089 }
1090 }
1091 }
1092 return hdfsBlocksDistribution;
1093 }
1094
1095
1096
1097
1098
1099
1100 public long addAndGetGlobalMemstoreSize(long memStoreSize) {
1101 if (this.rsAccounting != null) {
1102 rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
1103 }
1104 long size = this.memstoreSize.addAndGet(memStoreSize);
1105
1106
1107 if (size < 0) {
1108 LOG.error("Asked to modify this region's (" + this.toString()
1109 + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
1110 + (size-memStoreSize) + ", delta=" + memStoreSize, new Exception());
1111 }
1112 return size;
1113 }
1114
1115 @Override
1116 public HRegionInfo getRegionInfo() {
1117 return this.fs.getRegionInfo();
1118 }
1119
1120
1121
1122
1123
1124 RegionServerServices getRegionServerServices() {
1125 return this.rsServices;
1126 }
1127
1128 @Override
1129 public long getReadRequestsCount() {
1130 return readRequestsCount.get();
1131 }
1132
1133 @Override
1134 public void updateReadRequestsCount(long i) {
1135 readRequestsCount.add(i);
1136 }
1137
1138 @Override
1139 public long getWriteRequestsCount() {
1140 return writeRequestsCount.get();
1141 }
1142
1143 @Override
1144 public void updateWriteRequestsCount(long i) {
1145 writeRequestsCount.add(i);
1146 }
1147
1148 @Override
1149 public long getMemstoreSize() {
1150 return memstoreSize.get();
1151 }
1152
1153 @Override
1154 public long getNumMutationsWithoutWAL() {
1155 return numMutationsWithoutWAL.get();
1156 }
1157
1158 @Override
1159 public long getDataInMemoryWithoutWAL() {
1160 return dataInMemoryWithoutWAL.get();
1161 }
1162
1163 @Override
1164 public long getBlockedRequestsCount() {
1165 return blockedRequestsCount.get();
1166 }
1167
1168 @Override
1169 public long getCheckAndMutateChecksPassed() {
1170 return checkAndMutateChecksPassed.get();
1171 }
1172
1173 @Override
1174 public long getCheckAndMutateChecksFailed() {
1175 return checkAndMutateChecksFailed.get();
1176 }
1177
1178 @Override
1179 public MetricsRegion getMetrics() {
1180 return metricsRegion;
1181 }
1182
1183 @Override
1184 public boolean isClosed() {
1185 return this.closed.get();
1186 }
1187
1188 @Override
1189 public boolean isClosing() {
1190 return this.closing.get();
1191 }
1192
1193 @Override
1194 public boolean isReadOnly() {
1195 return this.writestate.isReadOnly();
1196 }
1197
1198
1199
1200
1201 public void setRecovering(boolean newState) {
1202 boolean wasRecovering = this.recovering;
1203
1204
1205 if (wal != null && getRegionServerServices() != null && !writestate.readOnly
1206 && wasRecovering && !newState) {
1207
1208
1209 boolean forceFlush = getTableDesc().getRegionReplication() > 1;
1210
1211 MonitoredTask status = TaskMonitor.get().createStatus("Recovering region " + this);
1212
1213 try {
1214
1215 if (forceFlush) {
1216 status.setStatus("Flushing region " + this + " because recovery is finished");
1217 internalFlushcache(status);
1218 }
1219
1220 status.setStatus("Writing region open event marker to WAL because recovery is finished");
1221 try {
1222 long seqId = openSeqNum;
1223
1224 if (wal != null) {
1225 seqId = getNextSequenceId(wal);
1226 }
1227 writeRegionOpenMarker(wal, seqId);
1228 } catch (IOException e) {
1229
1230
1231 LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening "
1232 + "event to WAL, continuing", e);
1233 }
1234 } catch (IOException ioe) {
1235
1236
1237 LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush "
1238 + "event to WAL, continuing", ioe);
1239 } finally {
1240 status.cleanup();
1241 }
1242 }
1243
1244 this.recovering = newState;
1245 if (wasRecovering && !recovering) {
1246
1247 coprocessorHost.postLogReplay();
1248 }
1249 }
1250
1251 @Override
1252 public boolean isRecovering() {
1253 return this.recovering;
1254 }
1255
1256 @Override
1257 public boolean isAvailable() {
1258 return !isClosed() && !isClosing();
1259 }
1260
1261
1262 public boolean isSplittable() {
1263 return isAvailable() && !hasReferences();
1264 }
1265
1266
1267
1268
1269 public boolean isMergeable() {
1270 if (!isAvailable()) {
1271 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1272 + " is not mergeable because it is closing or closed");
1273 return false;
1274 }
1275 if (hasReferences()) {
1276 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1277 + " is not mergeable because it has references");
1278 return false;
1279 }
1280
1281 return true;
1282 }
1283
1284 public boolean areWritesEnabled() {
1285 synchronized(this.writestate) {
1286 return this.writestate.writesEnabled;
1287 }
1288 }
1289
1290 public MultiVersionConcurrencyControl getMVCC() {
1291 return mvcc;
1292 }
1293
1294 @Override
1295 public long getMaxFlushedSeqId() {
1296 return maxFlushedSeqId;
1297 }
1298
1299 @Override
1300 public long getReadpoint(IsolationLevel isolationLevel) {
1301 if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1302
1303 return Long.MAX_VALUE;
1304 }
1305 return mvcc.getReadPoint();
1306 }
1307
1308 @Override
1309 public boolean isLoadingCfsOnDemandDefault() {
1310 return this.isLoadingCfsOnDemandDefault;
1311 }
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329 public Map<byte[], List<StoreFile>> close() throws IOException {
1330 return close(false);
1331 }
1332
  /** Monitor held by {@link #close(boolean)} so that closes of this region run one at a time. */
  private final Object closeLock = new Object();

  /** Configuration key for the periodic memstore flush check interval. */
  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
      "hbase.regionserver.optionalcacheflushinterval";
  /** Default periodic flush interval: 3600000, presumably milliseconds (one hour) — confirm. */
  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
  /**
   * Periodic flush interval that {@code shouldFlush} uses for default-replica regions of
   * system tables: 300000, presumably milliseconds (five minutes) — confirm.
   */
  public static final int SYSTEM_CACHE_FLUSH_INTERVAL = 300000;

  /**
   * Configuration key for the number of changes after which a flush is requested
   * (presumably read into {@code flushPerChanges} — confirm).
   */
  public static final String MEMSTORE_FLUSH_PER_CHANGES =
      "hbase.regionserver.flush.per.changes";
  /** Default for {@link #MEMSTORE_FLUSH_PER_CHANGES}. */
  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000;
  /**
   * Upper limit for {@link #MEMSTORE_FLUSH_PER_CHANGES}; NOTE(review): presumably enforced
   * where the configuration is read — confirm.
   */
  public static final long MAX_FLUSH_PER_CHANGES = 1000000000;
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370 public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1371
1372
1373 MonitoredTask status = TaskMonitor.get().createStatus(
1374 "Closing region " + this +
1375 (abort ? " due to abort" : ""));
1376
1377 status.setStatus("Waiting for close lock");
1378 try {
1379 synchronized (closeLock) {
1380 return doClose(abort, status);
1381 }
1382 } finally {
1383 status.cleanup();
1384 }
1385 }
1386
1387
1388
1389
1390 @VisibleForTesting
1391 public void setClosing(boolean closing) {
1392 this.closing.set(closing);
1393 }
1394
  /**
   * Performs the close of this region; {@link #close(boolean)} calls it while holding the
   * close lock.
   * <p>
   * Steps, in order:
   * <ol>
   * <li>run coprocessor pre-close hooks;</li>
   * <li>disable writes and wait for in-flight flushes/compactions;</li>
   * <li>optionally pre-flush a large memstore;</li>
   * <li>take the region write lock and set the closing flag;</li>
   * <li>flush the remaining memstore, with a bounded number of retries;</li>
   * <li>close all stores in parallel;</li>
   * <li>write a close marker to the WAL;</li>
   * <li>set the closed flag, run post-close hooks and close region metrics.</li>
   * </ol>
   *
   * @param abort {@code true} if closing due to abort; skips all flushes and the WAL close
   *        marker
   * @param status task used to report progress
   * @return column family name to the store files of the closed stores, or {@code null} if
   *         the region was already closed
   * @throws IOException if the final flush fails (writes are re-enabled first), a store fails
   *         to close, or waiting for the store closers is interrupted
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK_EXCEPTION_PATH",
      justification="I think FindBugs is confused")
  private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
      throws IOException {
    if (isClosed()) {
      LOG.warn("Region " + this + " already closed");
      return null;
    }

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-close hooks");
      this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    boolean canFlush = true;
    synchronized (writestate) {
      // Read-only regions are never flushed during close.
      canFlush = !writestate.readOnly;
      writestate.writesEnabled = false;
      LOG.debug("Closing " + this + ": disabling compactions & flushes");
      waitForFlushesAndCompactions();
    }
    // Pre-flush a large memstore before taking the write lock, presumably so the final flush
    // under the lock has less to do. Failure here is tolerated; the final flush retries.
    if (!abort && worthPreFlushing() && canFlush) {
      status.setStatus("Pre-flushing region before close");
      LOG.info("Running close preflush of " + getRegionInfo().getRegionNameAsString());
      try {
        internalFlushcache(status);
      } catch (IOException ioe) {
        // Failed to pre-flush; keep going.
        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
      }
    }

    // Hold the region write lock for the rest of the close.
    lock.writeLock().lock();
    this.closing.set(true);
    status.setStatus("Disabling writes for close");
    try {
      if (this.isClosed()) {
        status.abort("Already got closed by another process");
        // Report null, as for an already-closed region.
        return null;
      }
      LOG.debug("Updates disabled for region " + this);
      // Final flush: skipped when aborting or when the region is read-only.
      if (!abort && canFlush) {
        int flushCount = 0;
        while (this.memstoreSize.get() > 0) {
          try {
            if (flushCount++ > 0) {
              int actualFlushes = flushCount - 1;
              if (actualFlushes > 5) {
                // Give up after more than five extra flush attempts still leave data in the
                // memstore.
                throw new DroppedSnapshotException("Failed clearing memory after " +
                    actualFlushes + " attempts on region: " +
                    Bytes.toStringBinary(getRegionInfo().getRegionName()));
              }
              LOG.info("Running extra flush, " + actualFlushes +
                  " (carrying snapshot?) " + this);
            }
            internalFlushcache(status);
          } catch (IOException ioe) {
            status.setStatus("Failed flush " + this + ", putting online again");
            synchronized (writestate) {
              writestate.writesEnabled = true;
            }
            // NOTE(review): the closing flag set above is not reset here, so flushcache() and
            // compact() keep skipping this region even though writes are re-enabled —
            // confirm whether closing should be cleared on this path.
            throw ioe;
          }
        }
      }

      Map<byte[], List<StoreFile>> result =
          new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
      if (!stores.isEmpty()) {
        // Close all stores in parallel; each task returns the family name and the store files
        // returned by Store#close().
        ThreadPoolExecutor storeCloserThreadPool =
            getStoreOpenAndCloseThreadPool("StoreCloserThread-" +
                getRegionInfo().getRegionNameAsString());
        CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
            new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

        // Submit one close task per store.
        for (final Store store : stores.values()) {
          long flushableSize = store.getFlushableSize();
          // On a clean close of a writable region every store should have nothing left to
          // flush; otherwise the region server is aborted (the store close still proceeds).
          if (!(abort || flushableSize == 0 || writestate.readOnly)) {
            if (getRegionServerServices() != null) {
              getRegionServerServices().abort("Assertion failed while closing store "
                + getRegionInfo().getRegionNameAsString() + " " + store
                + ". flushableSize expected=0, actual= " + flushableSize
                + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
                + "operation failed and left the memstore in a partially updated state.", null);
            }
          }
          completionService
              .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                @Override
                public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                  return new Pair<byte[], Collection<StoreFile>>(
                    store.getFamily().getName(), store.close());
                }
              });
        }
        // Collect one result per submitted task, merging files by family name.
        try {
          for (int i = 0; i < stores.size(); i++) {
            Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
            Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
            List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
            if (familyFiles == null) {
              familyFiles = new ArrayList<StoreFile>();
              result.put(storeFiles.getFirst(), familyFiles);
            }
            familyFiles.addAll(storeFiles.getSecond());
          }
        } catch (InterruptedException e) {
          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
          // Unwrap the store's failure.
          throw new IOException(e.getCause());
        } finally {
          storeCloserThreadPool.shutdownNow();
        }
      }

      // The close marker is only written on a non-abort close of a writable region that has
      // a WAL and region server services.
      status.setStatus("Writing region close event to WAL");
      if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
        writeRegionCloseMarker(wal);
      }

      this.closed.set(true);
      // Read-only regions were not flushed: subtract their memstore size from the accounting.
      // For flushable regions a non-zero remaining size is logged as an error.
      if (!canFlush) {
        addAndGetGlobalMemstoreSize(-memstoreSize.get());
      } else if (memstoreSize.get() != 0) {
        LOG.error("Memstore size is " + memstoreSize.get());
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor post-close hooks");
        this.coprocessorHost.postClose(abort);
      }
      if (this.metricsRegion != null) {
        this.metricsRegion.close();
      }
      if (this.metricsRegionWrapper != null) {
        Closeables.closeQuietly(this.metricsRegionWrapper);
      }
      status.markComplete("Closed");
      LOG.info("Closed " + this);
      return result;
    } finally {
      lock.writeLock().unlock();
    }
  }
1552
1553 @Override
1554 public void waitForFlushesAndCompactions() {
1555 synchronized (writestate) {
1556 if (this.writestate.readOnly) {
1557
1558
1559 return;
1560 }
1561 boolean interrupted = false;
1562 try {
1563 while (writestate.compacting.get() > 0 || writestate.flushing) {
1564 LOG.debug("waiting for " + writestate.compacting + " compactions"
1565 + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1566 try {
1567 writestate.wait();
1568 } catch (InterruptedException iex) {
1569
1570 LOG.warn("Interrupted while waiting");
1571 interrupted = true;
1572 }
1573 }
1574 } finally {
1575 if (interrupted) {
1576 Thread.currentThread().interrupt();
1577 }
1578 }
1579 }
1580 }
1581
1582 public void waitForFlushes() {
1583 synchronized (writestate) {
1584 if (this.writestate.readOnly) {
1585
1586
1587 return;
1588 }
1589 if (!writestate.flushing) return;
1590 long start = System.currentTimeMillis();
1591 boolean interrupted = false;
1592 try {
1593 while (writestate.flushing) {
1594 LOG.debug("waiting for cache flush to complete for region " + this);
1595 try {
1596 writestate.wait();
1597 } catch (InterruptedException iex) {
1598
1599 LOG.warn("Interrupted while waiting");
1600 interrupted = true;
1601 break;
1602 }
1603 }
1604 } finally {
1605 if (interrupted) {
1606 Thread.currentThread().interrupt();
1607 }
1608 }
1609 long duration = System.currentTimeMillis() - start;
1610 LOG.debug("Waited " + duration + " ms for flush to complete");
1611 }
1612 }
1613 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1614 final String threadNamePrefix) {
1615 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1616 int maxThreads = Math.min(numStores,
1617 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1618 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1619 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1620 }
1621
1622 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1623 final String threadNamePrefix) {
1624 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1625 int maxThreads = Math.max(1,
1626 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1627 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1628 / numStores);
1629 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1630 }
1631
1632 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1633 final String threadNamePrefix) {
1634 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1635 new ThreadFactory() {
1636 private int count = 1;
1637
1638 @Override
1639 public Thread newThread(Runnable r) {
1640 return new Thread(r, threadNamePrefix + "-" + count++);
1641 }
1642 });
1643 }
1644
1645
1646
1647
1648 private boolean worthPreFlushing() {
1649 return this.memstoreSize.get() >
1650 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1651 }
1652
1653
1654
1655
1656
1657 @Override
1658 public HTableDescriptor getTableDesc() {
1659 return this.htableDescriptor;
1660 }
1661
1662
1663 public WAL getWAL() {
1664 return this.wal;
1665 }
1666
1667
1668
1669
1670
1671
1672
1673
1674 Configuration getBaseConf() {
1675 return this.baseConf;
1676 }
1677
1678
1679 public FileSystem getFilesystem() {
1680 return fs.getFileSystem();
1681 }
1682
1683
1684 public HRegionFileSystem getRegionFileSystem() {
1685 return this.fs;
1686 }
1687
1688 @Override
1689 public long getEarliestFlushTimeForAllStores() {
1690 return lastStoreFlushTimeMap.isEmpty() ? Long.MAX_VALUE : Collections.min(lastStoreFlushTimeMap
1691 .values());
1692 }
1693
1694 @Override
1695 public long getOldestHfileTs(boolean majorCompactioOnly) throws IOException {
1696 long result = Long.MAX_VALUE;
1697 for (Store store : getStores()) {
1698 Collection<StoreFile> storeFiles = store.getStorefiles();
1699 if (storeFiles == null) continue;
1700 for (StoreFile file : storeFiles) {
1701 StoreFile.Reader sfReader = file.getReader();
1702 if (sfReader == null) continue;
1703 HFile.Reader reader = sfReader.getHFileReader();
1704 if (reader == null) continue;
1705 if (majorCompactioOnly) {
1706 byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
1707 if (val == null) continue;
1708 if (val == null || !Bytes.toBoolean(val)) {
1709 continue;
1710 }
1711 }
1712 result = Math.min(result, reader.getFileContext().getFileCreateTime());
1713 }
1714 }
1715 return result == Long.MAX_VALUE ? 0 : result;
1716 }
1717
1718 RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) {
1719 long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId;
1720 byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
1721 regionLoadBldr.clearStoreCompleteSequenceId();
1722 for (byte[] familyName : this.stores.keySet()) {
1723 long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
1724
1725
1726
1727 long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1;
1728 regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.
1729 newBuilder().setFamilyName(ByteString.copyFrom(familyName)).setSequenceId(csid).build());
1730 }
1731 return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
1732 }
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742 public long getLargestHStoreSize() {
1743 long size = 0;
1744 for (Store h : stores.values()) {
1745 long storeSize = h.getSize();
1746 if (storeSize > size) {
1747 size = storeSize;
1748 }
1749 }
1750 return size;
1751 }
1752
1753
1754
1755
1756 public KeyValue.KVComparator getComparator() {
1757 return this.comparator;
1758 }
1759
1760
1761
1762
1763
1764 protected void doRegionCompactionPrep() throws IOException {
1765 }
1766
1767 @Override
1768 public void triggerMajorCompaction() throws IOException {
1769 for (Store s : getStores()) {
1770 s.triggerMajorCompaction();
1771 }
1772 }
1773
1774 @Override
1775 public void compact(final boolean majorCompaction) throws IOException {
1776 if (majorCompaction) {
1777 triggerMajorCompaction();
1778 }
1779 for (Store s : getStores()) {
1780 CompactionContext compaction = s.requestCompaction();
1781 if (compaction != null) {
1782 CompactionThroughputController controller = null;
1783 if (rsServices != null) {
1784 controller = CompactionThroughputControllerFactory.create(rsServices, conf);
1785 }
1786 if (controller == null) {
1787 controller = NoLimitCompactionThroughputController.INSTANCE;
1788 }
1789 compact(compaction, s, controller, null);
1790 }
1791 }
1792 }
1793
1794
1795
1796
1797
1798
1799
1800 public void compactStores() throws IOException {
1801 for (Store s : getStores()) {
1802 CompactionContext compaction = s.requestCompaction();
1803 if (compaction != null) {
1804 compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
1805 }
1806 }
1807 }
1808
1809
1810
1811
1812
1813
1814
1815 @VisibleForTesting
1816 void compactStore(byte[] family, CompactionThroughputController throughputController)
1817 throws IOException {
1818 Store s = getStore(family);
1819 CompactionContext compaction = s.requestCompaction();
1820 if (compaction != null) {
1821 compact(compaction, s, throughputController, null);
1822 }
1823 }
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840 public boolean compact(CompactionContext compaction, Store store,
1841 CompactionThroughputController throughputController) throws IOException {
1842 return compact(compaction, store, throughputController, null);
1843 }
1844
  /**
   * Runs the given, already-selected compaction on {@code store} while holding the region read
   * lock.
   * <p>
   * The compaction is skipped, returning {@code false}, if any of these holds:
   * <ul>
   * <li>the region is closing or closed;</li>
   * <li>{@code store} is no longer the store registered for its family;</li>
   * <li>writes are disabled;</li>
   * <li>the store compaction is interrupted.</li>
   * </ul>
   * While the store compacts, the write-state compacting counter is raised, so
   * {@link #waitForFlushesAndCompactions()} waits for it. The requested compaction is cancelled
   * on exit unless it was handed to {@code store.compact}.
   *
   * @param compaction compaction context with a non-empty selection
   * @param store store to compact
   * @param throughputController controller passed to the store compaction
   * @param user user passed to the store compaction; may be {@code null}
   * @return {@code true} if the compaction completed, {@code false} if it was skipped
   * @throws IOException if the store compaction fails
   */
  public boolean compact(CompactionContext compaction, Store store,
      CompactionThroughputController throughputController, User user) throws IOException {
    assert compaction != null && compaction.hasSelection();
    assert !compaction.getRequest().getFiles().isEmpty();
    if (this.closing.get() || this.closed.get()) {
      LOG.debug("Skipping compaction on " + this + " because closing/closed");
      store.cancelRequestedCompaction(compaction);
      return false;
    }
    MonitoredTask status = null;
    // Cleared right before the request is handed to store.compact(); until then the outer
    // finally block cancels the request.
    boolean requestNeedsCancellation = true;
    // Blocks while a close holds the region write lock.
    lock.readLock().lock();
    try {
      byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
      // The store may have been replaced since the compaction was requested.
      if (stores.get(cf) != store) {
        LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
            + " has been re-instantiated, cancel this compaction request. "
            + " It may be caused by the roll back of split transaction");
        return false;
      }

      status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
      status.enableStatusJournal(false);
      if (this.closed.get()) {
        String msg = "Skipping compaction on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return false;
      }
      boolean wasStateSet = false;
      try {
        // Register the compaction in the write state only while writes are enabled.
        synchronized (writestate) {
          if (writestate.writesEnabled) {
            wasStateSet = true;
            writestate.compacting.incrementAndGet();
          } else {
            String msg = "NOT compacting region " + this + ". Writes disabled.";
            LOG.info(msg);
            status.abort(msg);
            return false;
          }
        }
        LOG.info("Starting compaction on " + store + " in region " + this
            + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
        doRegionCompactionPrep();
        try {
          status.setStatus("Compacting store " + store);
          // From here on the request belongs to store.compact(); this method no longer
          // cancels it.
          requestNeedsCancellation = false;
          store.compact(compaction, throughputController, user);
        } catch (InterruptedIOException iioe) {
          String msg = "compaction interrupted";
          LOG.info(msg, iioe);
          status.abort(msg);
          return false;
        }
      } finally {
        // Unregister the compaction and wake waiters once no compaction remains.
        if (wasStateSet) {
          synchronized (writestate) {
            writestate.compacting.decrementAndGet();
            if (writestate.compacting.get() <= 0) {
              writestate.notifyAll();
            }
          }
        }
      }
      status.markComplete("Compaction complete");
      return true;
    } finally {
      try {
        if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
        if (status != null) {
          LOG.debug("Compaction status journal:\n\t" + status.prettyPrintJournal());
          status.cleanup();
        }
      } finally {
        lock.readLock().unlock();
      }
    }
  }
1927
1928 @Override
1929 public FlushResult flush(boolean force) throws IOException {
1930 return flushcache(force, false);
1931 }
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
  /**
   * Flushes the memstore of this region while holding the region read lock.
   * <p>
   * A {@code CANNOT_FLUSH} result is returned without flushing if the region is closing or
   * closed, if a flush is already in progress, or if writes are disabled. Otherwise the
   * write-state {@code flushing} flag is held while the selected stores are flushed, and
   * coprocessor pre/post flush hooks run around the flush. Afterwards the flag and
   * {@code flushRequested} are cleared and waiters are notified.
   *
   * @param forceFlushAllStores {@code true} to flush every store; otherwise the stores chosen
   *        by the flush policy are flushed
   * @param writeFlushRequestWalMarker passed to the internal flush; presumably controls whether
   *        a flush request marker is written to the WAL — confirm
   * @return the flush result
   * @throws IOException if the flush or a coprocessor hook fails
   */
  public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
      throws IOException {
    // Fast path: do not even create a status task while closing.
    if (this.closing.get()) {
      String msg = "Skipping flush on " + this + " because closing";
      LOG.debug(msg);
      return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
    }
    MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
    status.enableStatusJournal(false);
    status.setStatus("Acquiring readlock on region");
    // Blocks while a close holds the region write lock.
    lock.readLock().lock();
    try {
      if (this.closed.get()) {
        String msg = "Skipping flush on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor pre-flush hooks");
        coprocessorHost.preFlush();
      }
      // Reset the without-WAL counters; presumably because the flush persists that data —
      // confirm.
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }
      // Claim the flushing flag; only one flush runs at a time and only while writes are
      // enabled.
      synchronized (writestate) {
        if (!writestate.flushing && writestate.writesEnabled) {
          this.writestate.flushing = true;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("NOT flushing memstore for region " + this
                + ", flushing=" + writestate.flushing + ", writesEnabled="
                + writestate.writesEnabled);
          }
          String msg = "Not flushing since "
              + (writestate.flushing ? "already flushing"
              : "writes not enabled");
          status.abort(msg);
          return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
        }
      }

      try {
        Collection<Store> specificStoresToFlush =
            forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
        // NOTE(review): this local "fs" shadows the region file system field.
        FlushResult fs = internalFlushcache(specificStoresToFlush,
          status, writeFlushRequestWalMarker);

        if (coprocessorHost != null) {
          status.setStatus("Running post-flush coprocessor hooks");
          coprocessorHost.postFlush();
        }

        status.markComplete("Flush successful");
        return fs;
      } finally {
        // Release the flushing flag and wake threads waiting in waitForFlushes*().
        synchronized (writestate) {
          writestate.flushing = false;
          this.writestate.flushRequested = false;
          writestate.notifyAll();
        }
      }
    } finally {
      lock.readLock().unlock();
      LOG.debug("Flush status journal:\n\t" + status.prettyPrintJournal());
      status.cleanup();
    }
  }
2028
2029
2030
2031
2032
2033
2034
2035
2036 boolean shouldFlushStore(Store store) {
2037 long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
2038 store.getFamily().getName()) - 1;
2039 if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) {
2040 if (LOG.isDebugEnabled()) {
2041 LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " +
2042 getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest +
2043 " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint());
2044 }
2045 return true;
2046 }
2047 if (this.flushCheckInterval <= 0) {
2048 return false;
2049 }
2050 long now = EnvironmentEdgeManager.currentTime();
2051 if (store.timeOfOldestEdit() < now - this.flushCheckInterval) {
2052 if (LOG.isDebugEnabled()) {
2053 LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " +
2054 getRegionInfo().getEncodedName() + " because time of oldest edit=" +
2055 store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now);
2056 }
2057 return true;
2058 }
2059 return false;
2060 }
2061
2062
2063
2064
2065 boolean shouldFlush(final StringBuffer whyFlush) {
2066 whyFlush.setLength(0);
2067
2068 if (this.maxFlushedSeqId > 0
2069 && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) {
2070 whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush");
2071 return true;
2072 }
2073 long modifiedFlushCheckInterval = flushCheckInterval;
2074 if (getRegionInfo().isSystemTable() &&
2075 getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
2076 modifiedFlushCheckInterval = SYSTEM_CACHE_FLUSH_INTERVAL;
2077 }
2078 if (modifiedFlushCheckInterval <= 0) {
2079 return false;
2080 }
2081 long now = EnvironmentEdgeManager.currentTime();
2082
2083 if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
2084 return false;
2085 }
2086
2087
2088 for (Store s : getStores()) {
2089 if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
2090
2091 whyFlush.append(s.toString() + " has an old edit so flush to free WALs");
2092 return true;
2093 }
2094 }
2095 return false;
2096 }
2097
2098
2099
2100
2101
2102
2103 private FlushResult internalFlushcache(MonitoredTask status)
2104 throws IOException {
2105 return internalFlushcache(stores.values(), status, false);
2106 }
2107
2108
2109
2110
2111
2112
2113 private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
2114 MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
2115 return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
2116 status, writeFlushWalMarker);
2117 }
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147 protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
2148 final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
2149 throws IOException {
2150 PrepareFlushResult result
2151 = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
2152 if (result.result == null) {
2153 return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
2154 } else {
2155 return result.result;
2156 }
2157 }
2158
2159 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
2160 justification="FindBugs seems confused about trxId")
2161 protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
2162 final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
2163 throws IOException {
2164 if (this.rsServices != null && this.rsServices.isAborted()) {
2165
2166 throw new IOException("Aborting flush because server is aborted...");
2167 }
2168 final long startTime = EnvironmentEdgeManager.currentTime();
2169
2170 if (this.memstoreSize.get() <= 0) {
2171
2172
2173 MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
2174 this.updatesLock.writeLock().lock();
2175 try {
2176 if (this.memstoreSize.get() <= 0) {
2177
2178
2179
2180
2181
2182
2183 if (wal != null) {
2184 writeEntry = mvcc.begin();
2185 long flushOpSeqId = writeEntry.getWriteNumber();
2186 FlushResult flushResult = new FlushResultImpl(
2187 FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
2188 flushOpSeqId,
2189 "Nothing to flush",
2190 writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
2191
2192
2193 mvcc.completeAndWait(writeEntry);
2194 writeEntry = null;
2195 return new PrepareFlushResult(flushResult, myseqid);
2196 } else {
2197 return new PrepareFlushResult(
2198 new FlushResultImpl(
2199 FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
2200 "Nothing to flush",
2201 false),
2202 myseqid);
2203 }
2204 }
2205 } finally {
2206 this.updatesLock.writeLock().unlock();
2207 if (writeEntry != null) {
2208 mvcc.complete(writeEntry);
2209 }
2210 }
2211 }
2212
2213 if (LOG.isInfoEnabled()) {
2214
2215 StringBuilder perCfExtras = null;
2216 if (!isAllFamilies(storesToFlush)) {
2217 perCfExtras = new StringBuilder();
2218 for (Store store: storesToFlush) {
2219 perCfExtras.append("; ").append(store.getColumnFamilyName());
2220 perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
2221 }
2222 }
2223 LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
2224 " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
2225 ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
2226 ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid));
2227 }
2228
2229
2230
2231
2232
2233
2234
2235 status.setStatus("Obtaining lock to block concurrent updates");
2236
2237 this.updatesLock.writeLock().lock();
2238 status.setStatus("Preparing to flush by snapshotting stores in " +
2239 getRegionInfo().getEncodedName());
2240 long totalFlushableSizeOfFlushableStores = 0;
2241
2242 Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
2243 for (Store store: storesToFlush) {
2244 flushedFamilyNames.add(store.getFamily().getName());
2245 }
2246
2247 TreeMap<byte[], StoreFlushContext> storeFlushCtxs
2248 = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
2249 TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
2250 Bytes.BYTES_COMPARATOR);
2251 TreeMap<byte[], Long> storeFlushableSize
2252 = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2253
2254
2255
2256 long flushOpSeqId = HConstants.NO_SEQNUM;
2257
2258
2259 long flushedSeqId = HConstants.NO_SEQNUM;
2260 byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
2261
2262 long trxId = 0;
2263 MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
2264
2265
2266
2267
2268
2269 mvcc.completeAndWait(writeEntry);
2270
2271
2272 writeEntry = null;
2273 try {
2274 try {
2275 if (wal != null) {
2276 Long earliestUnflushedSequenceIdForTheRegion =
2277 wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
2278 if (earliestUnflushedSequenceIdForTheRegion == null) {
2279
2280 String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
2281 status.setStatus(msg);
2282 return new PrepareFlushResult(
2283 new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
2284 myseqid);
2285 }
2286 flushOpSeqId = getNextSequenceId(wal);
2287
2288 flushedSeqId =
2289 earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
2290 flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
2291 } else {
2292
2293 flushedSeqId = flushOpSeqId = myseqid;
2294 }
2295
2296 for (Store s : storesToFlush) {
2297 totalFlushableSizeOfFlushableStores += s.getFlushableSize();
2298 storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
2299 committedFiles.put(s.getFamily().getName(), null);
2300 storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
2301 }
2302
2303
2304 if (wal != null && !writestate.readOnly) {
2305 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
2306 getRegionInfo(), flushOpSeqId, committedFiles);
2307
2308 trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2309 desc, false, mvcc);
2310 }
2311
2312
2313 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2314 flush.prepare();
2315 }
2316 } catch (IOException ex) {
2317 if (wal != null) {
2318 if (trxId > 0) {
2319 try {
2320 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2321 getRegionInfo(), flushOpSeqId, committedFiles);
2322 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2323 desc, false, mvcc);
2324 } catch (Throwable t) {
2325 LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2326 StringUtils.stringifyException(t));
2327
2328 }
2329 }
2330
2331 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2332 throw ex;
2333 }
2334 } finally {
2335 this.updatesLock.writeLock().unlock();
2336 }
2337 String s = "Finished memstore snapshotting " + this +
2338 ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
2339 status.setStatus(s);
2340 if (LOG.isTraceEnabled()) LOG.trace(s);
2341
2342
2343 if (wal != null) {
2344 try {
2345 wal.sync();
2346 } catch (IOException ioe) {
2347 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2348 throw ioe;
2349 }
2350 }
2351 } finally {
2352 if (writeEntry != null) {
2353
2354 mvcc.complete(writeEntry);
2355 }
2356 }
2357 return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
2358 flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
2359 }
2360
2361
2362
2363
2364
2365 private boolean isAllFamilies(final Collection<Store> families) {
2366 return families == null || this.stores.size() == families.size();
2367 }
2368
2369
2370
2371
2372
2373
2374
2375 private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
2376 if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
2377 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
2378 getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
2379 try {
2380 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2381 desc, true, mvcc);
2382 return true;
2383 } catch (IOException e) {
2384 LOG.warn(getRegionInfo().getEncodedName() + " : "
2385 + "Received exception while trying to write the flush request to wal", e);
2386 }
2387 }
2388 return false;
2389 }
2390
2391 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
2392 justification="Intentional; notify is about completed flush")
2393 protected FlushResult internalFlushCacheAndCommit(
2394 final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
2395 final Collection<Store> storesToFlush)
2396 throws IOException {
2397
2398
2399 TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
2400 TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
2401 long startTime = prepareResult.startTime;
2402 long flushOpSeqId = prepareResult.flushOpSeqId;
2403 long flushedSeqId = prepareResult.flushedSeqId;
2404 long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
2405
2406 String s = "Flushing stores of " + this;
2407 status.setStatus(s);
2408 if (LOG.isTraceEnabled()) LOG.trace(s);
2409
2410
2411
2412
2413
2414 boolean compactionRequested = false;
2415 try {
2416
2417
2418
2419
2420
2421 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2422 flush.flushCache(status);
2423 }
2424
2425
2426
2427 Iterator<Store> it = storesToFlush.iterator();
2428
2429 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2430 boolean needsCompaction = flush.commit(status);
2431 if (needsCompaction) {
2432 compactionRequested = true;
2433 }
2434 byte[] storeName = it.next().getFamily().getName();
2435 List<Path> storeCommittedFiles = flush.getCommittedFiles();
2436 committedFiles.put(storeName, storeCommittedFiles);
2437
2438 if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
2439 totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
2440 }
2441 }
2442 storeFlushCtxs.clear();
2443
2444
2445 this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
2446
2447 if (wal != null) {
2448
2449 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
2450 getRegionInfo(), flushOpSeqId, committedFiles);
2451 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2452 desc, true, mvcc);
2453 }
2454 } catch (Throwable t) {
2455
2456
2457
2458
2459
2460
2461 if (wal != null) {
2462 try {
2463 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2464 getRegionInfo(), flushOpSeqId, committedFiles);
2465 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2466 desc, false, mvcc);
2467 } catch (Throwable ex) {
2468 LOG.warn(getRegionInfo().getEncodedName() + " : "
2469 + "failed writing ABORT_FLUSH marker to WAL", ex);
2470
2471 }
2472 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2473 }
2474 DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
2475 Bytes.toStringBinary(getRegionInfo().getRegionName()));
2476 dse.initCause(t);
2477 status.abort("Flush failed: " + StringUtils.stringifyException(t));
2478
2479
2480
2481
2482
2483 this.closing.set(true);
2484
2485 if (rsServices != null) {
2486
2487 rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
2488 }
2489
2490 throw dse;
2491 }
2492
2493
2494 if (wal != null) {
2495 wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2496 }
2497
2498
2499 for (Store store: storesToFlush) {
2500 this.lastStoreFlushTimeMap.put(store, startTime);
2501 }
2502
2503 this.maxFlushedSeqId = flushedSeqId;
2504 this.lastFlushOpSeqId = flushOpSeqId;
2505
2506
2507
2508 synchronized (this) {
2509 notifyAll();
2510 }
2511
2512 long time = EnvironmentEdgeManager.currentTime() - startTime;
2513 long memstoresize = this.memstoreSize.get();
2514 String msg = "Finished memstore flush of ~"
2515 + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
2516 + totalFlushableSizeOfFlushableStores + ", currentsize="
2517 + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
2518 + " for region " + this + " in " + time + "ms, sequenceid="
2519 + flushOpSeqId + ", compaction requested=" + compactionRequested
2520 + ((wal == null) ? "; wal=null" : "");
2521 LOG.info(msg);
2522 status.setStatus(msg);
2523
2524 return new FlushResultImpl(compactionRequested ?
2525 FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
2526 FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
2527 flushOpSeqId);
2528 }
2529
2530
2531
2532
2533
2534
2535 @VisibleForTesting
2536 protected long getNextSequenceId(final WAL wal) throws IOException {
2537
2538
2539
2540
2541
2542
2543 WALKey key = this.appendEmptyEdit(wal);
2544 mvcc.complete(key.getWriteEntry());
2545 return key.getSequenceId(this.maxWaitForSeqId);
2546 }
2547
2548
2549
2550
2551
2552 @Override
2553 public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException {
2554 if (coprocessorHost != null) {
2555 Result result = new Result();
2556 if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2557 return result;
2558 }
2559 }
2560
2561
2562 checkRow(row, "getClosestRowBefore");
2563 startRegionOperation(Operation.GET);
2564 this.readRequestsCount.increment();
2565 try {
2566 Result result = null;
2567 Get get = new Get(row);
2568 get.addFamily(family);
2569 get.setClosestRowBefore(true);
2570 result = get(get);
2571
2572 result = result.isEmpty() ? null : result;
2573 if (coprocessorHost != null) {
2574 coprocessorHost.postGetClosestRowBefore(row, family, result);
2575 }
2576 return result;
2577 } finally {
2578 closeRegionOperation(Operation.GET);
2579 }
2580 }
2581
2582 @Override
2583 public RegionScanner getScanner(Scan scan) throws IOException {
2584 return getScanner(scan, null);
2585 }
2586
2587 @Override
2588 public RegionScanner getScanner(Scan scan,
2589 List<KeyValueScanner> additionalScanners) throws IOException {
2590 startRegionOperation(Operation.SCAN);
2591 try {
2592
2593 if (!scan.hasFamilies()) {
2594
2595 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
2596 scan.addFamily(family);
2597 }
2598 } else {
2599 for (byte [] family : scan.getFamilyMap().keySet()) {
2600 checkFamily(family);
2601 }
2602 }
2603 return instantiateRegionScanner(scan, additionalScanners);
2604 } finally {
2605 closeRegionOperation(Operation.SCAN);
2606 }
2607 }
2608
2609 protected RegionScanner instantiateRegionScanner(Scan scan,
2610 List<KeyValueScanner> additionalScanners) throws IOException {
2611 if (scan.isReversed()) {
2612 if (scan.getFilter() != null) {
2613 scan.getFilter().setReversed(true);
2614 }
2615 return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2616 }
2617 return new RegionScannerImpl(scan, additionalScanners, this);
2618 }
2619
2620 @Override
2621 public void prepareDelete(Delete delete) throws IOException {
2622
2623 if(delete.getFamilyCellMap().isEmpty()){
2624 for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2625
2626 delete.addFamily(family, delete.getTimeStamp());
2627 }
2628 } else {
2629 for(byte [] family : delete.getFamilyCellMap().keySet()) {
2630 if(family == null) {
2631 throw new NoSuchColumnFamilyException("Empty family is invalid");
2632 }
2633 checkFamily(family);
2634 }
2635 }
2636 }
2637
2638 @Override
2639 public void delete(Delete delete) throws IOException {
2640 checkReadOnly();
2641 checkResources();
2642 startRegionOperation(Operation.DELETE);
2643 try {
2644
2645 doBatchMutate(delete);
2646 } finally {
2647 closeRegionOperation(Operation.DELETE);
2648 }
2649 }
2650
2651
2652
2653
2654 private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2655
2656
2657
2658
2659
2660
2661 void delete(NavigableMap<byte[], List<Cell>> familyMap,
2662 Durability durability) throws IOException {
2663 Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2664 delete.setFamilyCellMap(familyMap);
2665 delete.setDurability(durability);
2666 doBatchMutate(delete);
2667 }
2668
2669 @Override
2670 public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2671 byte[] byteNow) throws IOException {
2672 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2673
2674 byte[] family = e.getKey();
2675 List<Cell> cells = e.getValue();
2676 assert cells instanceof RandomAccess;
2677
2678 Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2679 int listSize = cells.size();
2680 for (int i=0; i < listSize; i++) {
2681 Cell cell = cells.get(i);
2682
2683
2684 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2685 byte[] qual = CellUtil.cloneQualifier(cell);
2686 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2687
2688 Integer count = kvCount.get(qual);
2689 if (count == null) {
2690 kvCount.put(qual, 1);
2691 } else {
2692 kvCount.put(qual, count + 1);
2693 }
2694 count = kvCount.get(qual);
2695
2696 Get get = new Get(CellUtil.cloneRow(cell));
2697 get.setMaxVersions(count);
2698 get.addColumn(family, qual);
2699 if (coprocessorHost != null) {
2700 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2701 byteNow, get)) {
2702 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2703 }
2704 } else {
2705 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2706 }
2707 } else {
2708 CellUtil.updateLatestStamp(cell, byteNow, 0);
2709 }
2710 }
2711 }
2712 }
2713
2714 void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2715 throws IOException {
2716 List<Cell> result = get(get, false);
2717
2718 if (result.size() < count) {
2719
2720 CellUtil.updateLatestStamp(cell, byteNow, 0);
2721 return;
2722 }
2723 if (result.size() > count) {
2724 throw new RuntimeException("Unexpected size: " + result.size());
2725 }
2726 Cell getCell = result.get(count - 1);
2727 CellUtil.setTimestamp(cell, getCell.getTimestamp());
2728 }
2729
2730 @Override
2731 public void put(Put put) throws IOException {
2732 checkReadOnly();
2733
2734
2735
2736
2737
2738 checkResources();
2739 startRegionOperation(Operation.PUT);
2740 try {
2741
2742 doBatchMutate(put);
2743 } finally {
2744 closeRegionOperation(Operation.PUT);
2745 }
2746 }
2747
2748
2749
2750
2751
2752
2753 private abstract static class BatchOperationInProgress<T> {
2754 T[] operations;
2755 int nextIndexToProcess = 0;
2756 OperationStatus[] retCodeDetails;
2757 WALEdit[] walEditsFromCoprocessors;
2758
2759 public BatchOperationInProgress(T[] operations) {
2760 this.operations = operations;
2761 this.retCodeDetails = new OperationStatus[operations.length];
2762 this.walEditsFromCoprocessors = new WALEdit[operations.length];
2763 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2764 }
2765
2766 public abstract Mutation getMutation(int index);
2767 public abstract long getNonceGroup(int index);
2768 public abstract long getNonce(int index);
2769
2770 public abstract Mutation[] getMutationsForCoprocs();
2771 public abstract boolean isInReplay();
2772 public abstract long getReplaySequenceId();
2773
2774 public boolean isDone() {
2775 return nextIndexToProcess == operations.length;
2776 }
2777 }
2778
2779 private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2780 private long nonceGroup;
2781 private long nonce;
2782 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2783 super(operations);
2784 this.nonceGroup = nonceGroup;
2785 this.nonce = nonce;
2786 }
2787
2788 @Override
2789 public Mutation getMutation(int index) {
2790 return this.operations[index];
2791 }
2792
2793 @Override
2794 public long getNonceGroup(int index) {
2795 return nonceGroup;
2796 }
2797
2798 @Override
2799 public long getNonce(int index) {
2800 return nonce;
2801 }
2802
2803 @Override
2804 public Mutation[] getMutationsForCoprocs() {
2805 return this.operations;
2806 }
2807
2808 @Override
2809 public boolean isInReplay() {
2810 return false;
2811 }
2812
2813 @Override
2814 public long getReplaySequenceId() {
2815 return 0;
2816 }
2817 }
2818
2819 private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2820 private long replaySeqId = 0;
2821 public ReplayBatch(MutationReplay[] operations, long seqId) {
2822 super(operations);
2823 this.replaySeqId = seqId;
2824 }
2825
2826 @Override
2827 public Mutation getMutation(int index) {
2828 return this.operations[index].mutation;
2829 }
2830
2831 @Override
2832 public long getNonceGroup(int index) {
2833 return this.operations[index].nonceGroup;
2834 }
2835
2836 @Override
2837 public long getNonce(int index) {
2838 return this.operations[index].nonce;
2839 }
2840
2841 @Override
2842 public Mutation[] getMutationsForCoprocs() {
2843 assert false;
2844 throw new RuntimeException("Should not be called for replay batch");
2845 }
2846
2847 @Override
2848 public boolean isInReplay() {
2849 return true;
2850 }
2851
2852 @Override
2853 public long getReplaySequenceId() {
2854 return this.replaySeqId;
2855 }
2856 }
2857
2858 @Override
2859 public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
2860 throws IOException {
2861
2862
2863
2864
2865 return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2866 }
2867
2868 public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2869 return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2870 }
2871
2872 @Override
2873 public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2874 throws IOException {
2875 if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
2876 && replaySeqId < lastReplayedOpenRegionSeqId) {
2877
2878
2879 if (LOG.isTraceEnabled()) {
2880 LOG.trace(getRegionInfo().getEncodedName() + " : "
2881 + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
2882 + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
2883 for (MutationReplay mut : mutations) {
2884 LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
2885 }
2886 }
2887
2888 OperationStatus[] statuses = new OperationStatus[mutations.length];
2889 for (int i = 0; i < statuses.length; i++) {
2890 statuses[i] = OperationStatus.SUCCESS;
2891 }
2892 return statuses;
2893 }
2894 return batchMutate(new ReplayBatch(mutations, replaySeqId));
2895 }
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905 OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2906 boolean initialized = false;
2907 Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
2908 startRegionOperation(op);
2909 try {
2910 while (!batchOp.isDone()) {
2911 if (!batchOp.isInReplay()) {
2912 checkReadOnly();
2913 }
2914 checkResources();
2915
2916 if (!initialized) {
2917 this.writeRequestsCount.add(batchOp.operations.length);
2918 if (!batchOp.isInReplay()) {
2919 doPreMutationHook(batchOp);
2920 }
2921 initialized = true;
2922 }
2923 doMiniBatchMutation(batchOp);
2924 long newSize = this.getMemstoreSize();
2925 if (isFlushSize(newSize)) {
2926 requestFlush();
2927 }
2928 }
2929 } finally {
2930 closeRegionOperation(op);
2931 }
2932 return batchOp.retCodeDetails;
2933 }
2934
2935
2936 private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2937 throws IOException {
2938
2939 WALEdit walEdit = new WALEdit();
2940 if (coprocessorHost != null) {
2941 for (int i = 0 ; i < batchOp.operations.length; i++) {
2942 Mutation m = batchOp.getMutation(i);
2943 if (m instanceof Put) {
2944 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2945
2946
2947 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2948 }
2949 } else if (m instanceof Delete) {
2950 Delete curDel = (Delete) m;
2951 if (curDel.getFamilyCellMap().isEmpty()) {
2952
2953 prepareDelete(curDel);
2954 }
2955 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2956
2957
2958 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2959 }
2960 } else {
2961
2962
2963
2964 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2965 "Put/Delete mutations only supported in batchMutate() now");
2966 }
2967 if (!walEdit.isEmpty()) {
2968 batchOp.walEditsFromCoprocessors[i] = walEdit;
2969 walEdit = new WALEdit();
2970 }
2971 }
2972 }
2973 }
2974
2975 private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2976 boolean isInReplay = batchOp.isInReplay();
2977
2978 boolean putsCfSetConsistent = true;
2979
2980 Set<byte[]> putsCfSet = null;
2981
2982 boolean deletesCfSetConsistent = true;
2983
2984 Set<byte[]> deletesCfSet = null;
2985
2986 long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2987 WALEdit walEdit = new WALEdit(isInReplay);
2988 MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
2989 long txid = 0;
2990 boolean doRollBackMemstore = false;
2991 boolean locked = false;
2992
2993
2994 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2995
2996 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2997
2998 int firstIndex = batchOp.nextIndexToProcess;
2999 int lastIndexExclusive = firstIndex;
3000 boolean success = false;
3001 int noOfPuts = 0, noOfDeletes = 0;
3002 WALKey walKey = null;
3003 long mvccNum = 0;
3004 long addedSize = 0;
3005 try {
3006
3007
3008
3009
3010 int numReadyToWrite = 0;
3011 long now = EnvironmentEdgeManager.currentTime();
3012 while (lastIndexExclusive < batchOp.operations.length) {
3013 Mutation mutation = batchOp.getMutation(lastIndexExclusive);
3014 boolean isPutMutation = mutation instanceof Put;
3015
3016 Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
3017
3018 familyMaps[lastIndexExclusive] = familyMap;
3019
3020
3021 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
3022 != OperationStatusCode.NOT_RUN) {
3023 lastIndexExclusive++;
3024 continue;
3025 }
3026
3027 try {
3028 if (isPutMutation) {
3029
3030 if (isInReplay) {
3031 removeNonExistentColumnFamilyForReplay(familyMap);
3032 } else {
3033 checkFamilies(familyMap.keySet());
3034 }
3035 checkTimestamps(mutation.getFamilyCellMap(), now);
3036 } else {
3037 prepareDelete((Delete) mutation);
3038 }
3039 checkRow(mutation.getRow(), "doMiniBatchMutation");
3040 } catch (NoSuchColumnFamilyException nscf) {
3041 LOG.warn("No such column family in batch mutation", nscf);
3042 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
3043 OperationStatusCode.BAD_FAMILY, nscf.getMessage());
3044 lastIndexExclusive++;
3045 continue;
3046 } catch (FailedSanityCheckException fsce) {
3047 LOG.warn("Batch Mutation did not pass sanity check", fsce);
3048 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
3049 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
3050 lastIndexExclusive++;
3051 continue;
3052 } catch (WrongRegionException we) {
3053 LOG.warn("Batch mutation had a row that does not belong to this region", we);
3054 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
3055 OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
3056 lastIndexExclusive++;
3057 continue;
3058 }
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070 boolean shouldBlock = numReadyToWrite == 0;
3071 RowLock rowLock = null;
3072 try {
3073 rowLock = getRowLock(mutation.getRow(), true, shouldBlock);
3074 } catch (IOException ioe) {
3075 LOG.warn("Failed getting lock in batch put, row="
3076 + Bytes.toStringBinary(mutation.getRow()), ioe);
3077 }
3078 if (rowLock == null) {
3079
3080
3081 break;
3082
3083 } else {
3084 acquiredRowLocks.add(rowLock);
3085 }
3086
3087 lastIndexExclusive++;
3088 numReadyToWrite++;
3089
3090 if (isPutMutation) {
3091
3092
3093
3094 if (putsCfSet == null) {
3095 putsCfSet = mutation.getFamilyCellMap().keySet();
3096 } else {
3097 putsCfSetConsistent = putsCfSetConsistent
3098 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
3099 }
3100 } else {
3101 if (deletesCfSet == null) {
3102 deletesCfSet = mutation.getFamilyCellMap().keySet();
3103 } else {
3104 deletesCfSetConsistent = deletesCfSetConsistent
3105 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
3106 }
3107 }
3108 }
3109
3110
3111
3112 now = EnvironmentEdgeManager.currentTime();
3113 byte[] byteNow = Bytes.toBytes(now);
3114
3115
3116 if (numReadyToWrite <= 0) return 0L;
3117
3118
3119
3120
3121
3122
3123 for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
3124
3125 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3126 != OperationStatusCode.NOT_RUN) continue;
3127
3128 Mutation mutation = batchOp.getMutation(i);
3129 if (mutation instanceof Put) {
3130 updateCellTimestamps(familyMaps[i].values(), byteNow);
3131 noOfPuts++;
3132 } else {
3133 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
3134 noOfDeletes++;
3135 }
3136 rewriteCellTags(familyMaps[i], mutation);
3137 }
3138
3139 lock(this.updatesLock.readLock(), numReadyToWrite);
3140 locked = true;
3141
3142
3143 if (!isInReplay && coprocessorHost != null) {
3144 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3145 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3146 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
3147 if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
3148 }
3149
3150
3151
3152
3153 Durability durability = Durability.USE_DEFAULT;
3154 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3155
3156 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
3157 continue;
3158 }
3159
3160 Mutation m = batchOp.getMutation(i);
3161 Durability tmpDur = getEffectiveDurability(m.getDurability());
3162 if (tmpDur.ordinal() > durability.ordinal()) {
3163 durability = tmpDur;
3164 }
3165 if (tmpDur == Durability.SKIP_WAL) {
3166 recordMutationWithoutWal(m.getFamilyCellMap());
3167 continue;
3168 }
3169
3170 long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
3171
3172
3173
3174 if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
3175 if (walEdit.size() > 0) {
3176 if (!isInReplay) {
3177 throw new IOException("Multiple nonces per batch and not in replay");
3178 }
3179
3180
3181 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3182 this.htableDescriptor.getTableName(), now, m.getClusterIds(),
3183 currentNonceGroup, currentNonce, mvcc);
3184 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
3185 walEdit, true);
3186 walEdit = new WALEdit(isInReplay);
3187 walKey = null;
3188 }
3189 currentNonceGroup = nonceGroup;
3190 currentNonce = nonce;
3191 }
3192
3193
3194 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
3195 if (fromCP != null) {
3196 for (Cell cell : fromCP.getCells()) {
3197 walEdit.add(cell);
3198 }
3199 }
3200 addFamilyMapToWALEdit(familyMaps[i], walEdit);
3201 }
3202
3203
3204
3205
3206 Mutation mutation = batchOp.getMutation(firstIndex);
3207 if (isInReplay) {
3208
3209 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3210 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
3211 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
3212 long replaySeqId = batchOp.getReplaySequenceId();
3213 walKey.setOrigLogSeqNum(replaySeqId);
3214 }
3215 if (walEdit.size() > 0) {
3216 if (!isInReplay) {
3217
3218 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3219 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
3220 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
3221 }
3222 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
3223 }
3224
3225
3226
3227 if (walKey == null) {
3228
3229 walKey = this.appendEmptyEdit(this.wal);
3230 }
3231 if (!isInReplay) {
3232 writeEntry = walKey.getWriteEntry();
3233 mvccNum = writeEntry.getWriteNumber();
3234 } else {
3235 mvccNum = batchOp.getReplaySequenceId();
3236 }
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
3247 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3248 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3249 != OperationStatusCode.NOT_RUN) {
3250 continue;
3251 }
3252 doRollBackMemstore = true;
3253 addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);
3254 }
3255
3256
3257
3258
3259 if (locked) {
3260 this.updatesLock.readLock().unlock();
3261 locked = false;
3262 }
3263 releaseRowLocks(acquiredRowLocks);
3264
3265
3266
3267
3268 if (txid != 0) {
3269 syncOrDefer(txid, durability);
3270 }
3271
3272 doRollBackMemstore = false;
3273
3274 this.addAndGetGlobalMemstoreSize(addedSize);
3275
3276
3277 if (!isInReplay && coprocessorHost != null) {
3278 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3279 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3280 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
3281 coprocessorHost.postBatchMutate(miniBatchOp);
3282 }
3283
3284
3285
3286
3287 if (writeEntry != null) {
3288 mvcc.completeAndWait(writeEntry);
3289 writeEntry = null;
3290 } else if (isInReplay) {
3291
3292 mvcc.advanceTo(mvccNum);
3293 }
3294
3295 for (int i = firstIndex; i < lastIndexExclusive; i ++) {
3296 if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
3297 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
3298 }
3299 }
3300
3301
3302
3303
3304
3305 if (!isInReplay && coprocessorHost != null) {
3306 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3307
3308 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3309 != OperationStatusCode.SUCCESS) {
3310 continue;
3311 }
3312 Mutation m = batchOp.getMutation(i);
3313 if (m instanceof Put) {
3314 coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
3315 } else {
3316 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
3317 }
3318 }
3319 }
3320
3321 success = true;
3322 return addedSize;
3323 } finally {
3324
3325 if (doRollBackMemstore) {
3326 for (int j = 0; j < familyMaps.length; j++) {
3327 for(List<Cell> cells:familyMaps[j].values()) {
3328 rollbackMemstore(cells);
3329 }
3330 }
3331 if (writeEntry != null) mvcc.complete(writeEntry);
3332 } else {
3333 if (writeEntry != null) {
3334 mvcc.completeAndWait(writeEntry);
3335 }
3336 }
3337
3338 if (locked) {
3339 this.updatesLock.readLock().unlock();
3340 }
3341 releaseRowLocks(acquiredRowLocks);
3342
3343
3344
3345
3346
3347
3348
3349 if (noOfPuts > 0) {
3350
3351 if (this.metricsRegion != null) {
3352 this.metricsRegion.updatePut();
3353 }
3354 }
3355 if (noOfDeletes > 0) {
3356
3357 if (this.metricsRegion != null) {
3358 this.metricsRegion.updateDelete();
3359 }
3360 }
3361 if (!success) {
3362 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3363 if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
3364 batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
3365 }
3366 }
3367 }
3368 if (coprocessorHost != null && !batchOp.isInReplay()) {
3369
3370
3371 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3372 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3373 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
3374 lastIndexExclusive);
3375 coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
3376 }
3377
3378 batchOp.nextIndexToProcess = lastIndexExclusive;
3379 }
3380 }
3381
3382
3383
3384
3385
3386 protected Durability getEffectiveDurability(Durability d) {
3387 return d == Durability.USE_DEFAULT ? this.durability : d;
3388 }
3389
3390
3391
3392
3393
3394
3395 @Override
3396 public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
3397 CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
3398 boolean writeToWAL)
3399 throws IOException{
3400 checkReadOnly();
3401
3402
3403 checkResources();
3404 boolean isPut = w instanceof Put;
3405 if (!isPut && !(w instanceof Delete))
3406 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
3407 "be Put or Delete");
3408 if (!Bytes.equals(row, w.getRow())) {
3409 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
3410 "getRow must match the passed row");
3411 }
3412
3413 startRegionOperation();
3414 try {
3415 Get get = new Get(row);
3416 checkFamily(family);
3417 get.addColumn(family, qualifier);
3418
3419
3420 RowLock rowLock = getRowLock(get.getRow());
3421
3422 mvcc.await();
3423 try {
3424 if (this.getCoprocessorHost() != null) {
3425 Boolean processed = null;
3426 if (w instanceof Put) {
3427 processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
3428 qualifier, compareOp, comparator, (Put) w);
3429 } else if (w instanceof Delete) {
3430 processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
3431 qualifier, compareOp, comparator, (Delete) w);
3432 }
3433 if (processed != null) {
3434 return processed;
3435 }
3436 }
3437 List<Cell> result = get(get, false);
3438
3439 boolean valueIsNull = comparator.getValue() == null ||
3440 comparator.getValue().length == 0;
3441 boolean matches = false;
3442 long cellTs = 0;
3443 if (result.size() == 0 && valueIsNull) {
3444 matches = true;
3445 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3446 valueIsNull) {
3447 matches = true;
3448 cellTs = result.get(0).getTimestamp();
3449 } else if (result.size() == 1 && !valueIsNull) {
3450 Cell kv = result.get(0);
3451 cellTs = kv.getTimestamp();
3452 int compareResult = comparator.compareTo(kv.getValueArray(),
3453 kv.getValueOffset(), kv.getValueLength());
3454 switch (compareOp) {
3455 case LESS:
3456 matches = compareResult < 0;
3457 break;
3458 case LESS_OR_EQUAL:
3459 matches = compareResult <= 0;
3460 break;
3461 case EQUAL:
3462 matches = compareResult == 0;
3463 break;
3464 case NOT_EQUAL:
3465 matches = compareResult != 0;
3466 break;
3467 case GREATER_OR_EQUAL:
3468 matches = compareResult >= 0;
3469 break;
3470 case GREATER:
3471 matches = compareResult > 0;
3472 break;
3473 default:
3474 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3475 }
3476 }
3477
3478 if (matches) {
3479
3480
3481
3482
3483 long now = EnvironmentEdgeManager.currentTime();
3484 long ts = Math.max(now, cellTs);
3485 byte[] byteTs = Bytes.toBytes(ts);
3486
3487 if (w instanceof Put) {
3488 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3489 }
3490
3491
3492
3493
3494
3495 doBatchMutate(w);
3496 this.checkAndMutateChecksPassed.increment();
3497 return true;
3498 }
3499 this.checkAndMutateChecksFailed.increment();
3500 return false;
3501 } finally {
3502 rowLock.release();
3503 }
3504 } finally {
3505 closeRegionOperation();
3506 }
3507 }
3508
3509
3510
3511
3512
3513
3514 @Override
3515 public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
3516 CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
3517 boolean writeToWAL) throws IOException {
3518 checkReadOnly();
3519
3520
3521 checkResources();
3522
3523 startRegionOperation();
3524 try {
3525 Get get = new Get(row);
3526 checkFamily(family);
3527 get.addColumn(family, qualifier);
3528
3529
3530 RowLock rowLock = getRowLock(get.getRow());
3531
3532 mvcc.await();
3533 try {
3534 List<Cell> result = get(get, false);
3535
3536 boolean valueIsNull = comparator.getValue() == null ||
3537 comparator.getValue().length == 0;
3538 boolean matches = false;
3539 long cellTs = 0;
3540 if (result.size() == 0 && valueIsNull) {
3541 matches = true;
3542 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3543 valueIsNull) {
3544 matches = true;
3545 cellTs = result.get(0).getTimestamp();
3546 } else if (result.size() == 1 && !valueIsNull) {
3547 Cell kv = result.get(0);
3548 cellTs = kv.getTimestamp();
3549 int compareResult = comparator.compareTo(kv.getValueArray(),
3550 kv.getValueOffset(), kv.getValueLength());
3551 switch (compareOp) {
3552 case LESS:
3553 matches = compareResult < 0;
3554 break;
3555 case LESS_OR_EQUAL:
3556 matches = compareResult <= 0;
3557 break;
3558 case EQUAL:
3559 matches = compareResult == 0;
3560 break;
3561 case NOT_EQUAL:
3562 matches = compareResult != 0;
3563 break;
3564 case GREATER_OR_EQUAL:
3565 matches = compareResult >= 0;
3566 break;
3567 case GREATER:
3568 matches = compareResult > 0;
3569 break;
3570 default:
3571 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3572 }
3573 }
3574
3575 if (matches) {
3576
3577
3578
3579
3580 long now = EnvironmentEdgeManager.currentTime();
3581 long ts = Math.max(now, cellTs);
3582 byte[] byteTs = Bytes.toBytes(ts);
3583
3584 for (Mutation w : rm.getMutations()) {
3585 if (w instanceof Put) {
3586 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3587 }
3588
3589
3590 }
3591
3592
3593
3594 mutateRow(rm);
3595 this.checkAndMutateChecksPassed.increment();
3596 return true;
3597 }
3598 this.checkAndMutateChecksFailed.increment();
3599 return false;
3600 } finally {
3601 rowLock.release();
3602 }
3603 } finally {
3604 closeRegionOperation();
3605 }
3606 }
3607
3608 private void doBatchMutate(Mutation mutation) throws IOException {
3609
3610 OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
3611 if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3612 throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3613 } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3614 throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3615 }
3616 }
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631 public void addRegionToSnapshot(SnapshotDescription desc,
3632 ForeignExceptionSnare exnSnare) throws IOException {
3633 Path rootDir = FSUtils.getRootDir(conf);
3634 Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3635
3636 SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3637 snapshotDir, desc, exnSnare);
3638 manifest.addRegion(this);
3639 }
3640
3641 @Override
3642 public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3643 throws IOException {
3644 for (List<Cell> cells: cellItr) {
3645 if (cells == null) continue;
3646 assert cells instanceof RandomAccess;
3647 int listSize = cells.size();
3648 for (int i = 0; i < listSize; i++) {
3649 CellUtil.updateLatestStamp(cells.get(i), now, 0);
3650 }
3651 }
3652 }
3653
3654
3655
3656
3657 void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
3658
3659
3660
3661 if (m.getTTL() == Long.MAX_VALUE) {
3662 return;
3663 }
3664
3665
3666
3667 for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
3668 List<Cell> cells = e.getValue();
3669 assert cells instanceof RandomAccess;
3670 int listSize = cells.size();
3671 for (int i = 0; i < listSize; i++) {
3672 Cell cell = cells.get(i);
3673 List<Tag> newTags = Tag.carryForwardTags(null, cell);
3674 newTags = carryForwardTTLTag(newTags, m);
3675
3676
3677 cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
3678 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3679 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
3680 cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
3681 cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
3682 newTags));
3683 }
3684 }
3685 }
3686
3687
3688
3689
3690
3691
3692
3693 private void checkResources() throws RegionTooBusyException {
3694
3695 if (this.getRegionInfo().isMetaRegion()) return;
3696
3697 if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3698 blockedRequestsCount.increment();
3699 requestFlush();
3700 throw new RegionTooBusyException("Above memstore limit, " +
3701 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3702 this.getRegionInfo().getRegionNameAsString()) +
3703 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3704 this.getRegionServerServices().getServerName()) +
3705 ", memstoreSize=" + memstoreSize.get() +
3706 ", blockingMemStoreSize=" + blockingMemStoreSize);
3707 }
3708 }
3709
3710
3711
3712
3713 protected void checkReadOnly() throws IOException {
3714 if (isReadOnly()) {
3715 throw new DoNotRetryIOException("region is read only");
3716 }
3717 }
3718
3719 protected void checkReadsEnabled() throws IOException {
3720 if (!this.writestate.readsEnabled) {
3721 throw new IOException(getRegionInfo().getEncodedName()
3722 + ": The region's reads are disabled. Cannot serve the request");
3723 }
3724 }
3725
3726 public void setReadsEnabled(boolean readsEnabled) {
3727 if (readsEnabled && !this.writestate.readsEnabled) {
3728 LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
3729 }
3730 this.writestate.setReadsEnabled(readsEnabled);
3731 }
3732
3733
3734
3735
3736
3737
3738
3739 private void put(final byte [] row, byte [] family, List<Cell> edits)
3740 throws IOException {
3741 NavigableMap<byte[], List<Cell>> familyMap;
3742 familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3743
3744 familyMap.put(family, edits);
3745 Put p = new Put(row);
3746 p.setFamilyCellMap(familyMap);
3747 doBatchMutate(p);
3748 }
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
3764
3765 private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3766 long mvccNum, boolean isInReplay) throws IOException {
3767 long size = 0;
3768
3769 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3770 byte[] family = e.getKey();
3771 List<Cell> cells = e.getValue();
3772 assert cells instanceof RandomAccess;
3773 Store store = getStore(family);
3774 int listSize = cells.size();
3775 for (int i=0; i < listSize; i++) {
3776 Cell cell = cells.get(i);
3777 if (cell.getSequenceId() == 0 || isInReplay) {
3778 CellUtil.setSequenceId(cell, mvccNum);
3779 }
3780 size += store.add(cell);
3781 }
3782 }
3783
3784 return size;
3785 }
3786
3787
3788
3789
3790
3791
3792 private void rollbackMemstore(List<Cell> memstoreCells) {
3793 int kvsRolledback = 0;
3794
3795 for (Cell cell : memstoreCells) {
3796 byte[] family = CellUtil.cloneFamily(cell);
3797 Store store = getStore(family);
3798 store.rollback(cell);
3799 kvsRolledback++;
3800 }
3801 LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3802 }
3803
3804 @Override
3805 public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException {
3806 for (byte[] family : families) {
3807 checkFamily(family);
3808 }
3809 }
3810
3811
3812
3813
3814
3815 private void removeNonExistentColumnFamilyForReplay(
3816 final Map<byte[], List<Cell>> familyMap) {
3817 List<byte[]> nonExistentList = null;
3818 for (byte[] family : familyMap.keySet()) {
3819 if (!this.htableDescriptor.hasFamily(family)) {
3820 if (nonExistentList == null) {
3821 nonExistentList = new ArrayList<byte[]>();
3822 }
3823 nonExistentList.add(family);
3824 }
3825 }
3826 if (nonExistentList != null) {
3827 for (byte[] family : nonExistentList) {
3828
3829 LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3830 familyMap.remove(family);
3831 }
3832 }
3833 }
3834
3835 @Override
3836 public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
3837 throws FailedSanityCheckException {
3838 if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3839 return;
3840 }
3841 long maxTs = now + timestampSlop;
3842 for (List<Cell> kvs : familyMap.values()) {
3843 assert kvs instanceof RandomAccess;
3844 int listSize = kvs.size();
3845 for (int i=0; i < listSize; i++) {
3846 Cell cell = kvs.get(i);
3847
3848 long ts = cell.getTimestamp();
3849 if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3850 throw new FailedSanityCheckException("Timestamp for KV out of range "
3851 + cell + " (too.new=" + timestampSlop + ")");
3852 }
3853 }
3854 }
3855 }
3856
3857
3858
3859
3860
3861
3862
3863 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3864 WALEdit walEdit) {
3865 for (List<Cell> edits : familyMap.values()) {
3866 assert edits instanceof RandomAccess;
3867 int listSize = edits.size();
3868 for (int i=0; i < listSize; i++) {
3869 Cell cell = edits.get(i);
3870 walEdit.add(cell);
3871 }
3872 }
3873 }
3874
3875 private void requestFlush() {
3876 if (this.rsServices == null) {
3877 return;
3878 }
3879 synchronized (writestate) {
3880 if (this.writestate.isFlushRequested()) {
3881 return;
3882 }
3883 writestate.flushRequested = true;
3884 }
3885
3886 this.rsServices.getFlushRequester().requestFlush(this, false);
3887 if (LOG.isDebugEnabled()) {
3888 LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
3889 }
3890 }
3891
3892
3893
3894
3895
3896 private boolean isFlushSize(final long size) {
3897 return size > this.memstoreFlushSize;
3898 }
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
3913
3914
3915
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928
3929
3930
3931
3932
/**
 * Replays every recovered-edits file found under {@code regiondir} into this region.
 *
 * <p>The replay floor is the smallest of the per-store max sequence ids in
 * {@code maxSeqIdInStores}. Files whose name, parsed as a sequence id, is at or below
 * that floor are skipped. If replay advances the sequence id past the floor, the region
 * is flushed. Finally, every listed file is either handed to
 * {@code removeStoreFiles} under a fake family (when
 * {@code hbase.region.archive.recovered.edits} is true) or deleted.
 *
 * @param regiondir region directory to search for recovered edits
 * @param maxSeqIdInStores max sequence id already persisted, keyed by family name
 * @param reporter optional progress reporter, passed to each file's replay
 * @param status task status used for the final flush
 * @return the highest sequence id returned by any file replay, or the floor (which is
 *     -1 when {@code maxSeqIdInStores} is empty) if nothing higher was replayed
 * @throws IOException if a file fails to replay and skip-errors is not enabled
 */
protected long replayRecoveredEditsIfAny(final Path regiondir,
    Map<byte[], Long> maxSeqIdInStores,
    final CancelableProgressable reporter, final MonitoredTask status)
    throws IOException {
  // The region-wide floor is the smallest max sequence id across all stores.
  long minSeqIdForTheRegion = -1;
  for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
    if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
      minSeqIdForTheRegion = maxSeqIdInStore;
    }
  }
  long seqid = minSeqIdForTheRegion;

  FileSystem fs = this.fs.getFileSystem();
  // NOTE(review): presumably returned in ascending sequence-id order — confirm in WALSplitter.
  NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Found " + (files == null ? 0 : files.size())
        + " recovered edits file(s) under " + regiondir);
  }

  if (files == null || files.isEmpty()) return seqid;

  for (Path edits: files) {
    if (edits == null || !fs.exists(edits)) {
      LOG.warn("Null or non-existent edits file: " + edits);
      continue;
    }
    // NOTE(review): by its name, this deletes empty files; they are skipped either way.
    if (isZeroLengthThenDelete(fs, edits)) continue;

    long maxSeqId;
    String fileName = edits.getName();
    // The file name is treated as the highest sequence id in the file.
    // NOTE(review): parseLong throws NumberFormatException on a non-numeric name;
    // presumably getSplitEditFilesSorted only returns numeric names — confirm.
    maxSeqId = Math.abs(Long.parseLong(fileName));
    if (maxSeqId <= minSeqIdForTheRegion) {
      // Every store already persisted everything this file can contain.
      if (LOG.isDebugEnabled()) {
        String msg = "Maximum sequenceid for this wal is " + maxSeqId
            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
            + ", skipped the whole file, path=" + edits;
        LOG.debug(msg);
      }
      continue;
    }

    try {
      // Track the highest sequence id replayed across all files.
      seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
    } catch (IOException e) {
      // "hbase.skip.errors" is the deprecated fallback for HREGION_EDITS_REPLAY_SKIP_ERRORS.
      boolean skipErrors = conf.getBoolean(
          HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
          conf.getBoolean(
              "hbase.skip.errors",
              HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
      if (conf.get("hbase.skip.errors") != null) {
        LOG.warn(
            "The property 'hbase.skip.errors' has been deprecated. Please use " +
            HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
      }
      if (skipErrors) {
        // Move the bad file aside and continue with the next one.
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
            + "=true so continuing. Renamed " + edits +
            " as " + p, e);
      } else {
        throw e;
      }
    }
  }

  // Replay is finished; clear this region's replay-edits accounting.
  if (this.rsAccounting != null) {
    this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
  }
  if (seqid > minSeqIdForTheRegion) {
    // Something was replayed: flush it so the recovered edits are no longer needed.
    internalFlushcache(null, seqid, stores.values(), status, false);
  }

  // NOTE(review): files moved aside above are still in `files`; since they were renamed,
  // their delete below presumably fails and logs "Failed delete of" — confirm intended.
  if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
    // Wrap the edits files as StoreFiles under a fake family named after the
    // recovered-edits directory so removeStoreFiles can handle them.
    String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
    Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
    for (Path file: files) {
      fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
          null, null));
    }
    getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
  } else {
    for (Path file: files) {
      if (!fs.delete(file, false)) {
        LOG.error("Failed delete of " + file);
      } else {
        LOG.debug("Deleted recovered.edits file=" + file);
      }
    }
  }
  return seqid;
}
4031
4032
4033
4034
4035
4036
4037
4038
4039
4040
/**
 * Replays one recovered-edits file into this region's memstores.
 *
 * <p>Each WAL entry is offered to the coprocessor hooks. METAFAMILY marker cells are
 * never applied, although compaction markers belonging to this region are replayed.
 * Cells whose sequence id is already covered by their store
 * ({@code maxSeqIdInStores}) are skipped. Every other cell gets the replay sequence id
 * and is restored into its store, and the region is flushed whenever
 * {@code restoreEdit} asks for it.
 *
 * <p>A truncated file (EOFException) or a file whose IOException is caused by a
 * ParseException is moved aside and treated as complete. Any other IOException is
 * rethrown.
 *
 * @param edits the recovered-edits file to replay
 * @param maxSeqIdInStores max sequence id already persisted, keyed by family name
 * @param reporter optional progress reporter; replay aborts if it returns false
 * @return the largest WAL sequence id read from the file (-1 if no entry was read)
 * @throws IOException if the reporter fails or the file cannot be read
 */
private long replayRecoveredEdits(final Path edits,
    Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
    throws IOException {
  String msg = "Replaying edits from " + edits;
  LOG.info(msg);
  MonitoredTask status = TaskMonitor.get().createStatus(msg);
  FileSystem fs = this.fs.getFileSystem();

  status.setStatus("Opening recovered edits");
  WAL.Reader reader = null;
  try {
    reader = WALFactory.createReader(fs, edits, conf);
    long currentEditSeqId = -1;
    long currentReplaySeqId = -1;
    long firstSeqIdInLog = -1;
    long skippedEdits = 0;
    long editsCount = 0;
    long intervalEdits = 0;
    WAL.Entry entry;
    Store store = null;
    boolean reported_once = false;
    ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();

    try {
      // Number of edits between checks of the reporting period.
      int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
      // Minimum milliseconds between two progress reports.
      int period = this.conf.getInt("hbase.hstore.report.period", 300000);
      long lastReport = EnvironmentEdgeManager.currentTime();

      while ((entry = reader.next()) != null) {
        WALKey key = entry.getKey();
        WALEdit val = entry.getEdit();

        // Let the nonce manager learn about operations recorded in the WAL.
        if (ng != null) {
          ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
        }

        if (reporter != null) {
          intervalEdits += val.size();
          if (intervalEdits >= interval) {
            // Only consult the clock once every `interval` edits.
            intervalEdits = 0;
            long cur = EnvironmentEdgeManager.currentTime();
            if (lastReport + period <= cur) {
              status.setStatus("Replaying edits..." +
                  " skipped=" + skippedEdits +
                  " edits=" + editsCount);
              // A false return from the reporter aborts the whole replay.
              if(!reporter.progress()) {
                msg = "Progressable reporter failed, stopping replay";
                LOG.warn(msg);
                status.abort(msg);
                throw new IOException(msg);
              }
              reported_once = true;
              lastReport = cur;
            }
          }
        }

        if (firstSeqIdInLog == -1) {
          firstSeqIdInLog = key.getLogSeqNum();
        }
        if (currentEditSeqId > key.getLogSeqNum()) {
          // Out-of-order sequence id: only logged; currentEditSeqId is left unchanged so
          // it keeps holding the largest id seen so far.
          LOG.error(getRegionInfo().getEncodedName() + " : "
              + "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
              + "; edit=" + val);
        } else {
          currentEditSeqId = key.getLogSeqNum();
        }
        // Cells are stamped with the key's original sequence id when it has one.
        currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
            key.getOrigLogSeqNum() : currentEditSeqId;

        if (coprocessorHost != null) {
          status.setStatus("Running pre-WAL-restore hook in coprocessors");
          if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
            // A coprocessor asked to skip this entry; postWALRestore is not called for it.
            continue;
          }
        }
        boolean checkRowWithinBoundary = false;
        // Entries written under another region's name (NOTE(review): presumably a parent
        // region before a split) need each row checked against this region's range.
        if (!Bytes.equals(key.getEncodedRegionName(),
            this.getRegionInfo().getEncodedNameAsBytes())) {
          checkRowWithinBoundary = true;
        }

        boolean flush = false;
        for (Cell cell: val.getCells()) {
          // METAFAMILY marker cells are never applied to a store.
          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            // Compaction markers are replayed only when the entry belongs to this region.
            if (!checkRowWithinBoundary) {
              CompactionDescriptor compaction = WALEdit.getCompaction(cell);
              if (compaction != null) {
                replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
              }
            }
            skippedEdits++;
            continue;
          }

          // Reuse the previously resolved store while consecutive cells share a family.
          if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
            store = getStore(cell);
          }
          if (store == null) {
            // This region has no store for the cell's family; skip the cell.
            LOG.warn("No family for " + cell);
            skippedEdits++;
            continue;
          }
          if (checkRowWithinBoundary && !rowIsInRange(this.getRegionInfo(),
              cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) {
            LOG.warn("Row of " + cell + " is not within region boundary");
            skippedEdits++;
            continue;
          }
          // Skip cells already covered by this store's persisted files.
          // NOTE(review): unboxing throws NullPointerException if the family is missing
          // from maxSeqIdInStores.
          if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
              .getName())) {
            skippedEdits++;
            continue;
          }
          CellUtil.setSequenceId(cell, currentReplaySeqId);

          // restoreEdit reports whether a flush is needed; accumulate across the entry.
          flush |= restoreEdit(store, cell);
          editsCount++;
        }
        if (flush) {
          internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
        }

        if (coprocessorHost != null) {
          coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
        }
      }
    } catch (EOFException eof) {
      // Truncated file: move it aside and keep what was replayed so far.
      Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
      msg = "Encountered EOF. Most likely due to Master failure during " +
          "wal splitting, so we have this data in another edit. " +
          "Continuing, but renaming " + edits + " as " + p;
      LOG.warn(msg, eof);
      status.abort(msg);
    } catch (IOException ioe) {
      // A ParseException cause indicates a corrupt file: move it aside and continue.
      if (ioe.getCause() instanceof ParseException) {
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        msg = "File corruption encountered! " +
            "Continuing, but renaming " + edits + " as " + p;
        LOG.warn(msg, ioe);
        status.setStatus(msg);
      } else {
        status.abort(StringUtils.stringifyException(ioe));
        // Any other I/O failure is propagated to the caller.
        throw ioe;
      }
    }
    // Make sure the reporter hears from this replay at least once.
    if (reporter != null && !reported_once) {
      reporter.progress();
    }
    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
        ", firstSequenceIdInLog=" + firstSeqIdInLog +
        ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
    status.markComplete(msg);
    LOG.debug(msg);
    return currentEditSeqId;
  } finally {
    status.cleanup();
    if (reader != null) {
      reader.close();
    }
  }
}
4228
4229
4230
4231
4232
4233
/**
 * Replays a compaction marker read from the WAL against the store of its family.
 *
 * <p>If the marker targets another region, it is ignored on the default replica and the
 * WrongRegionException is rethrown otherwise. Under the {@code writestate} monitor, a
 * marker whose {@code replaySeqId} is below {@code lastReplayedOpenRegionSeqId} or
 * {@code lastReplayedCompactionSeqId} is skipped. Otherwise
 * {@code lastReplayedCompactionSeqId} is advanced and the store replays the compaction.
 *
 * @param compaction the compaction descriptor from the WAL
 * @param pickCompactionFiles passed through to {@code Store.replayCompactionMarker}
 * @param removeFiles passed through to {@code Store.replayCompactionMarker}
 * @param replaySeqId sequence id used for the ordering checks above.
 *     NOTE(review): a value of {@code Long.MAX_VALUE} also sets
 *     {@code lastReplayedCompactionSeqId} to {@code Long.MAX_VALUE}, so later markers on
 *     this instance are skipped — confirm this is acceptable for all callers.
 * @throws IOException if the target-region check or the store replay fails
 *     (missing store files are only logged)
 */
void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
    boolean removeFiles, long replaySeqId)
    throws IOException {
  try {
    checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
        "Compaction marker from WAL ", compaction);
  } catch (WrongRegionException wre) {
    if (RegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      // The default replica silently ignores markers meant for another region.
      return;
    }
    throw wre;
  }

  synchronized (writestate) {
    // Ignore markers older than the last replayed region-open event.
    if (replaySeqId < lastReplayedOpenRegionSeqId) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
          + " because its sequence id " + replaySeqId + " is smaller than this regions "
          + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
      return;
    }
    // Ignore markers older than the last replayed compaction; otherwise record this one.
    if (replaySeqId < lastReplayedCompactionSeqId) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
          + " because its sequence id " + replaySeqId + " is smaller than this regions "
          + "lastReplayedCompactionSeqId of " + lastReplayedCompactionSeqId);
      return;
    } else {
      lastReplayedCompactionSeqId = replaySeqId;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
          + "Replaying compaction marker " + TextFormat.shortDebugString(compaction)
          + " with seqId=" + replaySeqId + " and lastReplayedOpenRegionSeqId="
          + lastReplayedOpenRegionSeqId);
    }

    startRegionOperation(Operation.REPLAY_EVENT);
    try {
      Store store = this.getStore(compaction.getFamilyName().toByteArray());
      if (store == null) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Found Compaction WAL edit for deleted family:"
            + Bytes.toString(compaction.getFamilyName().toByteArray()));
        return;
      }
      store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
      logRegionFiles();
    } catch (FileNotFoundException ex) {
      // Missing compaction files are tolerated: log and skip loading them.
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in compaction: "
          + TextFormat.shortDebugString(compaction)
          + " doesn't exist any more. Skip loading the file(s)", ex);
    } finally {
      closeRegionOperation(Operation.REPLAY_EVENT);
    }
  }
}
4294
4295 void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
4296 checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
4297 "Flush marker from WAL ", flush);
4298
4299 if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
4300 return;
4301 }
4302
4303 if (LOG.isDebugEnabled()) {
4304 LOG.debug(getRegionInfo().getEncodedName() + " : "
4305 + "Replaying flush marker " + TextFormat.shortDebugString(flush));
4306 }
4307
4308 startRegionOperation(Operation.REPLAY_EVENT);
4309 try {
4310 FlushAction action = flush.getAction();
4311 switch (action) {
4312 case START_FLUSH:
4313 replayWALFlushStartMarker(flush);
4314 break;
4315 case COMMIT_FLUSH:
4316 replayWALFlushCommitMarker(flush);
4317 break;
4318 case ABORT_FLUSH:
4319 replayWALFlushAbortMarker(flush);
4320 break;
4321 case CANNOT_FLUSH:
4322 replayWALFlushCannotFlushMarker(flush, replaySeqId);
4323 break;
4324 default:
4325 LOG.warn(getRegionInfo().getEncodedName() + " : " +
4326 "Received a flush event with unknown action, ignoring. " +
4327 TextFormat.shortDebugString(flush));
4328 break;
4329 }
4330
4331 logRegionFiles();
4332 } finally {
4333 closeRegionOperation(Operation.REPLAY_EVENT);
4334 }
4335 }
4336
4337
4338
4339
4340
/**
 * Replays a START_FLUSH marker received from the primary by preparing a flush of the
 * stores named in the marker, at the marker's flush sequence id.
 *
 * <p>Runs under the {@code writestate} monitor. A marker older than
 * {@code lastReplayedOpenRegionSeqId} is ignored. If no flush is pending, one is prepared.
 * It is recorded as pending (in {@code writestate.flushing} and {@code prepareFlushResult})
 * when the prepare succeeds, or when it reports CANNOT_FLUSH_MEMSTORE_EMPTY. If a flush is
 * already pending, the marker is only logged and ignored, whatever its sequence id.
 *
 * @param flush the START_FLUSH descriptor from the WAL
 * @return the result of {@code internalPrepareFlushCache} when no flush was pending
 *     (successful or not); {@code null} if the marker was skipped or ignored
 * @throws IOException if preparing the flush fails
 */
@VisibleForTesting
PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
  long flushSeqId = flush.getFlushSequenceNumber();

  // Resolve the stores listed in the marker; unknown families are logged and skipped.
  HashSet<Store> storesToFlush = new HashSet<Store>();
  for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
    byte[] family = storeFlush.getFamilyName().toByteArray();
    Store store = getStore(family);
    if (store == null) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Received a flush start marker from primary, but the family is not found. Ignoring"
          + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
      continue;
    }
    storesToFlush.add(store);
  }

  MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);

  synchronized (writestate) {
    try {
      if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
        return null;
      }
      // Reset the counters of mutations applied without a WAL entry.
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }

      if (!writestate.flushing) {
        // No flush pending: prepare one at the primary's flush sequence id.
        PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
            flushSeqId, storesToFlush, status, false);
        if (prepareResult.result == null) {
          // A null result means the prepare succeeded; remember it as pending.
          this.writestate.flushing = true;
          this.prepareFlushResult = prepareResult;
          status.markComplete("Flush prepare successful");
          if (LOG.isDebugEnabled()) {
            LOG.debug(getRegionInfo().getEncodedName() + " : "
                + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
          }
        } else {
          // An empty memstore is still recorded as a pending flush (presumably so the
          // matching commit marker finds a prepared result); other failures are not.
          if (prepareResult.getResult().getResult() ==
              FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
            this.writestate.flushing = true;
            this.prepareFlushResult = prepareResult;
            if (LOG.isDebugEnabled()) {
              LOG.debug(getRegionInfo().getEncodedName() + " : "
                  + " Prepared empty flush with seqId:" + flush.getFlushSequenceNumber());
            }
          }
          status.abort("Flush prepare failed with " + prepareResult.result);
        }
        return prepareResult;
      } else {
        // A flush is already pending: every case below only logs and ignores the marker.
        if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with the same seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a smaller seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        } else {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a larger seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        }
      }
    } finally {
      status.cleanup();
      // Wake up any threads waiting on the writestate monitor.
      writestate.notifyAll();
    }
  }
  return null;
}
4451
/**
 * Replays a flush COMMIT marker read from the WAL on this region.
 *
 * <p>Markers whose flush sequence id is smaller than {@code lastReplayedOpenRegionSeqId} are
 * logged and skipped. Otherwise, while holding the {@code writestate} monitor, the commit is
 * matched against the flush prepared by an earlier replayed START marker
 * ({@code this.prepareFlushResult}, present while {@code writestate.flushing} is set):
 * <ul>
 * <li>same seqId: flushed files are picked up, the prepared snapshot is dropped, the global
 * memstore size is reduced by the prepared flushable size and the prepared state is cleared;</li>
 * <li>smaller seqId: flushed files are picked up but the prepared snapshot is kept;</li>
 * <li>larger seqId: flushed files are picked up, the prepared snapshot is dropped, memstore
 * contents are dropped if the commit seqId is at least the current MVCC read point, and the
 * prepared state is cleared.</li>
 * </ul>
 * In all three cases reads are re-enabled. Without a prepared flush, the files are picked up
 * with fresh flush contexts and memstore contents may be dropped as above. On success
 * {@code maxFlushedSeqId} is set and the MVCC is advanced to the commit seqId. Finally all
 * threads waiting on {@code writestate} and on this region are notified.
 *
 * @param flush the flush descriptor read from the WAL
 * @throws IOException if replaying the flush into the stores fails; a
 *     {@link FileNotFoundException} is logged and swallowed instead
 */
@VisibleForTesting
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
    justification="Intentional; post memstore flush")
void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
  MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);

  // All flush / region-event replays in this class serialize on the writestate monitor.
  synchronized (writestate) {
    try {
      if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
          + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
          + " of " + lastReplayedOpenRegionSeqId);
        return;
      }

      if (writestate.flushing) {
        PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
        if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(getRegionInfo().getEncodedName() + " : "
                + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
                + " and a previous prepared snapshot was found");
          }
          // Commit matches the prepared flush: take the new files and drop the snapshot.
          replayFlushInStores(flush, prepareFlushResult, true);

          // The snapshot is gone, so release its size from the memstore accounting.
          this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

          this.prepareFlushResult = null;
          writestate.flushing = false;
        } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
          // Commit is for an older flush than the prepared one. Pick up its files but keep
          // the prepared snapshot, which was taken at a larger seqId than this commit.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush commit marker with smaller seqId: "
              + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
              +" prepared memstore snapshot");
          replayFlushInStores(flush, prepareFlushResult, false);

          // prepareFlushResult and writestate.flushing are intentionally left as they are.
        } else {
          // Commit is for a newer flush than the prepared one. Pick up its files, drop the
          // prepared snapshot and, where possible, memstore contents up to the commit seqId.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Received a flush commit marker with larger seqId: "
            + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
            prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
            +" memstore snapshot");

          replayFlushInStores(flush, prepareFlushResult, true);

          // Release the dropped snapshot's size from the memstore accounting.
          this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

          // Only drops if the commit seqId >= current MVCC read point
          // (see dropMemstoreContentsForSeqId).
          dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);

          this.prepareFlushResult = null;
          writestate.flushing = false;
        }

        // Re-enable reads once a commit has been applied against a prepared flush.
        this.setReadsEnabled(true);
      } else {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
            + ", but no previous prepared snapshot was found");
        // No prepared flush context: replayFlushInStores creates a fresh context per store.
        replayFlushInStores(flush, null, false);

        // Only drops if the commit seqId >= current MVCC read point.
        dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
      }

      status.markComplete("Flush commit successful");

      // Record the highest flushed sequence id seen so far.
      this.maxFlushedSeqId = flush.getFlushSequenceNumber();

      // Make everything up to the commit seqId visible to readers.
      mvcc.advanceTo(flush.getFlushSequenceNumber());

    } catch (FileNotFoundException ex) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in flush: " + TextFormat.shortDebugString(flush)
          + " doesn't exist any more. Skip loading the file(s)", ex);
    }
    finally {
      status.cleanup();
      writestate.notifyAll();
    }
  }

  // Also wake threads waiting on the region itself (outside the writestate monitor).
  synchronized (this) {
    notifyAll();
  }
}
4571
4572
4573
4574
4575
4576
4577
4578
4579
4580 private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
4581 boolean dropMemstoreSnapshot)
4582 throws IOException {
4583 for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
4584 byte[] family = storeFlush.getFamilyName().toByteArray();
4585 Store store = getStore(family);
4586 if (store == null) {
4587 LOG.warn(getRegionInfo().getEncodedName() + " : "
4588 + "Received a flush commit marker from primary, but the family is not found."
4589 + "Ignoring StoreFlushDescriptor:" + storeFlush);
4590 continue;
4591 }
4592 List<String> flushFiles = storeFlush.getFlushOutputList();
4593 StoreFlushContext ctx = null;
4594 long startTime = EnvironmentEdgeManager.currentTime();
4595 if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
4596 ctx = store.createFlushContext(flush.getFlushSequenceNumber());
4597 } else {
4598 ctx = prepareFlushResult.storeFlushCtxs.get(family);
4599 startTime = prepareFlushResult.startTime;
4600 }
4601
4602 if (ctx == null) {
4603 LOG.warn(getRegionInfo().getEncodedName() + " : "
4604 + "Unexpected: flush commit marker received from store "
4605 + Bytes.toString(family) + " but no associated flush context. Ignoring");
4606 continue;
4607 }
4608
4609 ctx.replayFlush(flushFiles, dropMemstoreSnapshot);
4610
4611
4612 this.lastStoreFlushTimeMap.put(store, startTime);
4613 }
4614 }
4615
4616
4617
4618
4619
4620
4621 private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
4622 long totalFreedSize = 0;
4623 this.updatesLock.writeLock().lock();
4624 try {
4625
4626 long currentSeqId = mvcc.getReadPoint();
4627 if (seqId >= currentSeqId) {
4628
4629 LOG.info(getRegionInfo().getEncodedName() + " : "
4630 + "Dropping memstore contents as well since replayed flush seqId: "
4631 + seqId + " is greater than current seqId:" + currentSeqId);
4632
4633
4634 if (store == null) {
4635 for (Store s : stores.values()) {
4636 totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
4637 }
4638 } else {
4639 totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
4640 }
4641 } else {
4642 LOG.info(getRegionInfo().getEncodedName() + " : "
4643 + "Not dropping memstore contents since replayed flush seqId: "
4644 + seqId + " is smaller than current seqId:" + currentSeqId);
4645 }
4646 } finally {
4647 this.updatesLock.writeLock().unlock();
4648 }
4649 return totalFreedSize;
4650 }
4651
4652 private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
4653 long snapshotSize = s.getFlushableSize();
4654 this.addAndGetGlobalMemstoreSize(-snapshotSize);
4655 StoreFlushContext ctx = s.createFlushContext(currentSeqId);
4656 ctx.prepare();
4657 ctx.abort();
4658 return snapshotSize;
4659 }
4660
/**
 * Replays a flush ABORT marker from the WAL.
 *
 * <p>The body is empty, so replaying this marker changes nothing on this region.
 * NOTE(review): presumably a prepared flush left behind by an aborted primary flush is cleared
 * later by a commit marker, a region-open marker or a store-file refresh. Confirm against the
 * callers of dropPrepareFlushIfPossible.
 *
 * @param flush the flush descriptor (unused)
 */
private void replayWALFlushAbortMarker(FlushDescriptor flush) {
  // Intentionally empty: no state is changed for an abort marker.
}
4666
4667 private void replayWALFlushCannotFlushMarker(FlushDescriptor flush, long replaySeqId) {
4668 synchronized (writestate) {
4669 if (this.lastReplayedOpenRegionSeqId > replaySeqId) {
4670 LOG.warn(getRegionInfo().getEncodedName() + " : "
4671 + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
4672 + " because its sequence id " + replaySeqId + " is smaller than this regions "
4673 + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
4674 return;
4675 }
4676
4677
4678
4679
4680
4681
4682 this.setReadsEnabled(true);
4683 }
4684 }
4685
4686 @VisibleForTesting
4687 PrepareFlushResult getPrepareFlushResult() {
4688 return prepareFlushResult;
4689 }
4690
/**
 * Replays a region event marker (region open / close) read from the WAL.
 *
 * <p>Nothing is done on the default replica. REGION_CLOSE markers are ignored, and unknown
 * event types are logged and ignored. A REGION_OPEN marker older than
 * {@code lastReplayedOpenRegionSeqId} is logged and skipped. Otherwise, while holding the
 * {@code writestate} monitor:
 * <ul>
 * <li>the marker's sequence number becomes the new {@code lastReplayedOpenRegionSeqId};</li>
 * <li>for each store in the marker:
 *   <ul>
 *   <li>its file list is replaced with the one recorded in the marker;</li>
 *   <li>a prepared flush context for that family is aborted if the prepared flush is not
 *   newer than the marker;</li>
 *   <li>memstore contents are dropped up to the marker's sequence number where possible;</li>
 *   <li>{@code maxFlushedSeqId} may be raised;</li>
 *   </ul></li>
 * <li>the prepared flush is cleared if no store still holds a snapshot;</li>
 * <li>the MVCC is awaited, reads are enabled and waiters on this region are notified.</li>
 * </ul>
 *
 * @param regionEvent the region event descriptor read from the WAL
 * @throws WrongRegionException if the marker targets a different region
 * @throws IOException if refreshing store files or dropping memstore contents fails
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
    justification="Intentional; cleared the memstore")
void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
  checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
    "RegionEvent marker from WAL ", regionEvent);

  startRegionOperation(Operation.REPLAY_EVENT);
  try {
    // Region events are not replayed on the default replica.
    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      return;
    }

    if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
      // Close markers are ignored here.
      return;
    }
    if (regionEvent.getEventType() != EventType.REGION_OPEN) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Unknown region event received, ignoring :"
          + TextFormat.shortDebugString(regionEvent));
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
        + "Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
    }

    // Serialize with flush-marker replays and store-file refreshes.
    synchronized (writestate) {
      // lastReplayedOpenRegionSeqId only moves forward; an older open marker is skipped.
      // (Flush and bulk-load replays compare their seqIds against this value.)
      if (this.lastReplayedOpenRegionSeqId <= regionEvent.getLogSequenceNumber()) {
        this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
      } else {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
          + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
          + " of " + lastReplayedOpenRegionSeqId);
        return;
      }

      // Replace each store's files with the list recorded in the open marker.
      for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();
        Store store = getStore(family);
        if (store == null) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a region open marker from primary, but the family is not found. "
              + "Ignoring. StoreDescriptor:" + storeDescriptor);
          continue;
        }

        // Max sequence id before the refresh, used to detect whether new files arrived.
        long storeSeqId = store.getMaxSequenceId();
        List<String> storeFiles = storeDescriptor.getStoreFileList();
        try {
          store.refreshStoreFiles(storeFiles);
        } catch (FileNotFoundException ex) {
          // Missing files: skip the rest of the processing for this store.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
                  + "At least one of the store files: " + storeFiles
                  + " doesn't exist any more. Skip loading the file(s)", ex);
          continue;
        }
        if (store.getMaxSequenceId() != storeSeqId) {
          // New files were picked up: record this as the store's latest flush time.
          lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
        }

        if (writestate.flushing) {
          // A prepared flush not newer than this open marker is covered by the new files,
          // so abort its context for this family and release the snapshot's size.
          if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
            StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                null : this.prepareFlushResult.storeFlushCtxs.get(family);
            if (ctx != null) {
              long snapshotSize = store.getFlushableSize();
              ctx.abort();
              this.addAndGetGlobalMemstoreSize(-snapshotSize);
              this.prepareFlushResult.storeFlushCtxs.remove(family);
            }
          }
        }

        // Drops only if the marker's seqId >= current MVCC read point.
        dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
        // NOTE(review): storeSeqId was read before refreshStoreFiles, so this uses the
        // pre-refresh max sequence id. Confirm that this is intended.
        if (storeSeqId > this.maxFlushedSeqId) {
          this.maxFlushedSeqId = storeSeqId;
        }
      }

      // Clear the prepared flush if no participating store still holds a snapshot.
      dropPrepareFlushIfPossible();

      // Wait for the MVCC before enabling reads.
      mvcc.await();

      this.setReadsEnabled(true);

      // Wake threads waiting on this region.
      synchronized (this) {
        notifyAll();
      }
    }
    logRegionFiles();
  } finally {
    closeRegionOperation(Operation.REPLAY_EVENT);
  }
}
4808
/**
 * Replays a bulk load marker read from the WAL by loading the listed HFiles into the stores.
 *
 * <p>Nothing is done on the default replica. The marker is skipped when it carries a
 * non-negative bulk load sequence number that is not newer than
 * {@code lastReplayedOpenRegionSeqId}. Otherwise each listed file is loaded into its store
 * while holding the {@code writestate} monitor. Unknown families and missing files are logged
 * and skipped. If the marker's sequence number is positive, the MVCC is advanced to it
 * afterwards.
 *
 * @param bulkLoadEvent the bulk load descriptor read from the WAL
 * @throws WrongRegionException if the marker targets a different region
 * @throws IOException if loading a file fails for a reason other than the file being missing
 */
void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
  checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
    "BulkLoad marker from WAL ", bulkLoadEvent);

  // Bulk load markers are not replayed on the default replica.
  if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
    return;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug(getRegionInfo().getEncodedName() + " : "
            + "Replaying bulkload event marker " + TextFormat.shortDebugString(bulkLoadEvent));
  }

  // Determine whether the marker spans more than one family; this selects the mode passed
  // to startBulkRegionOperation.
  boolean multipleFamilies = false;
  byte[] family = null;
  for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
    byte[] fam = storeDescriptor.getFamilyName().toByteArray();
    if (family == null) {
      family = fam;
    } else if (!Bytes.equals(family, fam)) {
      multipleFamilies = true;
      break;
    }
  }

  startBulkRegionOperation(multipleFamilies);
  try {
    // Serialize with flush / region-event replays.
    synchronized (writestate) {
      // A negative seqNum bypasses this check. Otherwise a marker at or below the last
      // replayed region-open seqId is skipped.
      if (bulkLoadEvent.getBulkloadSeqNum() >= 0
          && this.lastReplayedOpenRegionSeqId >= bulkLoadEvent.getBulkloadSeqNum()) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying bulkload event :"
            + TextFormat.shortDebugString(bulkLoadEvent)
            + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
            + " =" + lastReplayedOpenRegionSeqId);

        return;
      }

      for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
        // Reuses the outer 'family' variable for the current store.
        family = storeDescriptor.getFamilyName().toByteArray();
        Store store = getStore(family);
        if (store == null) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
                  + "Received a bulk load marker from primary, but the family is not found. "
                  + "Ignoring. StoreDescriptor:" + storeDescriptor);
          continue;
        }

        List<String> storeFiles = storeDescriptor.getStoreFileList();
        for (String storeFile : storeFiles) {
          StoreFileInfo storeFileInfo = null;
          try {
            storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
            store.bulkLoadHFile(storeFileInfo);
          } catch(FileNotFoundException ex) {
            // Missing file: log (without the stack trace) and continue with the next file.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                    + ((storeFileInfo != null) ? storeFileInfo.toString() :
                          (new Path(Bytes.toString(family), storeFile)).toString())
                    + " doesn't exist any more. Skip loading the file");
          }
        }
      }
    }
    // Make the bulk-loaded data visible up to its sequence number.
    if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
      mvcc.advanceTo(bulkLoadEvent.getBulkloadSeqNum());
    }
  } finally {
    closeBulkRegionOperation();
  }
}
4888
4889
4890
4891
4892 private void dropPrepareFlushIfPossible() {
4893 if (writestate.flushing) {
4894 boolean canDrop = true;
4895 if (prepareFlushResult.storeFlushCtxs != null) {
4896 for (Entry<byte[], StoreFlushContext> entry
4897 : prepareFlushResult.storeFlushCtxs.entrySet()) {
4898 Store store = getStore(entry.getKey());
4899 if (store == null) {
4900 continue;
4901 }
4902 if (store.getSnapshotSize() > 0) {
4903 canDrop = false;
4904 break;
4905 }
4906 }
4907 }
4908
4909
4910
4911 if (canDrop) {
4912 writestate.flushing = false;
4913 this.prepareFlushResult = null;
4914 }
4915 }
4916 }
4917
4918 @Override
4919 public boolean refreshStoreFiles() throws IOException {
4920 return refreshStoreFiles(false);
4921 }
4922
/**
 * Re-reads every store's file list and frees memstore data covered by newly seen files.
 *
 * <p>Does nothing and returns {@code false} on the default replica unless {@code force} is
 * set. While holding the {@code writestate} monitor, for each store whose max sequence id
 * advanced after the refresh:
 * <ul>
 * <li>a prepared flush context for its family is aborted if the prepared flush is not newer
 * than the store's new max sequence id, and its snapshot size is released;</li>
 * <li>the store and its new max sequence id are recorded.</li>
 * </ul>
 * Still under the monitor, the following steps run:
 * <ul>
 * <li>the prepared flush is cleared if possible;</li>
 * <li>the MVCC is advanced to each store's {@code getMaxMemstoreTS()};</li>
 * <li>{@code lastReplayedOpenRegionSeqId} is raised to the smallest max sequence id across
 * stores.</li>
 * </ul>
 * Outside the monitor, when not forced, memstore contents of the recorded stores are dropped
 * up to their new sequence ids. When forced, the recorded map is appended to
 * {@code storeSeqIds} instead. Waiters on this region are notified in both cases.
 *
 * @param force run even on the default replica, and defer memstore dropping
 * @return {@code true} if any memstore data was freed by this call
 * @throws IOException if a store refresh or memstore drop fails
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
    justification="Notify is about post replay. Intentional")
protected boolean refreshStoreFiles(boolean force) throws IOException {
  if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
    return false;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug(getRegionInfo().getEncodedName() + " : "
        + "Refreshing store files to see whether we can free up memstore");
  }

  // Total memstore size released by this call.
  long totalFreedSize = 0;

  // Smallest max sequence id over all stores after the refresh.
  long smallestSeqIdInStores = Long.MAX_VALUE;

  startRegionOperation();
  try {
    // Stores that picked up newer files -> their new max sequence id.
    Map<Store, Long> map = new HashMap<Store, Long>();
    synchronized (writestate) {
      for (Store store : getStores()) {
        // Remember the max sequence id before re-reading the file list.
        long maxSeqIdBefore = store.getMaxSequenceId();

        store.refreshStoreFiles();

        long storeSeqId = store.getMaxSequenceId();
        if (storeSeqId < smallestSeqIdInStores) {
          smallestSeqIdInStores = storeSeqId;
        }

        // Only stores that actually gained newer files can free memory.
        if (storeSeqId > maxSeqIdBefore) {

          if (writestate.flushing) {
            // The prepared flush is covered by the new files: abort its context for this
            // family and release the snapshot's size.
            if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
              StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                  null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
              if (ctx != null) {
                long snapshotSize = store.getFlushableSize();
                ctx.abort();
                this.addAndGetGlobalMemstoreSize(-snapshotSize);
                this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
                totalFreedSize += snapshotSize;
              }
            }
          }

          map.put(store, storeSeqId);
        }
      }

      // Clear the prepared flush if no participating store still holds a snapshot.
      dropPrepareFlushIfPossible();

      // Advance the MVCC to each store's max memstore timestamp.
      for (Store s : getStores()) {
        mvcc.advanceTo(s.getMaxMemstoreTS());
      }

      // Every store now holds data at least up to smallestSeqIdInStores, so move
      // lastReplayedOpenRegionSeqId forward (never backward).
      if (this.lastReplayedOpenRegionSeqId < smallestSeqIdInStores) {
        this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores;
      }
    }
    if (!map.isEmpty()) {
      if (!force) {
        for (Map.Entry<Store, Long> entry : map.entrySet()) {
          // Drops only if the store's new seqId >= current MVCC read point.
          totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey());
        }
      } else {
        // NOTE(review): storeSeqIds is declared elsewhere. Presumably the forced path defers
        // memstore dropping to whoever consumes this list; confirm.
        synchronized (storeSeqIds) {
          storeSeqIds.add(map);
        }
      }
    }

    // Wake threads waiting on this region.
    synchronized (this) {
      notifyAll();
    }
    return totalFreedSize > 0;
  } finally {
    closeRegionOperation();
  }
}
5020
5021 private void logRegionFiles() {
5022 if (LOG.isTraceEnabled()) {
5023 LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
5024 for (Store s : stores.values()) {
5025 Collection<StoreFile> storeFiles = s.getStorefiles();
5026 if (storeFiles == null) continue;
5027 for (StoreFile sf : storeFiles) {
5028 LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
5029 }
5030 }
5031 }
5032 }
5033
5034
5035
5036
5037 private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
5038 throws WrongRegionException {
5039 if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
5040 return;
5041 }
5042
5043 if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
5044 Bytes.equals(encodedRegionName,
5045 this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
5046 return;
5047 }
5048
5049 throw new WrongRegionException(exceptionMsg + payload
5050 + " targetted for region " + Bytes.toStringBinary(encodedRegionName)
5051 + " does not match this region: " + this.getRegionInfo());
5052 }
5053
5054
5055
5056
5057
5058
5059
5060 protected boolean restoreEdit(final Store s, final Cell cell) {
5061 long kvSize = s.add(cell);
5062 if (this.rsAccounting != null) {
5063 rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize);
5064 }
5065 return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
5066 }
5067
5068
5069
5070
5071
5072
5073
5074 private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
5075 throws IOException {
5076 FileStatus stat = fs.getFileStatus(p);
5077 if (stat.getLen() > 0) return false;
5078 LOG.warn("File " + p + " is zero-length, deleting.");
5079 fs.delete(p, false);
5080 return true;
5081 }
5082
5083 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
5084 return new HStore(this, family, this.conf);
5085 }
5086
5087 @Override
5088 public Store getStore(final byte[] column) {
5089 return this.stores.get(column);
5090 }
5091
5092
5093
5094
5095
5096 private Store getStore(Cell cell) {
5097 for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
5098 if (Bytes.equals(
5099 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
5100 famStore.getKey(), 0, famStore.getKey().length)) {
5101 return famStore.getValue();
5102 }
5103 }
5104
5105 return null;
5106 }
5107
5108 @Override
5109 public List<Store> getStores() {
5110 List<Store> list = new ArrayList<Store>(stores.size());
5111 list.addAll(stores.values());
5112 return list;
5113 }
5114
5115 @Override
5116 public List<String> getStoreFileList(final byte [][] columns)
5117 throws IllegalArgumentException {
5118 List<String> storeFileNames = new ArrayList<String>();
5119 synchronized(closeLock) {
5120 for(byte[] column : columns) {
5121 Store store = this.stores.get(column);
5122 if (store == null) {
5123 throw new IllegalArgumentException("No column family : " +
5124 new String(column) + " available");
5125 }
5126 Collection<StoreFile> storeFiles = store.getStorefiles();
5127 if (storeFiles == null) continue;
5128 for (StoreFile storeFile: storeFiles) {
5129 storeFileNames.add(storeFile.getPath().toString());
5130 }
5131
5132 logRegionFiles();
5133 }
5134 }
5135 return storeFileNames;
5136 }
5137
5138
5139
5140
5141
5142
5143 void checkRow(final byte [] row, String op) throws IOException {
5144 if (!rowIsInRange(getRegionInfo(), row)) {
5145 throw new WrongRegionException("Requested row out of range for " +
5146 op + " on HRegion " + this + ", startKey='" +
5147 Bytes.toStringBinary(getRegionInfo().getStartKey()) + "', getEndKey()='" +
5148 Bytes.toStringBinary(getRegionInfo().getEndKey()) + "', row='" +
5149 Bytes.toStringBinary(row) + "'");
5150 }
5151 }
5152
5153
5154
5155
5156
5157
5158
5159
5160 public RowLock getRowLock(byte[] row) throws IOException {
5161 return getRowLock(row, false);
5162 }
5163
5164
5165
5166
5167
5168
5169
5170
5171
5172
5173
5174
5175
5176 public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
5177 return getRowLock(row, readLock, true);
5178 }
5179
5180
5181
5182
5183
5184
5185
5186
5187
5188
5189
5190
5191
5192
/**
 * Acquires a read or write lock on {@code row}.
 *
 * <p>The row is first checked against this region's key range. One {@link RowLockContext} is
 * shared per row through {@code lockedRows`: a new context is offered with
 * {@code putIfAbsent}, and an already registered one is used instead if present. If that
 * context has been retired concurrently ({@code newReadLock}/{@code newWriteLock} return
 * {@code null}), the lookup is retried. The lock is then tried, either for up to
 * {@code rowLockWaitDuration} milliseconds or once without waiting.
 *
 * @param row the row key
 * @param readLock {@code true} for a shared read lock, {@code false} for an exclusive lock
 * @param waitForLock whether to wait, bounded by {@code rowLockWaitDuration}, for the lock
 * @return the acquired lock; {@code null} only when {@code waitForLock} is {@code false} and
 *     the lock was not immediately available
 * @throws WrongRegionException if the row is outside this region
 * @throws IOException if {@code waitForLock} is {@code true} and the wait timed out
 * @throws InterruptedIOException if interrupted while waiting (the interrupt flag is restored)
 */
public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException {
  // Validate the row before touching the lock table.
  checkRow(row, "row lock");

  HashedBytes rowKey = new HashedBytes(row);

  RowLockContext rowLockContext = null;
  RowLockImpl result = null;
  TraceScope traceScope = null;

  // Annotate a trace span only when tracing is active.
  if (Trace.isTracing()) {
    traceScope = Trace.startSpan("HRegion.getRowLock");
    traceScope.getSpan().addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));
  }

  boolean success = false;
  try {
    // Retry until a lock handle comes from a context that is still usable.
    while (result == null) {
      // Offer a fresh context; if another thread already registered one for this row,
      // share that one instead.
      rowLockContext = new RowLockContext(rowKey);
      RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);

      if (existingContext != null) {
        rowLockContext = existingContext;
      }

      // Each call increments the context's reference count. A null result means the
      // context was retired (removed from lockedRows) concurrently, so loop again.
      if (readLock) {
        result = rowLockContext.newReadLock();
      } else {
        result = rowLockContext.newWriteLock();
      }
    }
    boolean lockAvailable = false;
    if(waitForLock) {
      // Bounded wait.
      lockAvailable = result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS);
    } else {
      // Single non-blocking attempt.
      lockAvailable = result.getLock().tryLock();
    }
    if(!lockAvailable) {
      if (traceScope != null) {
        traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
      }
      result = null;
      if(waitForLock) {
        throw new IOException("Timed out waiting for lock for row: " + rowKey);
      } else {
        return null;
      }
    }
    // Recorded for diagnostics (shown by RowLockContext.toString()).
    rowLockContext.setThreadName(Thread.currentThread().getName());
    success = true;
    return result;
  } catch (InterruptedException ie) {
    LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
    InterruptedIOException iie = new InterruptedIOException();
    iie.initCause(ie);
    if (traceScope != null) {
      traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
    }
    Thread.currentThread().interrupt();
    throw iie;
  } finally {
    // Lock not acquired (timeout, failed try, interrupt or error): give back the
    // reference taken above so the context can be retired.
    if (!success && rowLockContext != null) {
      rowLockContext.cleanUp();
    }
    if (traceScope != null) {
      traceScope.close();
    }
  }
}
5276
5277 @Override
5278 public void releaseRowLocks(List<RowLock> rowLocks) {
5279 if (rowLocks != null) {
5280 for (RowLock rowLock : rowLocks) {
5281 rowLock.release();
5282 }
5283 rowLocks.clear();
5284 }
5285 }
5286
5287 public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
5288 return lockedRows;
5289 }
5290
/**
 * Per-row lock state shared by every {@link RowLockImpl} handed out for one row.
 *
 * <p>Holds a fair {@link ReentrantReadWriteLock} and a reference count. Handing out a lock
 * handle increments {@code count}, and {@link #cleanUp()} decrements it. When the count reaches
 * zero, the context is marked unusable and removed from {@code lockedRows}. An unusable context
 * hands out no further handles ({@code null} is returned), which makes {@code getRowLock}
 * retry with a new context. {@code usable} is read and cleared under the {@code lock} monitor.
 */
@VisibleForTesting
class RowLockContext {
  private final HashedBytes row;
  // Fair read/write lock protecting the row.
  final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
  // Becomes false once this context is retired and removed from lockedRows.
  final AtomicBoolean usable = new AtomicBoolean(true);
  // Outstanding handles that have not yet been cleaned up.
  final AtomicInteger count = new AtomicInteger(0);
  // Monitor guarding the usable check against retirement in cleanUp().
  final Object lock = new Object();
  // Last thread that acquired a lock through this context; diagnostic only (toString()).
  private String threadName;

  RowLockContext(HashedBytes row) {
    this.row = row;
  }

  /** Returns a handle on the exclusive lock, or {@code null} if this context is retired. */
  RowLockImpl newWriteLock() {
    Lock l = readWriteLock.writeLock();
    return getRowLock(l);
  }
  /** Returns a handle on the shared lock, or {@code null} if this context is retired. */
  RowLockImpl newReadLock() {
    Lock l = readWriteLock.readLock();
    return getRowLock(l);
  }

  // Takes a reference first, then hands out a handle only if still usable.
  // NOTE(review): on the null path the count stays incremented. This looks harmless because
  // a retired context is never made usable again.
  private RowLockImpl getRowLock(Lock l) {
    count.incrementAndGet();
    synchronized (lock) {
      if (usable.get()) {
        return new RowLockImpl(this, l);
      } else {
        return null;
      }
    }
  }

  /**
   * Drops one reference. When none remain, retires this context and removes it from
   * {@code lockedRows}. The count is re-checked under {@code lock} because another thread
   * may take a new reference between the decrement and entering the monitor.
   */
  void cleanUp() {
    long c = count.decrementAndGet();
    if (c <= 0) {
      synchronized (lock) {
        if (count.get() <= 0 && usable.get()){ // Don't attempt to remove row if already removed
          usable.set(false);
          RowLockContext removed = lockedRows.remove(row);
          assert removed == this: "we should never remove a different context";
        }
      }
    }
  }

  /** Records the name of the thread holding the lock, for diagnostics. */
  public void setThreadName(String threadName) {
    this.threadName = threadName;
  }

  @Override
  public String toString() {
    return "RowLockContext{" +
        "row=" + row +
        ", readWriteLock=" + readWriteLock +
        ", count=" + count +
        ", threadName=" + threadName +
        '}';
  }
}
5351
5352
5353
5354
5355 public static class RowLockImpl implements RowLock {
5356 private final RowLockContext context;
5357 private final Lock lock;
5358
5359 public RowLockImpl(RowLockContext context, Lock lock) {
5360 this.context = context;
5361 this.lock = lock;
5362 }
5363
5364 public Lock getLock() {
5365 return lock;
5366 }
5367
5368 @VisibleForTesting
5369 public RowLockContext getContext() {
5370 return context;
5371 }
5372
5373 @Override
5374 public void release() {
5375 lock.unlock();
5376 context.cleanUp();
5377 }
5378
5379 @Override
5380 public String toString() {
5381 return "RowLockImpl{" +
5382 "context=" + context +
5383 ", lock=" + lock +
5384 '}';
5385 }
5386 }
5387
5388
5389
5390
5391
5392
5393
5394 private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) {
5395 boolean multipleFamilies = false;
5396 byte[] family = null;
5397 for (Pair<byte[], String> pair : familyPaths) {
5398 byte[] fam = pair.getFirst();
5399 if (family == null) {
5400 family = fam;
5401 } else if (!Bytes.equals(family, fam)) {
5402 multipleFamilies = true;
5403 break;
5404 }
5405 }
5406 return multipleFamilies;
5407 }
5408
/**
 * Bulk loads HFiles into this region's stores.
 *
 * <p>Phase 1 validates every (family, path) pair before anything is loaded:
 * <ul>
 * <li>unknown families and I/O problems are collected and thrown together as one
 * {@link IOException};</li>
 * <li>{@link WrongRegionException}s (files outside this region, see the log text about
 * splits) make the call return {@code false} without loading anything.</li>
 * </ul>
 * If {@code assignSeqId} is set, a flush is requested to obtain a sequence id for the loaded
 * files. When the flush could not run, the method waits for in-progress flushes and leaves the
 * sequence id at -1. Any other non-flush result fails the call.
 *
 * <p>Phase 2 loads each file, notifying {@code bulkLoadListener} before and after. On an I/O
 * failure the listener's {@code failedBulkLoad} is called and the exception is rethrown.
 *
 * <p>In all cases, if a WAL is present and at least one file was loaded, a bulk load marker is
 * written to the WAL. If that write fails, the region server is aborted when
 * {@code rsServices} is available.
 *
 * @param familyPaths (family name, HFile path) pairs to load; must not be {@code null}
 * @param assignSeqId whether to obtain a sequence id from a flush for the loaded files
 * @param bulkLoadListener optional callbacks around each file's load; may be {@code null}
 * @return {@code true} if all files were loaded, {@code false} on a recoverable failure
 * @throws IOException on validation errors, flush problems, or a failed file load
 */
@Override
public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
    BulkLoadListener bulkLoadListener) throws IOException {
  long seqId = -1;
  // Files actually committed, per family; used to build the WAL bulk load marker.
  Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
  Preconditions.checkNotNull(familyPaths);
  // The bulk region operation is started differently for single- vs multi-family loads.
  startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
  try {
    this.writeRequestsCount.increment();

    // Phase 1: validate every file before loading any of them.
    List<IOException> ioes = new ArrayList<IOException>();
    List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
    for (Pair<byte[], String> p : familyPaths) {
      byte[] familyName = p.getFirst();
      String path = p.getSecond();

      Store store = getStore(familyName);
      if (store == null) {
        IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
            "No such column family " + Bytes.toStringBinary(familyName));
        ioes.add(ioe);
      } else {
        try {
          store.assertBulkLoadHFileOk(new Path(path));
        } catch (WrongRegionException wre) {
          // Recoverable: the file does not belong to this region.
          failures.add(p);
        } catch (IOException ioe) {
          // Unrecoverable: collected and rethrown below.
          ioes.add(ioe);
        }
      }
    }

    // Unrecoverable validation errors: report them all at once.
    if (ioes.size() != 0) {
      IOException e = MultipleIOException.createIOException(ioes);
      LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
      throw e;
    }

    // Recoverable failures: load nothing and let the caller retry.
    if (failures.size() != 0) {
      StringBuilder list = new StringBuilder();
      for (Pair<byte[], String> p : failures) {
        list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
            .append(p.getSecond());
      }
      // problem when validating
      LOG.warn("There was a recoverable bulk load failure likely due to a" +
          " split. These (family, HFile) pairs were not loaded: " + list);
      return false;
    }

    // Obtain a sequence id for the loaded files from a flush.
    if (assignSeqId) {
      FlushResult fs = flushcache(true, false);
      if (fs.isFlushSucceeded()) {
        seqId = ((FlushResultImpl)fs).flushSequenceId;
      } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
        seqId = ((FlushResultImpl)fs).flushSequenceId;
      } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH) {
        // Another flush is presumably in progress: wait for it. seqId stays -1.
        waitForFlushes();
      } else {
        throw new IOException("Could not bulk load with an assigned sequential ID because the "+
          "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);
      }
    }

    // Phase 2: load each file.
    for (Pair<byte[], String> p : familyPaths) {
      byte[] familyName = p.getFirst();
      String path = p.getSecond();
      Store store = getStore(familyName);
      try {
        String finalPath = path;
        if (bulkLoadListener != null) {
          finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
        }
        Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);

        // Remember the committed file for the WAL marker written in finally.
        if(storeFiles.containsKey(familyName)) {
          storeFiles.get(familyName).add(commitedStoreFile);
        } else {
          List<Path> storeFileNames = new ArrayList<Path>();
          storeFileNames.add(commitedStoreFile);
          storeFiles.put(familyName, storeFileNames);
        }
        if (bulkLoadListener != null) {
          bulkLoadListener.doneBulkLoad(familyName, path);
        }
      } catch (IOException ioe) {
        // Files loaded so far stay loaded; this file failed. Notify the listener (best
        // effort) and rethrow.
        LOG.error("There was a partial failure due to IO when attempting to" +
            " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
        if (bulkLoadListener != null) {
          try {
            bulkLoadListener.failedBulkLoad(familyName, path);
          } catch (Exception ex) {
            LOG.error("Error while calling failedBulkLoad for family " +
                Bytes.toString(familyName) + " with path " + path, ex);
          }
        }
        throw ioe;
      }
    }

    return true;
  } finally {
    // Record whatever was actually loaded (even on partial failure) in the WAL.
    if (wal != null && !storeFiles.isEmpty()) {
      try {
        WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
            this.getRegionInfo().getTable(),
            ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
        WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
          loadDescriptor, mvcc);
      } catch (IOException ioe) {
        if (this.rsServices != null) {
          // Loaded files without a WAL marker leave the WAL incomplete, so the region
          // server is aborted.
          this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);
        }
      }
    }

    closeBulkRegionOperation();
  }
}
5550
5551 @Override
5552 public boolean equals(Object o) {
5553 return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(),
5554 ((HRegion) o).getRegionInfo().getRegionName());
5555 }
5556
5557 @Override
5558 public int hashCode() {
5559 return Bytes.hashCode(getRegionInfo().getRegionName());
5560 }
5561
5562 @Override
5563 public String toString() {
5564 return getRegionInfo().getRegionNameAsString();
5565 }
5566
5567
5568
5569
5570 class RegionScannerImpl implements RegionScanner {
5571
// Heap over the scanners of "essential" families (every family unless on-demand
// column-family loading moves some of them to joinedHeap); built in initializeKVHeap.
KeyValueHeap storeHeap = null;

// Heap over the scanners of non-essential, on-demand families; stays null when there are
// none (see initializeKVHeap).
KeyValueHeap joinedHeap = null;

// NOTE(review): not used in the visible part of this class. Presumably the row for which
// joinedHeap still has cells to add across next() calls; confirm in the scanning code.
protected Cell joinedContinuationRow = null;
private boolean filterClosed = false;

// -1 for a Get scan, 0 otherwise (set in the constructor).
// NOTE(review): presumably used when comparing rows against stopRow; confirm at the use site.
protected final int isScan;
// null when the scan's stop row is empty and the scan is not a Get; otherwise the scan's
// stop row.
protected final byte[] stopRow;
protected final HRegion region;

// MVCC read point chosen at construction and registered in scannerReadPoints.
private final long readPt;
private final long maxResultSize;
// Scanner context carrying only the scan's batch limit; used by next(List).
private final ScannerContext defaultScannerContext;
// Wrapper around the scan's filter, or null when the scan has no filter.
private final FilterWrapper filter;
5590
5591 @Override
5592 public HRegionInfo getRegionInfo() {
5593 return region.getRegionInfo();
5594 }
5595
/**
 * Opens a scanner over {@code region} for {@code scan}.
 *
 * <p>Captures the scan's filter, batch limit, stop row and Get-ness. Picks an MVCC read point
 * for the scan's isolation level and registers it in {@code scannerReadPoints}. Then opens one
 * store scanner per family in the scan's family map:
 * <ul>
 * <li>scanners of essential families (all of them unless on-demand family loading applies)
 * go into {@code storeHeap};</li>
 * <li>the rest go into {@code joinedHeap}.</li>
 * </ul>
 * Any {@code additionalScanners} are placed in {@code storeHeap} too. On any failure the read
 * point is unregistered, opened scanners are closed and the failure is rethrown as an
 * {@link IOException}.
 *
 * @param scan the scan specification
 * @param additionalScanners extra scanners to merge into the main heap; may be {@code null}
 * @param region the region being scanned
 * @throws IOException if a store scanner cannot be opened or the heaps cannot be built
 */
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
    throws IOException {
  this.region = region;
  this.maxResultSize = scan.getMaxResultSize();
  if (scan.hasFilter()) {
    this.filter = new FilterWrapper(scan.getFilter());
  } else {
    this.filter = null;
  }

  // Only the scan's batch limit is carried into the default scanner context.
  defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();

  // For a non-Get scan an empty stop row is stored as null (presumably "no upper bound").
  if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
    this.stopRow = null;
  } else {
    this.stopRow = scan.getStopRow();
  }

  // -1 for Get scans, 0 otherwise.
  this.isScan = scan.isGetScan() ? -1 : 0;

  // Choose and register the read point while holding the scannerReadPoints monitor.
  IsolationLevel isolationLevel = scan.getIsolationLevel();
  synchronized(scannerReadPoints) {
    this.readPt = getReadpoint(isolationLevel);
    scannerReadPoints.put(this, this.readPt);
  }

  // scanners -> storeHeap, joinedScanners -> joinedHeap. instantiatedScanners tracks
  // everything opened so far so it can be closed if construction fails.
  List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
  List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();

  List<KeyValueScanner> instantiatedScanners = new ArrayList<KeyValueScanner>();

  if (additionalScanners != null && !additionalScanners.isEmpty()) {
    scanners.addAll(additionalScanners);
    instantiatedScanners.addAll(additionalScanners);
  }

  try {
    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
      // NOTE(review): a family missing from this region would make store null and fail
      // below; presumably families are validated by the caller. Confirm.
      Store store = stores.get(entry.getKey());
      KeyValueScanner scanner;
      try {
        scanner = store.getScanner(scan, entry.getValue(), this.readPt);
      } catch (FileNotFoundException e) {
        // Converted by handleFileNotFound (defined elsewhere in this file).
        throw handleFileNotFound(e);
      }
      instantiatedScanners.add(scanner);
      // Without a filter, or without on-demand loading, every family is essential.
      if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
        scanners.add(scanner);
      } else {
        joinedScanners.add(scanner);
      }
    }
    initializeKVHeap(scanners, joinedScanners, region);
  } catch (Throwable t) {
    // Unregister the read point, close what was opened, rethrow as IOException.
    throw handleException(instantiatedScanners, t);
  }
}
5664
5665 protected void initializeKVHeap(List<KeyValueScanner> scanners,
5666 List<KeyValueScanner> joinedScanners, HRegion region)
5667 throws IOException {
5668 this.storeHeap = new KeyValueHeap(scanners, region.comparator);
5669 if (!joinedScanners.isEmpty()) {
5670 this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
5671 }
5672 }
5673
/**
 * Cleans up after a failure in the constructor and converts the failure to an IOException.
 * <p>
 * Unregisters this scanner's read point. If the store heap was already built, closes the
 * store heap and joined heap; otherwise closes every scanner opened so far individually.
 *
 * @param instantiatedScanners every scanner opened before the failure
 * @param t the failure
 * @return {@code t} itself if it is an IOException, otherwise a new IOException wrapping it
 */
private IOException handleException(List<KeyValueScanner> instantiatedScanners,
    Throwable t) {
  // Remove the read point registered by the constructor before reporting the failure.
  scannerReadPoints.remove(this);
  if (storeHeap != null) {
    // Presumably closing a heap closes the scanners it wraps — confirm in KeyValueHeap.
    // NOTE(review): if initializeKVHeap built storeHeap but then failed while building
    // joinedHeap, the joined scanners are in neither heap and are not closed here.
    storeHeap.close();
    storeHeap = null;
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
  } else {
    // No heap yet: close every scanner that was opened before the failure.
    for (KeyValueScanner scanner : instantiatedScanners) {
      scanner.close();
    }
  }
  return t instanceof IOException ? (IOException) t : new IOException(t);
}
5693
5694 @Override
5695 public long getMaxResultSize() {
5696 return maxResultSize;
5697 }
5698
5699 @Override
5700 public long getMvccReadPoint() {
5701 return this.readPt;
5702 }
5703
5704 @Override
5705 public int getBatch() {
5706 return this.defaultScannerContext.getBatchLimit();
5707 }
5708
5709
5710
5711
5712
5713
5714 protected void resetFilters() throws IOException {
5715 if (filter != null) {
5716 filter.reset();
5717 }
5718 }
5719
5720 @Override
5721 public boolean next(List<Cell> outResults)
5722 throws IOException {
5723
5724 return next(outResults, defaultScannerContext);
5725 }
5726
5727 @Override
5728 public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
5729 throws IOException {
5730 if (this.filterClosed) {
5731 throw new UnknownScannerException("Scanner was closed (timed out?) " +
5732 "after we renewed it. Could be caused by a very slow scanner " +
5733 "or a lengthy garbage collection");
5734 }
5735 startRegionOperation(Operation.SCAN);
5736 readRequestsCount.increment();
5737 try {
5738 return nextRaw(outResults, scannerContext);
5739 } finally {
5740 closeRegionOperation(Operation.SCAN);
5741 }
5742 }
5743
5744 @Override
5745 public boolean nextRaw(List<Cell> outResults) throws IOException {
5746
5747 return nextRaw(outResults, defaultScannerContext);
5748 }
5749
5750 @Override
5751 public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
5752 throws IOException {
5753 if (storeHeap == null) {
5754
5755 throw new UnknownScannerException("Scanner was closed");
5756 }
5757 boolean moreValues;
5758 if (outResults.isEmpty()) {
5759
5760
5761 moreValues = nextInternal(outResults, scannerContext);
5762 } else {
5763 List<Cell> tmpList = new ArrayList<Cell>();
5764 moreValues = nextInternal(tmpList, scannerContext);
5765 outResults.addAll(tmpList);
5766 }
5767
5768
5769
5770
5771 if (!scannerContext.midRowResultFormed()) resetFilters();
5772
5773 if (isFilterDoneInternal()) {
5774 moreValues = false;
5775 }
5776 return moreValues;
5777 }
5778
5779
5780
5781
5782 private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
5783 throws IOException {
5784 assert joinedContinuationRow != null;
5785 boolean moreValues =
5786 populateResult(results, this.joinedHeap, scannerContext,
5787 joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
5788 joinedContinuationRow.getRowLength());
5789
5790 if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
5791
5792 joinedContinuationRow = null;
5793 }
5794
5795
5796 Collections.sort(results, comparator);
5797 return moreValues;
5798 }
5799
5800
5801
5802
5803
5804
5805
5806
5807
5808
5809
/**
 * Moves cells of the row identified by {@code currentRow[offset, offset + length)} from
 * {@code heap} into {@code results}. It stops early when a batch, size or time limit of
 * {@code scannerContext} is reached at BETWEEN_CELLS scope.
 *
 * @param results list to append cells to
 * @param heap the heap to read from
 * @param scannerContext tracks progress and limits; its scanner state is set on a limit
 * @param currentRow array holding the current row key
 * @param offset offset of the row key within {@code currentRow}
 * @param length length of the row key
 * @return if a limit was reached, {@code scannerContext.setScannerState(...).hasMoreValues()};
 *     otherwise whether the heap has any cell left after this row
 * @throws IOException from the heap; a FileNotFoundException is routed through
 *     handleFileNotFound
 */
private boolean populateResult(List<Cell> results, KeyValueHeap heap,
    ScannerContext scannerContext, byte[] currentRow, int offset, short length)
    throws IOException {
  Cell nextKv;
  boolean moreCellsInRow = false;
  // Caller's keep-progress setting, restored after every heap.next() call.
  boolean tmpKeepProgress = scannerContext.getKeepProgress();
  // Limits are evaluated at cell granularity while a row is being populated.
  LimitScope limitScope = LimitScope.BETWEEN_CELLS;
  try {
    do {
      // Force keepProgress on around heap.next() so progress already accumulated for this
      // row is not reset, then restore the caller's setting.
      // NOTE(review): relies on ScannerContext/KeyValueHeap treating keepProgress as
      // "do not clear progress" (as nextInternal does) — confirm.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
      // The row is finished once the next cell is on another row or the heap is empty.
      if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);

      // The batch limit only matters if the row continues; size/time limits record whether
      // the cut happened mid-row.
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
  } catch (FileNotFoundException e) {
    throw handleFileNotFound(e);
  }
  // Row fully consumed without hitting a limit: more values exist iff the heap isn't empty.
  return nextKv != null;
}
5848
5849
5850
5851
5852
5853
5854
5855
5856
5857
5858
5859 private boolean moreCellsInRow(final Cell nextKv, byte[] currentRow, int offset,
5860 short length) {
5861 return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length);
5862 }
5863
5864
5865
5866
5867 @Override
5868 public synchronized boolean isFilterDone() throws IOException {
5869 return isFilterDoneInternal();
5870 }
5871
5872 private boolean isFilterDoneInternal() throws IOException {
5873 return this.filter != null && this.filter.filterAllRemaining();
5874 }
5875
/**
 * Reads the next row into {@code results}. When a BETWEEN_CELLS limit is hit, only part of
 * the row is read. Applies the stop-row check, the row-key filter, whole-row filters and
 * the joined (on-demand) column-family heap.
 *
 * @param results must be empty on entry; receives the produced cells
 * @param scannerContext limits and progress tracking; must not be null
 * @return whether more values may remain. This is the context's recorded state, or
 *     {@code true} when returning early because a BETWEEN_CELLS limit was reached.
 * @throws IllegalArgumentException if {@code results} is non-empty or the context is null
 * @throws CallerDisconnectedException if the RPC caller has disconnected
 * @throws IncompatibleFilterException if a filter with hasFilterRow() would require
 *     returning a partial row
 * @throws IOException on store read failures
 */
private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
  if (!results.isEmpty()) {
    throw new IllegalArgumentException("First parameter should be an empty list");
  }
  if (scannerContext == null) {
    throw new IllegalArgumentException("Scanner context cannot be null");
  }
  // Non-null only when running inside an RPC; used to detect a disconnected caller.
  RpcCallContext rpcCall = RpcServer.getCurrentCall();

  // Snapshot progress at entry. Each loop iteration (each row attempted) restarts from
  // this snapshot when keepProgress is set, or from zero otherwise. That way, progress spent
  // on rows that get filtered out is discarded.
  int initialBatchProgress = scannerContext.getBatchProgress();
  long initialSizeProgress = scannerContext.getSizeProgress();
  long initialTimeProgress = scannerContext.getTimeProgress();

  // Loop until a row is produced, a limit is reached, or the scan is exhausted.
  while (true) {
    // Reset progress for this row attempt as described above.
    if (scannerContext.getKeepProgress()) {
      // Restore the values seen at the start of this method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialTimeProgress);
    } else {
      scannerContext.clearProgress();
    }

    if (rpcCall != null) {
      // Stop working for a client that has already gone away.
      long afterTime = rpcCall.disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
            "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " +
                this + " after " + afterTime + " ms, since " +
                "caller disconnected");
      }
    }

    // Peek at the next cell to learn which row we are about to read (null if exhausted).
    Cell current = this.storeHeap.peek();

    byte[] currentRow = null;
    int offset = 0;
    short length = 0;
    if (current != null) {
      currentRow = current.getRowArray();
      offset = current.getRowOffset();
      length = current.getRowLength();
    }

    // True when the heap is exhausted or the row is at/past the stop row.
    boolean stopRow = isStopRow(currentRow, offset, length);
    // Whole-row filters need complete rows, so partial results are not allowed for them.
    boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

    // Widen the size/time limit scope to BETWEEN_ROWS so those limits cannot split a row
    // that a whole-row filter must see in full.
    if (hasFilterRow) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
      }
      scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
      scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
    }

    // A non-null joinedContinuationRow means a previous call stopped partway through
    // reading this row's joined-heap cells; resume that instead of starting a new row.
    if (joinedContinuationRow == null) {
      // At or past the stop row: nothing more to return.
      if (stopRow) {
        if (hasFilterRow) {
          filter.filterRowCells(results);
        }
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      }

      // The filter rejected the row key: skip the whole row without reading its cells.
      if (filterRowKey(currentRow, offset, length)) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        // The filter may have decided nothing further can match.
        if (isFilterDoneInternal()) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // A row skipped by its key still counts as scanned.
        incrementCountOfRowsScannedMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        results.clear();
        continue;
      }

      // Read this row's cells from the main (essential families) heap.
      populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);

      // A limit cut the row short: hand back the partial row now.
      if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                  + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Is the row after this one beyond the stop row (or is the heap exhausted)?
      Cell nextKv = this.storeHeap.peek();
      stopRow = nextKv == null ||
          isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
      // Captured before filterRowCellsWithRet below may modify results.
      final boolean isEmptyRow = results.isEmpty();

      // Apply the whole-row filter. Because it may drop cells, progress is rebuilt from the
      // surviving cells, while the elapsed time progress is preserved.
      FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
      if (hasFilterRow) {
        ret = filter.filterRowCellsWithRet(results);

        // Reset progress to the per-row baseline, keep the measured time, then re-add
        // batch and size for the cells that remain.
        long timeProgress = scannerContext.getTimeProgress();
        if (scannerContext.getKeepProgress()) {
          scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
            initialTimeProgress);
        } else {
          scannerContext.clearProgress();
        }
        scannerContext.setTimeProgress(timeProgress);
        scannerContext.incrementBatchProgress(results.size());
        for (Cell cell : results) {
          scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
        }
      }

      // The row is empty or was rejected by a row filter: move on to the next row.
      if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        results.clear();
        boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Keep going unless the next row is already past the stop row.
        if (!stopRow) continue;
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      }

      // The row survived filtering. If there are on-demand (non-essential) families, fetch
      // their cells now, but only when the joined heap may hold data for this row.
      if (this.joinedHeap != null) {
        boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
        if (mayHaveData) {
          joinedContinuationRow = current;
          populateFromJoinedHeap(results, scannerContext);

          // A limit was reached mid-row; joinedContinuationRow stays set so we can resume.
          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
            return true;
          }
        }
      }
    } else {
      // Resume populating the joined-heap part of a row started in an earlier call.
      populateFromJoinedHeap(results, scannerContext);
      if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
        return true;
      }
    }

    // The joined-heap part of this row is still unfinished: resume on the next call.
    if (joinedContinuationRow != null) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // results may be empty here even though the row was not excluded above (e.g. a row
    // filter removed every cell). Never return an empty row; advance instead.
    if (results.isEmpty()) {
      incrementCountOfRowsFilteredMetric(scannerContext);
      boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
      if (!moreRows) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      }
      if (!stopRow) continue;
    }

    // A row was produced (or we reached the stop row); report whether more may follow.
    if (stopRow) {
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } else {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }
  }
}
6088
6089 protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
6090 if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
6091
6092 scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
6093 }
6094
6095 protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
6096 if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
6097
6098 scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
6099 }
6100
6101
6102
6103
6104
6105
6106
6107
6108 private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length)
6109 throws IOException {
6110 Cell nextJoinedKv = joinedHeap.peek();
6111 boolean matchCurrentRow =
6112 nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length);
6113 boolean matchAfterSeek = false;
6114
6115
6116
6117 if (!matchCurrentRow) {
6118 Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length);
6119 boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
6120 matchAfterSeek =
6121 seekSuccessful && joinedHeap.peek() != null
6122 && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length);
6123 }
6124
6125 return matchCurrentRow || matchAfterSeek;
6126 }
6127
6128
6129
6130
6131
6132
6133
6134
6135 private boolean filterRow() throws IOException {
6136
6137
6138 return filter != null && (!filter.hasFilterRow())
6139 && filter.filterRow();
6140 }
6141
6142 private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
6143 return filter != null
6144 && filter.filterRowKey(row, offset, length);
6145 }
6146
6147 protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
6148 short length) throws IOException {
6149 assert this.joinedContinuationRow == null:
6150 "Trying to go to next row during joinedHeap read.";
6151 Cell next;
6152 while ((next = this.storeHeap.peek()) != null &&
6153 CellUtil.matchingRow(next, currentRow, offset, length)) {
6154 this.storeHeap.next(MOCKED_LIST);
6155 }
6156 resetFilters();
6157
6158
6159 return this.region.getCoprocessorHost() == null
6160 || this.region.getCoprocessorHost()
6161 .postScannerFilterRow(this, currentRow, offset, length);
6162 }
6163
6164 protected boolean isStopRow(byte[] currentRow, int offset, short length) {
6165 return currentRow == null ||
6166 (stopRow != null &&
6167 comparator.compareRows(stopRow, 0, stopRow.length,
6168 currentRow, offset, length) <= isScan);
6169 }
6170
6171 @Override
6172 public synchronized void close() {
6173 if (storeHeap != null) {
6174 storeHeap.close();
6175 storeHeap = null;
6176 }
6177 if (joinedHeap != null) {
6178 joinedHeap.close();
6179 joinedHeap = null;
6180 }
6181
6182 scannerReadPoints.remove(this);
6183 this.filterClosed = true;
6184 }
6185
6186 KeyValueHeap getStoreHeapForTesting() {
6187 return storeHeap;
6188 }
6189
6190 @Override
6191 public synchronized boolean reseek(byte[] row) throws IOException {
6192 if (row == null) {
6193 throw new IllegalArgumentException("Row cannot be null.");
6194 }
6195 boolean result = false;
6196 startRegionOperation();
6197 KeyValue kv = KeyValueUtil.createFirstOnRow(row);
6198 try {
6199
6200 result = this.storeHeap.requestSeek(kv, true, true);
6201 if (this.joinedHeap != null) {
6202 result = this.joinedHeap.requestSeek(kv, true, true) || result;
6203 }
6204 } catch (FileNotFoundException e) {
6205 throw handleFileNotFound(e);
6206 } finally {
6207 closeRegionOperation();
6208 }
6209 return result;
6210 }
6211
6212 private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
6213
6214
6215 try {
6216 region.refreshStoreFiles(true);
6217 return new IOException("unable to read store file");
6218 } catch (IOException e) {
6219 String msg = "a store file got lost: " + fnfe.getMessage();
6220 LOG.error(msg);
6221 LOG.error("unable to refresh store files", e);
6222 abortRegionServer(msg);
6223 return new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " closing");
6224 }
6225 }
6226
6227 private void abortRegionServer(String msg) throws IOException {
6228 if (rsServices instanceof HRegionServer) {
6229 ((HRegionServer)rsServices).abort(msg);
6230 }
6231 throw new UnsupportedOperationException("not able to abort RS after: " + msg);
6232 }
6233 }
6234
6235
6236
6237
6238
6239
6240
6241
6242
6243
6244
6245
6246
6247
6248
6249
6250
6251
6252
6253
6254 static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
6255 Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
6256 RegionServerServices rsServices) {
6257 try {
6258 @SuppressWarnings("unchecked")
6259 Class<? extends HRegion> regionClass =
6260 (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
6261
6262 Constructor<? extends HRegion> c =
6263 regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
6264 Configuration.class, HRegionInfo.class, HTableDescriptor.class,
6265 RegionServerServices.class);
6266
6267 return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
6268 } catch (Throwable e) {
6269
6270 throw new IllegalStateException("Could not instantiate a region instance.", e);
6271 }
6272 }
6273
6274
6275
6276
6277
6278
6279
6280
6281
6282
6283
6284
6285
6286
6287
6288
6289
6290 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6291 final Configuration conf, final HTableDescriptor hTableDescriptor)
6292 throws IOException {
6293 return createHRegion(info, rootDir, conf, hTableDescriptor, null);
6294 }
6295
6296
6297
6298
6299
6300
6301
6302
6303
6304
6305
6306 public static void closeHRegion(final HRegion r) throws IOException {
6307 if (r == null) return;
6308 r.close();
6309 if (r.getWAL() == null) return;
6310 r.getWAL().close();
6311 }
6312
6313
6314
6315
6316
6317
6318
6319
6320
6321
6322
6323
6324
6325
6326 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6327 final Configuration conf,
6328 final HTableDescriptor hTableDescriptor,
6329 final WAL wal,
6330 final boolean initialize)
6331 throws IOException {
6332 return createHRegion(info, rootDir, conf, hTableDescriptor,
6333 wal, initialize, false);
6334 }
6335
6336
6337
6338
6339
6340
6341
6342
6343
6344
6345
6346
6347
6348
6349
6350 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6351 final Configuration conf,
6352 final HTableDescriptor hTableDescriptor,
6353 final WAL wal,
6354 final boolean initialize, final boolean ignoreWAL)
6355 throws IOException {
6356 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6357 return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
6358 ignoreWAL);
6359 }
6360
6361
6362
6363
6364
6365
6366
6367
6368
6369
6370
6371
6372
6373
6374
6375
6376 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6377 final Path tableDir, final Configuration conf, final HTableDescriptor hTableDescriptor,
6378 final WAL wal, final boolean initialize, final boolean ignoreWAL)
6379 throws IOException {
6380 LOG.info("creating HRegion " + info.getTable().getNameAsString()
6381 + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
6382 " Table name == " + info.getTable().getNameAsString());
6383 FileSystem fs = FileSystem.get(conf);
6384 HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
6385 WAL effectiveWAL = wal;
6386 if (wal == null && !ignoreWAL) {
6387
6388
6389
6390 Configuration confForWAL = new Configuration(conf);
6391 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
6392 effectiveWAL = (new WALFactory(confForWAL,
6393 Collections.<WALActionsListener>singletonList(new MetricsWAL()),
6394 "hregion-" + RandomStringUtils.randomNumeric(8))).
6395 getWAL(info.getEncodedNameAsBytes());
6396 }
6397 HRegion region = HRegion.newHRegion(tableDir,
6398 effectiveWAL, fs, conf, info, hTableDescriptor, null);
6399 if (initialize) region.initialize(null);
6400 return region;
6401 }
6402
6403 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6404 final Configuration conf,
6405 final HTableDescriptor hTableDescriptor,
6406 final WAL wal)
6407 throws IOException {
6408 return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
6409 }
6410
6411
6412
6413
6414
6415
6416
6417
6418
6419
6420
6421
6422
6423 public static HRegion openHRegion(final HRegionInfo info,
6424 final HTableDescriptor htd, final WAL wal,
6425 final Configuration conf)
6426 throws IOException {
6427 return openHRegion(info, htd, wal, conf, null, null);
6428 }
6429
6430
6431
6432
6433
6434
6435
6436
6437
6438
6439
6440
6441
6442
6443
6444
6445 public static HRegion openHRegion(final HRegionInfo info,
6446 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6447 final RegionServerServices rsServices,
6448 final CancelableProgressable reporter)
6449 throws IOException {
6450 return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
6451 }
6452
6453
6454
6455
6456
6457
6458
6459
6460
6461
6462
6463
6464
6465
6466 public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
6467 final HTableDescriptor htd, final WAL wal, final Configuration conf)
6468 throws IOException {
6469 return openHRegion(rootDir, info, htd, wal, conf, null, null);
6470 }
6471
6472
6473
6474
6475
6476
6477
6478
6479
6480
6481
6482
6483
6484
6485
6486
6487 public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
6488 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6489 final RegionServerServices rsServices,
6490 final CancelableProgressable reporter)
6491 throws IOException {
6492 FileSystem fs = null;
6493 if (rsServices != null) {
6494 fs = rsServices.getFileSystem();
6495 }
6496 if (fs == null) {
6497 fs = FileSystem.get(conf);
6498 }
6499 return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
6500 }
6501
6502
6503
6504
6505
6506
6507
6508
6509
6510
6511
6512
6513
6514
6515
6516 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6517 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
6518 throws IOException {
6519 return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
6520 }
6521
6522
6523
6524
6525
6526
6527
6528
6529
6530
6531
6532
6533
6534
6535
6536
6537
6538 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6539 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
6540 final RegionServerServices rsServices, final CancelableProgressable reporter)
6541 throws IOException {
6542 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6543 return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
6544 }
6545
6546
6547
6548
6549
6550
6551
6552
6553
6554
6555
6556
6557
6558
6559
6560
6561
6562 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6563 final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
6564 final WAL wal, final RegionServerServices rsServices,
6565 final CancelableProgressable reporter)
6566 throws IOException {
6567 if (info == null) throw new NullPointerException("Passed region info is null");
6568 if (LOG.isDebugEnabled()) {
6569 LOG.debug("Opening region: " + info);
6570 }
6571 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6572 return r.openHRegion(reporter);
6573 }
6574
6575
6576
6577
6578
6579
6580
6581
6582
6583 public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
6584 throws IOException {
6585 HRegionFileSystem regionFs = other.getRegionFileSystem();
6586 HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
6587 other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
6588 return r.openHRegion(reporter);
6589 }
6590
6591 public static Region openHRegion(final Region other, final CancelableProgressable reporter)
6592 throws IOException {
6593 return openHRegion((HRegion)other, reporter);
6594 }
6595
6596
6597
6598
6599
6600
6601
/**
 * Opens this region. In order, it:
 * <ol>
 *   <li>validates the table's compression codecs, encryption settings, split policy class
 *       and table coprocessor attributes;</li>
 *   <li>initializes the region and advances MVCC to the resulting open sequence id;</li>
 *   <li>writes a region-open marker to the WAL, when there is a WAL and region-server
 *       services and the region is writable and not recovering.</li>
 * </ol>
 *
 * @param reporter progress callback passed to initialize(); may be null
 * @return this region
 * @throws IOException if any check or the initialization fails
 */
protected HRegion openHRegion(final CancelableProgressable reporter)
throws IOException {
  // Fail fast if a family's compression codec cannot be used.
  checkCompressionCodecs();
  // Same for each family's encryption configuration.
  checkEncryption();
  // Split policy class and table coprocessor attributes must be loadable.
  checkClassLoading();
  this.openSeqNum = initialize(reporter);
  this.mvcc.advanceTo(openSeqNum);
  if (wal != null && getRegionServerServices() != null && !writestate.readOnly
      && !recovering) {
    // Record the region open in the WAL; skipped for read-only or recovering regions
    // and when not hosted by region-server services.
    writeRegionOpenMarker(wal, openSeqNum);
  }
  return this;
}
6622
6623 public static void warmupHRegion(final HRegionInfo info,
6624 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6625 final RegionServerServices rsServices,
6626 final CancelableProgressable reporter)
6627 throws IOException {
6628
6629 if (info == null) throw new NullPointerException("Passed region info is null");
6630
6631 if (LOG.isDebugEnabled()) {
6632 LOG.debug("HRegion.Warming up region: " + info);
6633 }
6634
6635 Path rootDir = FSUtils.getRootDir(conf);
6636 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6637
6638 FileSystem fs = null;
6639 if (rsServices != null) {
6640 fs = rsServices.getFileSystem();
6641 }
6642 if (fs == null) {
6643 fs = FileSystem.get(conf);
6644 }
6645
6646 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);
6647 r.initializeWarmup(reporter);
6648 }
6649
6650
6651 private void checkCompressionCodecs() throws IOException {
6652 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6653 CompressionTest.testCompression(fam.getCompression());
6654 CompressionTest.testCompression(fam.getCompactionCompression());
6655 }
6656 }
6657
6658 private void checkEncryption() throws IOException {
6659 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6660 EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
6661 }
6662 }
6663
6664 private void checkClassLoading() throws IOException {
6665 RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
6666 RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
6667 }
6668
6669
6670
6671
6672
6673
6674 HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
6675
6676 fs.commitDaughterRegion(hri);
6677
6678
6679 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
6680 this.getBaseConf(), hri, this.getTableDesc(), rsServices);
6681 r.readRequestsCount.set(this.getReadRequestsCount() / 2);
6682 r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
6683 return r;
6684 }
6685
6686
6687
6688
6689
6690
6691
6692 HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
6693 final HRegion region_b) throws IOException {
6694 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
6695 fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
6696 this.getTableDesc(), this.rsServices);
6697 r.readRequestsCount.set(this.getReadRequestsCount()
6698 + region_b.getReadRequestsCount());
6699 r.writeRequestsCount.set(this.getWriteRequestsCount()
6700
6701 + region_b.getWriteRequestsCount());
6702 this.fs.commitMergedRegion(mergedRegionInfo);
6703 return r;
6704 }
6705
6706
6707
6708
6709
6710
6711
6712
6713
6714
6715
6716
6717 public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
6718 meta.checkResources();
6719
6720 byte[] row = r.getRegionInfo().getRegionName();
6721 final long now = EnvironmentEdgeManager.currentTime();
6722 final List<Cell> cells = new ArrayList<Cell>(2);
6723 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6724 HConstants.REGIONINFO_QUALIFIER, now,
6725 r.getRegionInfo().toByteArray()));
6726
6727 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6728 HConstants.META_VERSION_QUALIFIER, now,
6729 Bytes.toBytes(HConstants.META_VERSION)));
6730 meta.put(row, HConstants.CATALOG_FAMILY, cells);
6731 }
6732
6733
6734
6735
6736
6737
6738
6739
6740
6741 @Deprecated
6742 public static Path getRegionDir(final Path tabledir, final String name) {
6743 return new Path(tabledir, name);
6744 }
6745
6746
6747
6748
6749
6750
6751
6752
6753
6754 @Deprecated
6755 @VisibleForTesting
6756 public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
6757 return new Path(
6758 FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
6759 }
6760
6761
6762
6763
6764
6765
6766
6767
6768
6769 public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
6770 return ((info.getStartKey().length == 0) ||
6771 (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
6772 ((info.getEndKey().length == 0) ||
6773 (Bytes.compareTo(info.getEndKey(), row) > 0));
6774 }
6775
6776 public static boolean rowIsInRange(HRegionInfo info, final byte [] row, final int offset,
6777 final short length) {
6778 return ((info.getStartKey().length == 0) ||
6779 (Bytes.compareTo(info.getStartKey(), 0, info.getStartKey().length,
6780 row, offset, length) <= 0)) &&
6781 ((info.getEndKey().length == 0) ||
6782 (Bytes.compareTo(info.getEndKey(), 0, info.getEndKey().length, row, offset, length) > 0));
6783 }
6784
6785
6786
6787
6788
6789
6790
6791 public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
6792 throws IOException {
6793 HRegion a = srcA;
6794 HRegion b = srcB;
6795
6796
6797
6798 if (srcA.getRegionInfo().getStartKey() == null) {
6799 if (srcB.getRegionInfo().getStartKey() == null) {
6800 throw new IOException("Cannot merge two regions with null start key");
6801 }
6802
6803 } else if ((srcB.getRegionInfo().getStartKey() == null) ||
6804 (Bytes.compareTo(srcA.getRegionInfo().getStartKey(),
6805 srcB.getRegionInfo().getStartKey()) > 0)) {
6806 a = srcB;
6807 b = srcA;
6808 }
6809
6810 if (!(Bytes.compareTo(a.getRegionInfo().getEndKey(),
6811 b.getRegionInfo().getStartKey()) == 0)) {
6812 throw new IOException("Cannot merge non-adjacent regions");
6813 }
6814 return merge(a, b);
6815 }
6816
6817
6818
6819
6820
6821
6822
6823
6824
6825 public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
6826 if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
6827 throw new IOException("Regions do not belong to the same table");
6828 }
6829
6830 FileSystem fs = a.getRegionFileSystem().getFileSystem();
6831
6832 a.flush(true);
6833 b.flush(true);
6834
6835
6836 a.compact(true);
6837 if (LOG.isDebugEnabled()) {
6838 LOG.debug("Files for region: " + a);
6839 a.getRegionFileSystem().logFileSystemState(LOG);
6840 }
6841 b.compact(true);
6842 if (LOG.isDebugEnabled()) {
6843 LOG.debug("Files for region: " + b);
6844 b.getRegionFileSystem().logFileSystemState(LOG);
6845 }
6846
6847 RegionMergeTransactionImpl rmt = new RegionMergeTransactionImpl(a, b, true);
6848 if (!rmt.prepare(null)) {
6849 throw new IOException("Unable to merge regions " + a + " and " + b);
6850 }
6851 HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
6852 LOG.info("starting merge of regions: " + a + " and " + b
6853 + " into new region " + mergedRegionInfo.getRegionNameAsString()
6854 + " with start key <"
6855 + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
6856 + "> and end key <"
6857 + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
6858 HRegion dstRegion;
6859 try {
6860 dstRegion = (HRegion)rmt.execute(null, null);
6861 } catch (IOException ioe) {
6862 rmt.rollback(null, null);
6863 throw new IOException("Failed merging region " + a + " and " + b
6864 + ", and successfully rolled back");
6865 }
6866 dstRegion.compact(true);
6867
6868 if (LOG.isDebugEnabled()) {
6869 LOG.debug("Files for new region");
6870 dstRegion.getRegionFileSystem().logFileSystemState(LOG);
6871 }
6872
6873 if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
6874 throw new IOException("Merged region " + dstRegion
6875 + " still has references after the compaction, is compaction canceled?");
6876 }
6877
6878
6879 HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
6880
6881 HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
6882
6883 LOG.info("merge completed. New region is " + dstRegion);
6884 return dstRegion;
6885 }
6886
6887 @Override
6888 public Result get(final Get get) throws IOException {
6889 checkRow(get.getRow(), "Get");
6890
6891 if (get.hasFamilies()) {
6892 for (byte [] family: get.familySet()) {
6893 checkFamily(family);
6894 }
6895 } else {
6896 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
6897 get.addFamily(family);
6898 }
6899 }
6900 List<Cell> results = get(get, true);
6901 boolean stale = this.getRegionInfo().getReplicaId() != 0;
6902 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
6903 }
6904
6905 private Scan buildScanForGetWithClosestRowBefore(Get get) throws IOException {
6906 Scan scan = new Scan().setStartRow(get.getRow())
6907 .addFamily(get.getFamilyMap().keySet().iterator().next()).setReversed(true)
6908 .setStopRow(HConstants.EMPTY_END_ROW);
6909 if (this.getRegionInfo().isMetaRegion()) {
6910 int delimiterIdx =
6911 KeyValue.getDelimiter(get.getRow(), 0, get.getRow().length, HConstants.DELIMITER);
6912 if (delimiterIdx >= 0) {
6913 scan.setFilter(new PrefixFilter(Bytes.copy(get.getRow(), 0, delimiterIdx + 1)));
6914 }
6915 }
6916 return scan;
6917 }
6918
6919 @Override
6920 public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
6921
6922 List<Cell> results = new ArrayList<Cell>();
6923
6924
6925 if (withCoprocessor && (coprocessorHost != null)) {
6926 if (coprocessorHost.preGet(get, results)) {
6927 return results;
6928 }
6929 }
6930
6931 Scan scan;
6932 if (get.isClosestRowBefore()) {
6933 scan = buildScanForGetWithClosestRowBefore(get);
6934 } else {
6935 scan = new Scan(get);
6936 }
6937
6938 if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
6939 scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault());
6940 }
6941 RegionScanner scanner = null;
6942 try {
6943 scanner = getScanner(scan);
6944 scanner.next(results);
6945 } finally {
6946 if (scanner != null)
6947 scanner.close();
6948 }
6949
6950
6951 if (withCoprocessor && (coprocessorHost != null)) {
6952 coprocessorHost.postGet(get, results);
6953 }
6954
6955
6956 if (this.metricsRegion != null) {
6957 long totalSize = 0L;
6958 for (Cell cell : results) {
6959 totalSize += CellUtil.estimatedSerializedSizeOf(cell);
6960 }
6961 this.metricsRegion.updateGet(totalSize);
6962 }
6963
6964 return results;
6965 }
6966
6967 @Override
6968 public void mutateRow(RowMutations rm) throws IOException {
6969
6970 mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
6971 }
6972
6973
6974
6975
6976
6977 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6978 Collection<byte[]> rowsToLock) throws IOException {
6979 mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
6980 }
6981
6982
6983
6984
6985
6986
6987
6988
6989
6990
6991
6992
6993
6994 @Override
6995 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6996 Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
6997 MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
6998 processRowsWithLocks(proc, -1, nonceGroup, nonce);
6999 }
7000
7001
7002
7003
7004 public ClientProtos.RegionLoadStats getRegionStats() {
7005 if (!regionStatsEnabled) {
7006 return null;
7007 }
7008 ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
7009 stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
7010 .memstoreFlushSize)));
7011 if (rsServices.getHeapMemoryManager() != null) {
7012
7013
7014
7015
7016 final float occupancy = rsServices.getHeapMemoryManager().getHeapOccupancyPercent();
7017 if (occupancy != HeapMemoryManager.HEAP_OCCUPANCY_ERROR_VALUE) {
7018 stats.setHeapOccupancy((int)(occupancy * 100));
7019 }
7020 }
7021 stats.setCompactionPressure((int) (rsServices.getCompactionPressure() * 100 > 100 ? 100
7022 : rsServices.getCompactionPressure() * 100));
7023 return stats.build();
7024 }
7025
7026 @Override
7027 public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {
7028 processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE,
7029 HConstants.NO_NONCE);
7030 }
7031
7032 @Override
7033 public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
7034 throws IOException {
7035 processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
7036 }
7037
/**
 * Runs {@code processor} against the rows it declares in {@code getRowsToLock()}.
 *
 * <p>Read-only processors are run without row locks and without touching the WAL or memstore.
 * For other processors: the declared rows are locked, the region updates read lock is taken,
 * the processor produces mutations and a WAL edit, the edit is appended to the WAL (when
 * non-empty), the cells are stamped with the WAL key's sequence id and added to their stores,
 * the locks are released, and then the WAL is synced. If the sync does not complete, the
 * memstore additions are rolled back.
 *
 * @param processor the row processor to run
 * @param timeout processor timeout in milliseconds; a negative value runs it inline with no
 *        timeout
 * @param nonceGroup nonce group recorded in the WAL key
 * @param nonce nonce recorded in the WAL key
 * @throws IOException if checkRow / checkReadOnly / checkResources reject the call, or the
 *         processor, WAL append or sync fails
 */
@Override
public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
    long nonceGroup, long nonce) throws IOException {
  // Validate every row the processor intends to lock before doing any work.
  for (byte[] row : processor.getRowsToLock()) {
    checkRow(row, "processRowsWithLocks");
  }
  if (!processor.readOnly()) {
    checkReadOnly();
  }
  checkResources();

  startRegionOperation();
  WALEdit walEdit = new WALEdit();

  // Let the processor prepare; if that fails, close the region operation opened above.
  try {
    processor.preProcess(this, walEdit);
  } catch (IOException e) {
    closeRegionOperation();
    throw e;
  }

  // Read-only processors: no row locks, no mutations list, no WAL edit.
  if (processor.readOnly()) {
    try {
      long now = EnvironmentEdgeManager.currentTime();
      doProcessRowWithTimeout(
          processor, now, this, null, null, timeout);
      processor.postProcess(this, walEdit, true);
    } finally {
      closeRegionOperation();
    }
    return;
  }

  MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
  boolean locked = false;
  boolean walSyncSuccessful = false;
  List<RowLock> acquiredRowLocks = null;
  long addedSize = 0;
  List<Mutation> mutations = new ArrayList<Mutation>();
  Collection<byte[]> rowsToLock = processor.getRowsToLock();
  long mvccNum = 0;
  WALKey walKey = null;
  try {
    try {
      // 1. Acquire a row lock for every declared row.
      acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
      for (byte[] row : rowsToLock) {
        // NOTE(review): assumes getRowLock blocks until the lock is obtained (never null).
        acquiredRowLocks.add(getRowLock(row));
      }

      // 2. Take the region updates read lock. The second argument is the number of row
      //    locks held (at least 1); its exact meaning is defined by lock().
      lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
      locked = true;

      long now = EnvironmentEdgeManager.currentTime();

      // 3. Run the processor; it fills `mutations` and `walEdit`.
      doProcessRowWithTimeout(
          processor, now, this, mutations, walEdit, timeout);

      if (!mutations.isEmpty()) {
        // 4. Hook before the batch is written.
        processor.preBatchMutate(this, walEdit);

        long txid = 0;
        // 5. Append to the WAL only when there is something to log.
        if (!walEdit.isEmpty()) {
          walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
              processor.getClusterIds(), nonceGroup, nonce, mvcc);
          txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
              walKey, walEdit, true);
        }
        if(walKey == null){
          // Nothing was logged: append an empty edit so a WAL key (and its write entry /
          // sequence id) is still available for the memstore cells.
          walKey = this.appendEmptyEdit(this.wal);
        }

        // 6. Stamp each mutated cell with the WAL key's sequence id and add it to its store.
        writeEntry = walKey.getWriteEntry();
        mvccNum = walKey.getSequenceId();

        for (Mutation m : mutations) {
          rewriteCellTags(m.getFamilyCellMap(), m);

          for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
            Cell cell = cellScanner.current();
            CellUtil.setSequenceId(cell, mvccNum);
            Store store = getStore(cell);
            if (store == null) {
              checkFamily(CellUtil.cloneFamily(cell));
              // checkFamily is expected to throw for an unknown family. NOTE(review): if the
              // family exists in the descriptor but has no store, store.add below would NPE.
            }
            addedSize += store.add(cell);
          }
        }

        // 7. Release the updates lock and the row locks before syncing the WAL.
        if (locked) {
          this.updatesLock.readLock().unlock();
          locked = false;
        }

        releaseRowLocks(acquiredRowLocks);

        // 8. Sync (or defer, per effective durability) only when something was appended.
        if (txid != 0) {
          syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
        }
        walSyncSuccessful = true;
        // 9. Hook after the batch is durable.
        processor.postBatchMutate(this);
      }
    } finally {
      // Mutations were produced but the sync did not complete: undo the memstore additions
      // and complete (without waiting on) the write entry.
      if (!mutations.isEmpty() && !walSyncSuccessful) {
        LOG.warn("Wal sync failed. Roll back " + mutations.size() +
            " memstore keyvalues" + (processor.getRowsToLock().isEmpty() ? "" :
              (" for row(s):" + StringUtils.byteToHexString(
              processor.getRowsToLock().iterator().next()) + "...")));
        for (Mutation m : mutations) {
          for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
            Cell cell = cellScanner.current();
            getStore(cell).rollback(cell);
          }
        }
        if (writeEntry != null) {
          mvcc.complete(writeEntry);
          writeEntry = null;
        }
      }
      // Success path: complete the write entry via completeAndWait.
      if (writeEntry != null) {
        mvcc.completeAndWait(writeEntry);
      }
      if (locked) {
        this.updatesLock.readLock().unlock();
      }
      // NOTE(review): on the success path the row locks were already released in step 7;
      // this relies on releaseRowLocks tolerating a repeated call.
      releaseRowLocks(acquiredRowLocks);
    }

    // Final hook, told whether the WAL sync succeeded.
    processor.postProcess(this, walEdit, walSyncSuccessful);

  } finally {
    closeRegionOperation();
    // Account for the memstore growth and request a flush if the threshold is crossed.
    if (!mutations.isEmpty() &&
        isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
      requestFlush();
    }
  }
}
7204
7205 private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
7206 final long now,
7207 final HRegion region,
7208 final List<Mutation> mutations,
7209 final WALEdit walEdit,
7210 final long timeout) throws IOException {
7211
7212 if (timeout < 0) {
7213 try {
7214 processor.process(now, region, mutations, walEdit);
7215 } catch (IOException e) {
7216 String row = processor.getRowsToLock().isEmpty() ? "" :
7217 " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
7218 LOG.warn("RowProcessor:" + processor.getClass().getName() +
7219 " throws Exception" + row, e);
7220 throw e;
7221 }
7222 return;
7223 }
7224
7225
7226 FutureTask<Void> task =
7227 new FutureTask<Void>(new Callable<Void>() {
7228 @Override
7229 public Void call() throws IOException {
7230 try {
7231 processor.process(now, region, mutations, walEdit);
7232 return null;
7233 } catch (IOException e) {
7234 String row = processor.getRowsToLock().isEmpty() ? "" :
7235 " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
7236 LOG.warn("RowProcessor:" + processor.getClass().getName() +
7237 " throws Exception" + row, e);
7238 throw e;
7239 }
7240 }
7241 });
7242 rowProcessorExecutor.execute(task);
7243 try {
7244 task.get(timeout, TimeUnit.MILLISECONDS);
7245 } catch (TimeoutException te) {
7246 String row = processor.getRowsToLock().isEmpty() ? "" :
7247 " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
7248 LOG.error("RowProcessor timeout:" + timeout + " ms" + row);
7249 throw new IOException(te);
7250 } catch (Exception e) {
7251 throw new IOException(e);
7252 }
7253 }
7254
7255
7256
7257
7258
7259
7260 private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
7261 if (cell.getTagsLength() <= 0) return tags;
7262 List<Tag> newTags = tags == null? new ArrayList<Tag>():
7263 Iterator<Tag> i =
7264 CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
7265 while (i.hasNext()) newTags.add(i.next());
7266 return newTags;
7267 }
7268
7269
7270
7271
7272
7273
7274
7275
7276
7277
7278 private List<Cell> doGet(final Store store, final byte [] row,
7279 final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)
7280 throws IOException {
7281
7282
7283
7284
7285 Collections.sort(family.getValue(), store.getComparator());
7286
7287 Get get = new Get(row);
7288 for (Cell cell : family.getValue()) {
7289 get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
7290 }
7291 if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());
7292 return get(get, false);
7293 }
7294
7295 public Result append(Append append) throws IOException {
7296 return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
7297 }
7298
7299
7300
7301
7302
7303
7304
7305
7306 @SuppressWarnings("unchecked")
7307 private void dropMemstoreContents() throws IOException {
7308 long totalFreedSize = 0;
7309 while (!storeSeqIds.isEmpty()) {
7310 Map<Store, Long> map = null;
7311 synchronized (storeSeqIds) {
7312 if (storeSeqIds.isEmpty()) break;
7313 map = storeSeqIds.remove(storeSeqIds.size()-1);
7314 }
7315 for (Map.Entry<Store, Long> entry : map.entrySet()) {
7316
7317 totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey());
7318 }
7319 }
7320 if (totalFreedSize > 0) {
7321 LOG.debug("Freed " + totalFreedSize + " bytes from memstore");
7322 }
7323 }
7324
/**
 * Appends values to existing cells of a single row.
 *
 * <p>Under the row lock and the region updates read lock, the current values of the targeted
 * columns are read, new cells are built (old value followed by the appended value, with a
 * timestamp strictly greater than the old one), coprocessors may replace them, the edit is
 * appended to the WAL (unless durability is SKIP_WAL), and the cells are applied to the
 * memstore. The WAL is synced after the row lock is released. If the sync does not complete,
 * multi-version memstore additions are rolled back.
 *
 * @param mutate the Append
 * @param nonceGroup nonce group recorded in the WAL key
 * @param nonce nonce recorded in the WAL key
 * @return the new cells when {@code mutate.isReturnResults()}, otherwise {@code null}; or the
 *         Result returned by {@code preAppendAfterRowLock} if a coprocessor supplies one
 * @throws IOException if checks fail or the read, WAL append or sync fails
 */
@Override
public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {
  Operation op = Operation.APPEND;
  byte[] row = mutate.getRow();
  checkRow(row, op.toString());
  boolean flush = false;
  Durability durability = getEffectiveDurability(mutate.getDurability());
  boolean writeToWAL = durability != Durability.SKIP_WAL;
  WALEdit walEdits = null;
  List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
  Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
  long size = 0;
  long txid = 0;
  checkReadOnly();
  checkResources();

  startRegionOperation(op);
  this.writeRequestsCount.increment();
  RowLock rowLock = null;
  WALKey walKey = null;
  boolean doRollBackMemstore = false;
  try {
    rowLock = getRowLock(row);
    assert rowLock != null;
    try {
      lock(this.updatesLock.readLock());
      try {
        // NOTE(review): presumably waits for in-flight mvcc writes so the reads below see
        // them; confirm against MultiVersionConcurrencyControl#await.
        mvcc.await();
        if (this.coprocessorHost != null) {
          Result r = this.coprocessorHost.preAppendAfterRowLock(mutate);
          if (r!= null) {
            return r;
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        // Build the new cells family by family.
        for (Map.Entry<byte[], List<Cell>> family : mutate.getFamilyCellMap().entrySet()) {
          Store store = stores.get(family.getKey());
          List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

          // doGet sorts family.getValue() in place, so request cells and results are walked
          // in the same order below.
          List<Cell> results = doGet(store, row, family, null);

          // `idx` points at the next stored value not yet matched to a request cell.
          int idx = 0;
          for (Cell cell : family.getValue()) {
            Cell newCell;
            Cell oldCell = null;
            if (idx < results.size()
                && CellUtil.matchingQualifier(results.get(idx), cell)) {
              oldCell = results.get(idx);
              // Keep timestamps strictly increasing over the stored value.
              long ts = Math.max(now, oldCell.getTimestamp() + 1);

              // Tags: the old cell's, then the appended cell's, then the mutation TTL (if set).
              List<Tag> tags = Tag.carryForwardTags(null, oldCell);
              tags = Tag.carryForwardTags(tags, cell);
              tags = carryForwardTTLTag(tags, mutate);

              newCell = getNewCell(row, ts, cell, oldCell, Tag.fromList(tags));

              idx++;
            } else {
              // No stored value for this qualifier: the appended cell becomes the value.
              CellUtil.updateLatestStamp(cell, now);

              // Attach the mutation TTL tag when one is set.
              newCell = getNewCell(mutate, cell);
            }

            // Let coprocessors replace the cell before it is logged.
            if (coprocessorHost != null) {
              newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
                  mutate, oldCell, newCell);
            }
            kvs.add(newCell);

            // Collect WAL edits unless durability is SKIP_WAL.
            if (writeToWAL) {
              if (walEdits == null) {
                walEdits = new WALEdit();
              }
              walEdits.add(newCell);
            }
          }

          // Stage this family's cells; they are applied after the WAL append.
          tempMemstore.put(store, kvs);
        }

        // Append to the WAL when there is anything to log.
        if (walEdits != null && !walEdits.isEmpty()) {
          if (writeToWAL) {
            // NOTE(review): walEdits is only created when writeToWAL is true, so the else
            // branch below appears unreachable.
            walKey = new HLogKey(
                getRegionInfo().getEncodedNameAsBytes(),
                this.htableDescriptor.getTableName(),
                WALKey.NO_SEQUENCE_ID,
                nonceGroup,
                nonce,
                mvcc);
            txid =
              this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
          } else {
            recordMutationWithoutWal(mutate.getFamilyCellMap());
          }
        }
        if (walKey == null) {
          // Nothing was logged: an empty edit still yields a WAL key with a write entry.
          walKey = this.appendEmptyEdit(this.wal);
        }

        // NOTE(review): return value ignored; presumably called so the write entry is
        // assigned before the memstore update below.
        walKey.getWriteEntry();

        // Apply the staged cells to the memstore.
        if (!tempMemstore.isEmpty()) {
          for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
            Store store = entry.getKey();
            if (store.getFamily().getMaxVersions() == 1) {
              // Single-version family: upsert. These cells are not flagged for rollback.
              size += store.upsert(entry.getValue(), getSmallestReadPoint());
            } else {
              // Multi-version family: add each cell stamped with the WAL write number and
              // flag that a rollback is needed if the later sync fails.
              for (Cell cell: entry.getValue()) {
                CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
                size += store.add(cell);
                doRollBackMemstore = true;
              }
            }
            // Collect the cells for the returned Result.
            allKVs.addAll(entry.getValue());
          }

          size = this.addAndGetGlobalMemstoreSize(size);
          flush = isFlushSize(size);
        }
      } finally {
        this.updatesLock.readLock().unlock();
        // Called after releasing the updates read lock, on every exit path of the block above.
        dropMemstoreContents();
      }

    } finally {
      rowLock.release();
      rowLock = null;
    }
    // Sync the WAL outside the row lock.
    if(txid != 0){
      syncOrDefer(txid, durability);
    }
    doRollBackMemstore = false;
  } finally {
    if (rowLock != null) {
      rowLock.release();
    }
    // If the sync did not complete, roll back memstore additions and complete the write entry
    // without waiting; otherwise complete it via completeAndWait.
    WriteEntry we = walKey != null? walKey.getWriteEntry(): null;
    if (doRollBackMemstore) {
      rollbackMemstore(allKVs);
      if (we != null) mvcc.complete(we);
    } else if (we != null) {
      mvcc.completeAndWait(we);
    }

    closeRegionOperation(op);
  }

  if (this.metricsRegion != null) {
    this.metricsRegion.updateAppend();
  }

  if (flush) {
    // The global memstore size crossed the flush threshold.
    requestFlush();
  }

  return mutate.isReturnResults() ? Result.create(allKVs) : null;
}
7519
7520 private static Cell getNewCell(final byte [] row, final long ts, final Cell cell,
7521 final Cell oldCell, final byte [] tagBytes) {
7522
7523 Cell newCell = new KeyValue(row.length, cell.getFamilyLength(),
7524 cell.getQualifierLength(), ts, KeyValue.Type.Put,
7525 oldCell.getValueLength() + cell.getValueLength(),
7526 tagBytes == null? 0: tagBytes.length);
7527
7528 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
7529 newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
7530 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
7531 newCell.getFamilyArray(), newCell.getFamilyOffset(),
7532 cell.getFamilyLength());
7533 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
7534 newCell.getQualifierArray(), newCell.getQualifierOffset(),
7535 cell.getQualifierLength());
7536
7537 System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
7538 newCell.getValueArray(), newCell.getValueOffset(),
7539 oldCell.getValueLength());
7540 System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
7541 newCell.getValueArray(),
7542 newCell.getValueOffset() + oldCell.getValueLength(),
7543 cell.getValueLength());
7544
7545 if (tagBytes != null) {
7546 System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
7547 tagBytes.length);
7548 }
7549 return newCell;
7550 }
7551
7552 private static Cell getNewCell(final Mutation mutate, final Cell cell) {
7553 Cell newCell = null;
7554 if (mutate.getTTL() != Long.MAX_VALUE) {
7555
7556 newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
7557 cell.getRowLength(),
7558 cell.getFamilyArray(), cell.getFamilyOffset(),
7559 cell.getFamilyLength(),
7560 cell.getQualifierArray(), cell.getQualifierOffset(),
7561 cell.getQualifierLength(),
7562 cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
7563 cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
7564 carryForwardTTLTag(mutate));
7565 } else {
7566 newCell = cell;
7567 }
7568 return newCell;
7569 }
7570
7571 public Result increment(Increment increment) throws IOException {
7572 return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
7573 }
7574
7575
7576
7577
7578
7579
7580
7581
7582
7583 @Override
7584 public Result increment(Increment mutation, long nonceGroup, long nonce)
7585 throws IOException {
7586 Operation op = Operation.INCREMENT;
7587 checkReadOnly();
7588 checkResources();
7589 checkRow(mutation.getRow(), op.toString());
7590 checkFamilies(mutation.getFamilyCellMap().keySet());
7591 startRegionOperation(op);
7592 this.writeRequestsCount.increment();
7593 try {
7594
7595
7596
7597
7598
7599
7600
7601
7602
7603
7604
7605
7606
7607
7608
7609
7610 return doIncrement(mutation, nonceGroup, nonce);
7611 } finally {
7612 if (this.metricsRegion != null) this.metricsRegion.updateIncrement();
7613 closeRegionOperation(op);
7614 }
7615 }
7616
7617 private Result doIncrement(Increment increment, long nonceGroup, long nonce) throws IOException {
7618 RowLock rowLock = null;
7619 WALKey walKey = null;
7620 boolean doRollBackMemstore = false;
7621 long accumulatedResultSize = 0;
7622 List<Cell> allKVs = new ArrayList<Cell>(increment.size());
7623 List<Cell> memstoreCells = new ArrayList<Cell>();
7624 Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
7625 try {
7626 rowLock = getRowLock(increment.getRow());
7627 long txid = 0;
7628 try {
7629 lock(this.updatesLock.readLock());
7630 try {
7631
7632
7633 this.mvcc.await();
7634 if (this.coprocessorHost != null) {
7635 Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
7636 if (r != null) return r;
7637 }
7638 long now = EnvironmentEdgeManager.currentTime();
7639 final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
7640 WALEdit walEdits = null;
7641
7642
7643 Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>();
7644 for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
7645 byte [] columnFamilyName = entry.getKey();
7646 List<Cell> increments = entry.getValue();
7647 Store store = this.stores.get(columnFamilyName);
7648
7649
7650 List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName,
7651 sort(increments, store.getComparator()), now,
7652 MultiVersionConcurrencyControl.NO_WRITE_NUMBER, allKVs, null);
7653 if (!results.isEmpty()) {
7654 forMemStore.put(store, results);
7655
7656 if (writeToWAL) {
7657 if (walEdits == null) walEdits = new WALEdit();
7658 walEdits.getCells().addAll(results);
7659 }
7660 }
7661 }
7662
7663 if (walEdits != null && !walEdits.isEmpty()) {
7664
7665
7666
7667 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
7668 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce,
7669 getMVCC());
7670 txid =
7671 this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
7672 } else {
7673
7674 walKey = this.appendEmptyEdit(this.wal);
7675 }
7676
7677 walKey.getWriteEntry();
7678
7679
7680 for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
7681 Store store = entry.getKey();
7682 List<Cell> results = entry.getValue();
7683 if (store.getFamily().getMaxVersions() == 1) {
7684
7685 accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
7686
7687 } else {
7688
7689 for (Cell cell: results) {
7690
7691 CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
7692 accumulatedResultSize += store.add(cell);
7693 doRollBackMemstore = true;
7694 }
7695 }
7696 }
7697 } finally {
7698 this.updatesLock.readLock().unlock();
7699
7700
7701
7702 dropMemstoreContents();
7703 }
7704 } finally {
7705 rowLock.release();
7706 rowLock = null;
7707 }
7708
7709 if(txid != 0) {
7710 syncOrDefer(txid, durability);
7711 }
7712 doRollBackMemstore = false;
7713 } finally {
7714 if (rowLock != null) {
7715 rowLock.release();
7716 }
7717
7718 if (doRollBackMemstore) {
7719 rollbackMemstore(memstoreCells);
7720 if (walKey != null) mvcc.complete(walKey.getWriteEntry());
7721 } else {
7722 if (walKey != null) mvcc.completeAndWait(walKey.getWriteEntry());
7723 }
7724 }
7725
7726
7727 if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();
7728 return increment.isReturnResults() ? Result.create(allKVs) : null;
7729 }
7730
7731
7732
7733
7734 private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) {
7735 Collections.sort(cells, comparator);
7736 return cells;
7737 }
7738
7739
7740
7741
7742
7743
7744
7745
7746
7747
7748
7749
/**
 * Computes the new cells for the increments of one column family.
 *
 * <p>For each increment the current stored value (if any) is added to the increment amount,
 * and a new Put cell is built with a timestamp of at least {@code now} and strictly greater
 * than the stored cell's. Tags come from the increment, then the stored cell, then the
 * mutation TTL. Coprocessors may replace the cell via {@code postMutationBeforeWAL}.
 *
 * @param increment the owning Increment (row, TTL, time range)
 * @param columnFamilyName the family being incremented
 * @param sortedIncrements increments for this family, sorted by the store comparator
 * @param now current time
 * @param mvccNum sequence id to stamp on new cells, or
 *        {@code MultiVersionConcurrencyControl.NO_WRITE_NUMBER} to leave them unstamped
 * @param allKVs output: every computed cell is appended here (including zero increments)
 * @param isolation isolation level for reading current values, or {@code null}
 * @return only the cells whose increment amount was non-zero (the ones to write back)
 * @throws IOException if reading fails or a value is not 8 bytes wide
 */
private List<Cell> applyIncrementsToColumnFamily(Increment increment, byte[] columnFamilyName,
    List<Cell> sortedIncrements, long now, long mvccNum, List<Cell> allKVs,
    final IsolationLevel isolation)
    throws IOException {
  List<Cell> results = new ArrayList<Cell>(sortedIncrements.size());
  byte [] row = increment.getRow();
  // Current stored values of the incremented columns.
  List<Cell> currentValues =
      getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation);

  // `idx` points at the stored value matching the current increment's qualifier.
  int idx = 0;
  for (int i = 0; i < sortedIncrements.size(); i++) {
    Cell inc = sortedIncrements.get(i);
    long incrementAmount = getLongValue(inc);
    // A zero increment is reported in allKVs but not written back.
    boolean writeBack = (incrementAmount != 0);
    // Start from the increment cell's own tags.
    List<Tag> tags = Tag.carryForwardTags(inc);

    Cell currentValue = null;
    long ts = now;
    if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) {
      currentValue = currentValues.get(idx);
      ts = Math.max(now, currentValue.getTimestamp() + 1);
      incrementAmount += getLongValue(currentValue);
      // Keep the stored cell's tags too.
      tags = Tag.carryForwardTags(tags, currentValue);
      // Advance only when the next increment targets a different qualifier.
      // NOTE(review): consequently several increments on the same qualifier each add to the
      // same stored value rather than accumulating; confirm this is intended.
      if (i < (sortedIncrements.size() - 1) &&
          !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++;
    }

    // Build the new value cell.
    byte [] qualifier = CellUtil.cloneQualifier(inc);
    byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount);
    tags = carryForwardTTLTag(tags, increment);

    Cell newValue = new KeyValue(row, 0, row.length,
      columnFamilyName, 0, columnFamilyName.length,
      qualifier, 0, qualifier.length,
      ts, KeyValue.Type.Put,
      incrementAmountInBytes, 0, incrementAmountInBytes.length,
      tags);

    // Stamp the sequence id only when the caller supplied one.
    if (mvccNum != MultiVersionConcurrencyControl.NO_WRITE_NUMBER) {
      CellUtil.setSequenceId(newValue, mvccNum);
    }

    // Let coprocessors replace the new cell.
    if (coprocessorHost != null) {
      newValue = coprocessorHost.postMutationBeforeWAL(
          RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue);
    }
    allKVs.add(newValue);
    if (writeBack) {
      results.add(newValue);
    }
  }
  return results;
}
7812
7813
7814
7815
7816
7817 private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
7818 int len = cell.getValueLength();
7819 if (len != Bytes.SIZEOF_LONG) {
7820
7821 throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
7822 }
7823 return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len);
7824 }
7825
7826
7827
7828
7829
7830
7831
7832
7833
7834
7835 private List<Cell> getIncrementCurrentValue(final Increment increment, byte [] columnFamily,
7836 final List<Cell> increments, final IsolationLevel isolation)
7837 throws IOException {
7838 Get get = new Get(increment.getRow());
7839 if (isolation != null) get.setIsolationLevel(isolation);
7840 for (Cell cell: increments) {
7841 get.addColumn(columnFamily, CellUtil.cloneQualifier(cell));
7842 }
7843 TimeRange tr = increment.getTimeRange();
7844 if (tr != null) {
7845 get.setTimeRange(tr.getMin(), tr.getMax());
7846 }
7847 return get(get, false);
7848 }
7849
7850 private static List<Tag> carryForwardTTLTag(final Mutation mutation) {
7851 return carryForwardTTLTag(null, mutation);
7852 }
7853
7854
7855
7856
7857 private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull,
7858 final Mutation mutation) {
7859 long ttl = mutation.getTTL();
7860 if (ttl == Long.MAX_VALUE) return tagsOrNull;
7861 List<Tag> tags = tagsOrNull;
7862
7863
7864
7865 if (tags == null) {
7866 tags = new ArrayList<Tag>(1);
7867 } else {
7868
7869 Iterator<Tag> tagsItr = tags.iterator();
7870 while (tagsItr.hasNext()) {
7871 Tag tag = tagsItr.next();
7872 if (tag.getType() == TagType.TTL_TAG_TYPE) {
7873 tagsItr.remove();
7874 break;
7875 }
7876 }
7877 }
7878 tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
7879 return tags;
7880 }
7881
7882
7883
7884
7885
7886 private void checkFamily(final byte [] family)
7887 throws NoSuchColumnFamilyException {
7888 if (!this.htableDescriptor.hasFamily(family)) {
7889 throw new NoSuchColumnFamilyException("Column family " +
7890 Bytes.toString(family) + " does not exist in region " + this
7891 + " in table " + this.htableDescriptor);
7892 }
7893 }
7894
/**
 * Estimated shallow heap size of an HRegion instance: object header plus its reference,
 * int, long and boolean fields, aligned by {@code ClassSize.align}.
 * NOTE(review): the field counts (45 references, 3 ints, 14 longs, 5 booleans) must be kept in
 * sync with the class's declared fields; they cannot be verified from this chunk.
 */
public static final long FIXED_OVERHEAD = ClassSize.align(
    ClassSize.OBJECT +
    ClassSize.ARRAY +
    45 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
    (14 * Bytes.SIZEOF_LONG) +
    5 * Bytes.SIZEOF_BOOLEAN);

/**
 * {@link #FIXED_OVERHEAD} plus the estimated sizes of objects the region owns (atomics, maps,
 * write state, locks, MVCC, etc.). Per-store sizes are added separately in heapSize().
 * NOTE(review): the listed components are expected to mirror the instance fields; confirm
 * when fields are added or removed.
 */
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
    ClassSize.OBJECT +
    (2 * ClassSize.ATOMIC_BOOLEAN) +
    (3 * ClassSize.ATOMIC_LONG) +
    (2 * ClassSize.CONCURRENT_HASHMAP) +
    WriteState.HEAP_SIZE +
    ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
    (2 * ClassSize.REENTRANT_LOCK) +
    MultiVersionConcurrencyControl.FIXED_SIZE
    + ClassSize.TREEMAP
    + 2 * ClassSize.ATOMIC_INTEGER
    ;
7924
7925 @Override
7926 public long heapSize() {
7927 long heapSize = DEEP_OVERHEAD;
7928 for (Store store : this.stores.values()) {
7929 heapSize += store.heapSize();
7930 }
7931
7932 return heapSize;
7933 }
7934
7935
7936
7937
7938
7939 private static void printUsageAndExit(final String message) {
7940 if (message != null && message.length() > 0) System.out.println(message);
7941 System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
7942 System.out.println("Options:");
7943 System.out.println(" major_compact Pass this option to major compact " +
7944 "passed region.");
7945 System.out.println("Default outputs scan of passed region.");
7946 System.exit(1);
7947 }
7948
7949 @Override
7950 public boolean registerService(Service instance) {
7951
7952
7953
7954 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
7955 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
7956 LOG.error("Coprocessor service " + serviceDesc.getFullName() +
7957 " already registered, rejecting request from " + instance
7958 );
7959 return false;
7960 }
7961
7962 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
7963 if (LOG.isDebugEnabled()) {
7964 LOG.debug("Registered coprocessor service: region=" +
7965 Bytes.toStringBinary(getRegionInfo().getRegionName()) +
7966 " service=" + serviceDesc.getFullName());
7967 }
7968 return true;
7969 }
7970
7971 @Override
7972 public Message execService(RpcController controller, CoprocessorServiceCall call)
7973 throws IOException {
7974 String serviceName = call.getServiceName();
7975 String methodName = call.getMethodName();
7976 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
7977 throw new UnknownProtocolException(null,
7978 "No registered coprocessor service found for name "+serviceName+
7979 " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
7980 }
7981
7982 Service service = coprocessorServiceHandlers.get(serviceName);
7983 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
7984 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
7985 if (methodDesc == null) {
7986 throw new UnknownProtocolException(service.getClass(),
7987 "Unknown method "+methodName+" called on service "+serviceName+
7988 " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
7989 }
7990
7991 Message.Builder builder = service.getRequestPrototype(methodDesc).newBuilderForType();
7992 ProtobufUtil.mergeFrom(builder, call.getRequest());
7993 Message request = builder.build();
7994
7995 if (coprocessorHost != null) {
7996 request = coprocessorHost.preEndpointInvocation(service, methodName, request);
7997 }
7998
7999 final Message.Builder responseBuilder =
8000 service.getResponsePrototype(methodDesc).newBuilderForType();
8001 service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
8002 @Override
8003 public void run(Message message) {
8004 if (message != null) {
8005 responseBuilder.mergeFrom(message);
8006 }
8007 }
8008 });
8009
8010 if (coprocessorHost != null) {
8011 coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
8012 }
8013
8014 IOException exception = ResponseConverter.getControllerException(controller);
8015 if (exception != null) {
8016 throw exception;
8017 }
8018
8019 return responseBuilder.build();
8020 }
8021
8022
8023
8024
8025
8026
8027 private static void processTable(final FileSystem fs, final Path p,
8028 final WALFactory walFactory, final Configuration c,
8029 final boolean majorCompact)
8030 throws IOException {
8031 HRegion region;
8032 FSTableDescriptors fst = new FSTableDescriptors(c);
8033
8034 if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
8035 final WAL wal = walFactory.getMetaWAL(
8036 HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
8037 region = HRegion.newHRegion(p, wal, fs, c,
8038 HRegionInfo.FIRST_META_REGIONINFO, fst.get(TableName.META_TABLE_NAME), null);
8039 } else {
8040 throw new IOException("Not a known catalog table: " + p.toString());
8041 }
8042 try {
8043 region.mvcc.advanceTo(region.initialize(null));
8044 if (majorCompact) {
8045 region.compact(true);
8046 } else {
8047
8048 Scan scan = new Scan();
8049
8050 RegionScanner scanner = region.getScanner(scan);
8051 try {
8052 List<Cell> kvs = new ArrayList<Cell>();
8053 boolean done;
8054 do {
8055 kvs.clear();
8056 done = scanner.next(kvs);
8057 if (kvs.size() > 0) LOG.info(kvs);
8058 } while (done);
8059 } finally {
8060 scanner.close();
8061 }
8062 }
8063 } finally {
8064 region.close();
8065 }
8066 }
8067
8068 boolean shouldForceSplit() {
8069 return this.splitRequest;
8070 }
8071
8072 byte[] getExplicitSplitPoint() {
8073 return this.explicitSplitPoint;
8074 }
8075
8076 void forceSplit(byte[] sp) {
8077
8078
8079 this.splitRequest = true;
8080 if (sp != null) {
8081 this.explicitSplitPoint = sp;
8082 }
8083 }
8084
8085 void clearSplit() {
8086 this.splitRequest = false;
8087 this.explicitSplitPoint = null;
8088 }
8089
8090
8091
8092
8093 protected void prepareToSplit() {
8094
8095 }
8096
8097
8098
8099
8100
8101
8102
8103 public byte[] checkSplit() {
8104
8105 if (this.getRegionInfo().isMetaTable() ||
8106 TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
8107 if (shouldForceSplit()) {
8108 LOG.warn("Cannot split meta region in HBase 0.20 and above");
8109 }
8110 return null;
8111 }
8112
8113
8114 if (this.isRecovering()) {
8115 LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
8116 return null;
8117 }
8118
8119 if (!splitPolicy.shouldSplit()) {
8120 return null;
8121 }
8122
8123 byte[] ret = splitPolicy.getSplitPoint();
8124
8125 if (ret != null) {
8126 try {
8127 checkRow(ret, "calculated split");
8128 } catch (IOException e) {
8129 LOG.error("Ignoring invalid split", e);
8130 return null;
8131 }
8132 }
8133 return ret;
8134 }
8135
8136
8137
8138
8139 public int getCompactPriority() {
8140 int count = Integer.MAX_VALUE;
8141 for (Store store : stores.values()) {
8142 count = Math.min(count, store.getCompactPriority());
8143 }
8144 return count;
8145 }
8146
8147
8148
8149 @Override
8150 public RegionCoprocessorHost getCoprocessorHost() {
8151 return coprocessorHost;
8152 }
8153
8154
8155 public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
8156 this.coprocessorHost = coprocessorHost;
8157 }
8158
8159 @Override
8160 public void startRegionOperation() throws IOException {
8161 startRegionOperation(Operation.ANY);
8162 }
8163
/**
 * Marks the start of a region operation of the given type.
 *
 * <p>GET and SCAN first call checkReadsEnabled() and then deliberately fall through to the
 * recovery check shared by the other listed operations. While the region is recovering, every
 * listed operation is rejected, except that PUT, DELETE and BATCH_MUTATE are rejected only when
 * disallowWritesInRecovering is set. Operations not listed (e.g. ANY) skip that check.
 *
 * <p>MERGE_REGION, SPLIT_REGION and COMPACT_REGION return after the recovery check WITHOUT
 * acquiring the region read lock. Every other operation acquires the read lock via lock(Lock)
 * (bounded wait) and still holds it on successful return; closeRegionOperation(Operation)
 * releases it.
 * NOTE(review): closeRegionOperation unlocks the read lock unconditionally, so callers
 * presumably must not pair it with MERGE_REGION/SPLIT_REGION/COMPACT_REGION. Confirm.
 *
 * @param op the kind of operation being started
 * @throws RegionInRecoveryException if the region is recovering and {@code op} is not allowed
 * @throws NotServingRegionException if the region is closing, or closed once the lock is held
 * @throws IOException if the lock cannot be obtained, or if the postStartRegionOperation
 *         coprocessor hook fails (the read lock is released before rethrowing)
 */
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
  justification="Intentional")
public void startRegionOperation(Operation op) throws IOException {
  switch (op) {
    case GET:
    case SCAN:
      checkReadsEnabled();
      // fall through: reads are also subject to the recovery check below
    case INCREMENT:
    case APPEND:
    case SPLIT_REGION:
    case MERGE_REGION:
    case PUT:
    case DELETE:
    case BATCH_MUTATE:
    case COMPACT_REGION:
      // Only PUT/DELETE/BATCH_MUTATE may proceed during recovery, and only if writes in
      // recovery are allowed.
      if (isRecovering() && (this.disallowWritesInRecovering ||
          (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
        throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
          " is recovering; cannot take reads");
      }
      break;
    default:
      break;
  }
  if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
      || op == Operation.COMPACT_REGION) {
    // These operations return here without taking the region read lock.
    return;
  }
  if (this.closing.get()) {
    throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
  }
  lock(lock.readLock());
  if (this.closed.get()) {
    // Closed while we were acquiring the lock: release it before failing.
    lock.readLock().unlock();
    throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
  }
  try {
    if (coprocessorHost != null) {
      coprocessorHost.postStartRegionOperation(op);
    }
  } catch (Exception e) {
    // Do not leak the read lock if the coprocessor hook fails.
    lock.readLock().unlock();
    throw new IOException(e);
  }
}
8213
8214 @Override
8215 public void closeRegionOperation() throws IOException {
8216 closeRegionOperation(Operation.ANY);
8217 }
8218
8219
8220
8221
8222
8223
8224 public void closeRegionOperation(Operation operation) throws IOException {
8225 lock.readLock().unlock();
8226 if (coprocessorHost != null) {
8227 coprocessorHost.postCloseRegionOperation(operation);
8228 }
8229 }
8230
8231
8232
8233
8234
8235
8236
8237
8238
8239
8240 private void startBulkRegionOperation(boolean writeLockNeeded)
8241 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
8242 if (this.closing.get()) {
8243 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
8244 }
8245 if (writeLockNeeded) lock(lock.writeLock());
8246 else lock(lock.readLock());
8247 if (this.closed.get()) {
8248 if (writeLockNeeded) lock.writeLock().unlock();
8249 else lock.readLock().unlock();
8250 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
8251 }
8252 }
8253
8254
8255
8256
8257
8258 private void closeBulkRegionOperation(){
8259 if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
8260 else lock.readLock().unlock();
8261 }
8262
8263
8264
8265
8266
8267 private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
8268 numMutationsWithoutWAL.increment();
8269 if (numMutationsWithoutWAL.get() <= 1) {
8270 LOG.info("writing data to region " + this +
8271 " with WAL disabled. Data may be lost in the event of a crash.");
8272 }
8273
8274 long mutationSize = 0;
8275 for (List<Cell> cells: familyMap.values()) {
8276 assert cells instanceof RandomAccess;
8277 int listSize = cells.size();
8278 for (int i=0; i < listSize; i++) {
8279 Cell cell = cells.get(i);
8280
8281 mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
8282 }
8283 }
8284
8285 dataInMemoryWithoutWAL.add(mutationSize);
8286 }
8287
8288 private void lock(final Lock lock)
8289 throws RegionTooBusyException, InterruptedIOException {
8290 lock(lock, 1);
8291 }
8292
8293
8294
8295
8296
8297
8298 private void lock(final Lock lock, final int multiplier)
8299 throws RegionTooBusyException, InterruptedIOException {
8300 try {
8301 final long waitTime = Math.min(maxBusyWaitDuration,
8302 busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
8303 if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
8304 throw new RegionTooBusyException(
8305 "failed to get a lock in " + waitTime + " ms. " +
8306 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
8307 this.getRegionInfo().getRegionNameAsString()) +
8308 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
8309 this.getRegionServerServices().getServerName()));
8310 }
8311 } catch (InterruptedException ie) {
8312 LOG.info("Interrupted while waiting for a lock");
8313 InterruptedIOException iie = new InterruptedIOException();
8314 iie.initCause(ie);
8315 throw iie;
8316 }
8317 }
8318
8319
8320
8321
8322
8323
8324
8325 private void syncOrDefer(long txid, Durability durability) throws IOException {
8326 if (this.getRegionInfo().isMetaRegion()) {
8327 this.wal.sync(txid);
8328 } else {
8329 switch(durability) {
8330 case USE_DEFAULT:
8331
8332 if (shouldSyncWAL()) {
8333 this.wal.sync(txid);
8334 }
8335 break;
8336 case SKIP_WAL:
8337
8338 break;
8339 case ASYNC_WAL:
8340
8341 break;
8342 case SYNC_WAL:
8343 case FSYNC_WAL:
8344
8345 this.wal.sync(txid);
8346 break;
8347 default:
8348 throw new RuntimeException("Unknown durability " + durability);
8349 }
8350 }
8351 }
8352
8353
8354
8355
8356 private boolean shouldSyncWAL() {
8357 return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
8358 }
8359
8360
8361
8362
8363 private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
8364
8365 @Override
8366 public void add(int index, Cell element) {
8367
8368 }
8369
8370 @Override
8371 public boolean addAll(int index, Collection<? extends Cell> c) {
8372 return false;
8373 }
8374
8375 @Override
8376 public KeyValue get(int index) {
8377 throw new UnsupportedOperationException();
8378 }
8379
8380 @Override
8381 public int size() {
8382 return 0;
8383 }
8384 };
8385
8386
8387
8388
8389
8390
8391
8392
8393
8394
8395 public static void main(String[] args) throws IOException {
8396 if (args.length < 1) {
8397 printUsageAndExit(null);
8398 }
8399 boolean majorCompact = false;
8400 if (args.length > 1) {
8401 if (!args[1].toLowerCase().startsWith("major")) {
8402 printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
8403 }
8404 majorCompact = true;
8405 }
8406 final Path tableDir = new Path(args[0]);
8407 final Configuration c = HBaseConfiguration.create();
8408 final FileSystem fs = FileSystem.get(c);
8409 final Path logdir = new Path(c.get("hbase.tmp.dir"));
8410 final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
8411
8412 final Configuration walConf = new Configuration(c);
8413 FSUtils.setRootDir(walConf, logdir);
8414 final WALFactory wals = new WALFactory(walConf, null, logname);
8415 try {
8416 processTable(fs, tableDir, wals, c, majorCompact);
8417 } finally {
8418 wals.close();
8419
8420 BlockCache bc = new CacheConfig(c).getBlockCache();
8421 if (bc != null) bc.shutdown();
8422 }
8423 }
8424
8425 @Override
8426 public long getOpenSeqNum() {
8427 return this.openSeqNum;
8428 }
8429
8430 @Override
8431 public Map<byte[], Long> getMaxStoreSeqId() {
8432 return this.maxSeqIdInStores;
8433 }
8434
8435 @Override
8436 public long getOldestSeqIdOfStore(byte[] familyName) {
8437 return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName);
8438 }
8439
8440 @Override
8441 public CompactionState getCompactionState() {
8442 boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
8443 return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
8444 : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
8445 }
8446
8447 public void reportCompactionRequestStart(boolean isMajor){
8448 (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
8449 }
8450
8451 public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
8452 int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
8453
8454
8455 compactionsFinished.incrementAndGet();
8456 compactionNumFilesCompacted.addAndGet(numFiles);
8457 compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
8458
8459 assert newValue >= 0;
8460 }
8461
8462
8463
8464
8465
8466 @VisibleForTesting
8467 public long getSequenceId() {
8468 return this.mvcc.getReadPoint();
8469 }
8470
8471
8472
8473
8474
8475
8476
8477
8478
8479 private WALKey appendEmptyEdit(final WAL wal) throws IOException {
8480
8481 @SuppressWarnings("deprecation")
8482 WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
8483 getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
8484 HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
8485
8486
8487
8488 try {
8489 wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false);
8490 } catch (Throwable t) {
8491
8492 getMVCC().complete(key.getWriteEntry());
8493 }
8494 return key;
8495 }
8496
8497
8498
8499
8500 @Override
8501 public void onConfigurationChange(Configuration conf) {
8502
8503 }
8504
8505
8506
8507
8508 @Override
8509 public void registerChildren(ConfigurationManager manager) {
8510 configurationManager = Optional.of(manager);
8511 for (Store s : this.stores.values()) {
8512 configurationManager.get().registerObserver(s);
8513 }
8514 }
8515
8516
8517
8518
8519 @Override
8520 public void deregisterChildren(ConfigurationManager manager) {
8521 for (Store s : this.stores.values()) {
8522 configurationManager.get().deregisterObserver(s);
8523 }
8524 }
8525
8526
8527
8528
8529 public RegionSplitPolicy getSplitPolicy() {
8530 return this.splitPolicy;
8531 }
8532 }